Home » synapse-1.2-src » org.apache.synapse.transport.fix » [javadoc | source]

    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one
    3    *  or more contributor license agreements.  See the NOTICE file
    4    *  distributed with this work for additional information
    5    *  regarding copyright ownership.  The ASF licenses this file
    6    *  to you under the Apache License, Version 2.0 (the
    7    *  "License"); you may not use this file except in compliance
    8    *  with the License.  You may obtain a copy of the License at
    9    *
   10    *   http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    *  Unless required by applicable law or agreed to in writing,
   13    *  software distributed under the License is distributed on an
   14    *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    *  KIND, either express or implied.  See the License for the
   16    *  specific language governing permissions and limitations
   17    *  under the License.
   18    */
   19   
   20   package org.apache.synapse.transport.fix;
   21   
   22   import org.apache.axis2.AxisFault;
   23   import org.apache.axis2.description.AxisService;
   24   import org.apache.axis2.description.Parameter;
   25   import org.apache.commons.logging.Log;
   26   import org.apache.commons.logging.LogFactory;
   27   import quickfix;
   28   
   29   import java.io.IOException;
   30   import java.io.InputStream;
   31   import java.net.MalformedURLException;
   32   import java.net.URL;
   33   import java.util.HashMap;
   34   import java.util.Hashtable;
   35   import java.util.Iterator;
   36   import java.util.Map;
   37   
   38   /**
   39    * The FIXSessionFactory is responsible for creating and managing FIX sessions. A FIX session can be
   40    * initiated in one of two modes, namely the acceptor mode and the initiator mode. FIX sessions
   41    * requested by the transport listener at service deployment are created in acceptor mode. When
   42    * the transport sender is about to send a FIX message it will check whether a valid FIX session exists.
   43    * If not it will request the FIXSessionFactory to create a new session in the initiator mode.
   44    * <p/>
   45    * To create a new FIX session (in either mode) the FIXSessionFactory has to create a LogFactory (nullable),
   46    * and a MessageStoreFactroy. By default this implementation attempts to pass null as the LogFactory and a
   47    * MemoryStoreFactory as the MessageStoreFactory. These can be configured in the services.xml as follows.
   48    * <p/>
   49    * <parameter name="transport.fix.AcceptorLogger">file</parameter>
   50    * (acceptable values: console, file, jdbc)
   51    * <p/>
   52    * <parameter name="transport.fix.AcceptorMessageStore">file</parameter>
   53    * (acceptable values: file, jdbc, memory, sleepycat)
   54    * <p/>
   55    * The configuraion details related to these factories has to be specified in the FIX configuration file
   56    * as requested by the Quickfix/J API.
   57    */
   58   public class FIXSessionFactory {
   59   
   60       /** A Map Containing all the FIX Acceptors created by this factory, keyed by the service name */
   61       private Map<String,Acceptor> acceptorStore;
   62       /** A Map containing all the FIX Initiators created by this factory, keyed by FIX EPR */
   63       private Map<String,Initiator> initiatorStore;
   64       /** A Map containing all the FIX application created for initiators, keyed by FIX EPR */
   65       private Map<String, Application> applicationStore;
   66       /** An ApplicationFactory handles creating FIX Applications (FIXIncomingMessageHandler Objects) */
   67       private FIXApplicationFactory applicationFactory;
   68   
   69       private Log log;
   70   
   71       public FIXSessionFactory(FIXApplicationFactory applicationFactory) {
   72           this.applicationFactory = applicationFactory;
   73           this.log = LogFactory.getLog(this.getClass());
   74           this.acceptorStore = new HashMap<String,Acceptor>();
   75           this.initiatorStore = new HashMap<String, Initiator>();
   76           this.applicationStore = new HashMap<String, Application>();
   77       }
   78   
   79       /**
   80        * Get the FIX configuration settings and initialize a new FIX session for the specified
   81        * service. Create an Acceptor and a new FIX Application. Put the Acceptor into the
   82        * acceptorStore keyed by the service name and start it.
   83        *
   84        * @param service the AxisService
   85        */
   86       public void createFIXAcceptor(AxisService service) {
   87   
   88           //Try to get an InputStream to the FIX configuration file
   89           InputStream fixConfigStream = getFIXConfigAsStream(service, true);
   90   
   91           if (fixConfigStream != null) {
   92               try {
   93                   if (log.isDebugEnabled()) {
   94                       log.debug ("Initializing a new FIX session for the service " + service.getName());
   95                   }
   96   
   97                   SessionSettings settings = new SessionSettings(fixConfigStream);
   98                   MessageStoreFactory storeFactory = getMessageStoreFactory(service, settings, true);
   99                   MessageFactory messageFactory = new DefaultMessageFactory();
  100                   quickfix.LogFactory logFactory = getLogFactory(service, settings, true);
  101                   //Get a new FIX Application
  102                   Application messageHandler = applicationFactory.getFIXApplication(service, true);
  103                   //Create a new FIX Acceptor
  104                   Acceptor acceptor = new SocketAcceptor(
  105                           messageHandler,
  106                           storeFactory,
  107                           settings,
  108                           logFactory,
  109                           messageFactory);
  110   
  111                   acceptorStore.put(service.getName(),acceptor);
  112                   acceptor.start();
  113                   return;
  114               } catch (ConfigError e) {
  115                   log.error("Error in the specified FIX configuration. Unable to initialize a " +
  116                           "FIX session for the service " + service.getName(), e);
  117               }
  118           }
  119           log.error("Unable to initialize a FIX session for the service " + service.getName());
  120       }
  121   
  122       /**
  123        * Extract the parameters embedded in the given EPR and initialize a new FIX session.
  124        * Create a new FIX initiator and a new FIX Application.Put the initiator into the
  125        * initiatorStore keyed by the EPR and start the initiator.
  126        *
  127        * @param fixEPR the EPR to send FIX messages
  128        * @param service the AxisService
  129        * @param sessionID the SessionID of the session created
  130        * @throws org.apache.axis2.AxisFault Exception thrown
  131        */
  132       public void createFIXInitiator(String fixEPR, AxisService service, SessionID sessionID) throws AxisFault {
  133   
  134           if (log.isDebugEnabled()) {
  135               log.debug("Initializing a new FIX initiator for the service " + service.getName());
  136           }
  137           SessionSettings settings;
  138           InputStream fixConfigStream = getFIXConfigAsStream(service, false);
  139   
  140           if (fixConfigStream == null) {
  141               settings = new SessionSettings();
  142               settings.setLong(sessionID, FIXConstants.HEART_BY_INT, FIXConstants.DEFAULT_HEART_BT_INT_VALUE);
  143               settings.setString(sessionID, FIXConstants.START_TIME, FIXConstants.DEFAULT_START_TIME_VALUE);
  144               settings.setString(sessionID, FIXConstants.END_TIME, FIXConstants.DEFAULT_END_TIME_VALUE);
  145           } else {
  146               try {
  147                   settings = new SessionSettings(fixConfigStream);
  148               } catch (ConfigError e) {
  149                   throw new AxisFault("Error in the specified FIX configuration. Unable to initialize a " +
  150                           "FIX session for the service " + service.getName(), e);
  151               }
  152           }
  153   
  154           Hashtable<String,String> properties = FIXUtils.getProperties(fixEPR);
  155           Iterator<String> keys = properties.keySet().iterator();
  156           while (keys.hasNext()) {
  157               String currentKey = keys.next();
  158               settings.setString(sessionID, currentKey, properties.get(currentKey));
  159           }
  160   
  161           String[] socketAddressElements = FIXUtils.getSocketAddressElements(fixEPR);
  162           settings.setString(sessionID, FIXConstants.CONNECTION_TYPE, FIXConstants.FIX_INITIATOR);
  163           settings.setString(sessionID, FIXConstants.SOCKET_CONNECT_HOST, socketAddressElements[0]);
  164           settings.setString(sessionID, FIXConstants.SOCKET_CONNECT_PORT, socketAddressElements[1]);
  165   
  166           quickfix.LogFactory logFactory = getLogFactory(service, settings, false);
  167           MessageStoreFactory storeFactory = getMessageStoreFactory(service, settings, false);
  168           MessageFactory messageFactory = new DefaultMessageFactory();
  169           //Get a new FIX application
  170           Application messageHandler = applicationFactory.getFIXApplication(service, false);
  171   
  172           try {
  173              //Create a new FIX initiator
  174               Initiator initiator = new SocketInitiator(
  175                       messageHandler,
  176                       storeFactory,
  177                       settings,
  178                       logFactory,
  179                       messageFactory);
  180   
  181               initiatorStore.put(fixEPR, initiator);
  182               applicationStore.put(fixEPR, messageHandler);
  183               initiator.start();
  184   
  185               FIXIncomingMessageHandler fixMessageHandler = (FIXIncomingMessageHandler) messageHandler;
  186               log.info("Waiting for logon procedure to complete...");
  187               fixMessageHandler.acquire();
  188   
  189           } catch (ConfigError e) {
  190               throw new AxisFault("Error in the specified FIX configuration. Unable to initialize a " +
  191                       "FIX initiator.", e);
  192           } catch (InterruptedException ignore) { }
  193       }
  194   
  195       /**
  196        * Get the FIX Acceptor for the specified service from the sessionStore Map and
  197        * stop it. Then remove the Acceptor from the Map.
  198        *
  199        * @param service the AxisService
  200        */
  201       public void disposeFIXAcceptor(AxisService service) {
  202           if (log.isDebugEnabled()) {
  203               log.debug("Stopping the FIX acceptor for the service " + service.getName());
  204           }
  205           //Get the Acceptor for the service
  206           Acceptor acceptor = acceptorStore.get(service.getName());
  207           if (acceptor != null) {
  208               //Stop the Acceptor
  209               acceptor.stop();
  210               log.info("FIX session for service " + service.getName() + " terminated...");
  211               //Remove the Acceptor from the store
  212               acceptorStore.remove(service.getName());
  213           }
  214       }
  215   
  216       /**
  217        * Returns an array of Strings representing EPRs for the specified service
  218        *
  219        * @param serviceName the name of the service
  220        * @param ip the IP address of the host
  221        * @return an array of EPRs for the specified service
  222        */
  223       public String[] getServiceEPRs(String serviceName, String ip) {
  224           if (log.isDebugEnabled()) {
  225               log.debug("Getting EPRs for the service " + serviceName);
  226           }
  227           //Get the acceptpr for the specified service
  228           SocketAcceptor acceptor = (SocketAcceptor) acceptorStore.get(serviceName);
  229   
  230           if (acceptor != null) {
  231               return FIXUtils.generateEPRs(acceptor, serviceName, ip);
  232           } else
  233               return null;
  234       }
  235   
  236       /**
  237        * Finds a FIX Acceptor for the specified service from the acceptorStore
  238        *
  239        * @param serviceName the name of the AxisService
  240        * @return a FIX Acceptor for the service
  241        */
  242       public Acceptor getAccepter(String serviceName) {
  243           return acceptorStore.get(serviceName);
  244       }
  245   
  246       /**
  247        * Finds a FIX initiator for the specified EPR from the initiatorStore
  248        *
  249        * @param fixEPR a valid FIX EPR
  250        * @return  a FIX initiator for the EPR
  251        */
  252      public Initiator getInitiator(String fixEPR) {
  253           return initiatorStore.get(fixEPR);
  254       }
  255   
  256       /**
  257        * Get the FIX configuration URL from the services.xml.
  258        *
  259        * @param service the AxisService
  260        * @param acceptor boolean value indicating the FIX application type
  261        * @return an InputStream to the FIX configuration file/resource
  262        */
  263       private InputStream getFIXConfigAsStream(AxisService service, boolean acceptor) {
  264           InputStream fixConfigStream = null;
  265           Parameter fixConfigURLParam;
  266   
  267           if (acceptor) {
  268               fixConfigURLParam = service.getParameter(FIXConstants.FIX_ACCEPTOR_CONFIG_URL_PARAM);
  269           } else {
  270               fixConfigURLParam = service.getParameter(FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM);
  271           }
  272   
  273           if (fixConfigURLParam != null) {
  274               String fixConfigURLValue = fixConfigURLParam.getValue().toString();
  275               try {
  276                   URL url = new URL(fixConfigURLValue);
  277                   fixConfigStream = url.openStream();
  278               } catch (MalformedURLException e) {
  279                   log.error("The FIX configuration URL " + fixConfigURLValue + " is" +
  280                          " malformed.", e);
  281               } catch (IOException e) {
  282                   log.error("Error while reading from the URL " + fixConfigURLValue, e);
  283               }
  284           } else {
  285               log.error("FIX configuration URL is not specified for the service " + service.getName());
  286           }
  287   
  288           return fixConfigStream;
  289       }
  290   
  291       /**
  292        * Creates a Quickfix LogFactory object for logging as specified in the services.xml and
  293        * the FIX configuration file. Default is null.
  294        *
  295        * @param service the AxisService
  296        * @param settings SessionSettings to be used with the service
  297        * @param acceptor a boolean value indicating the type of the FIX application
  298        * @return a LogFactory for the FIX application
  299        */
  300       private quickfix.LogFactory getLogFactory(AxisService service, SessionSettings settings,
  301                                                 boolean acceptor) {
  302   
  303           quickfix.LogFactory logFactory = null;
  304           Parameter fixLogMethod;
  305   
  306           //Read the parameter from the services.xml
  307           if (acceptor) {
  308              fixLogMethod = service.getParameter(FIXConstants.FIX_ACCEPTOR_LOGGER_PARAM);
  309           } else {
  310               fixLogMethod = service.getParameter(FIXConstants.FIX_INITIATOR_LOGGER_PARAM);
  311           }
  312   
  313           if (fixLogMethod != null) {
  314                  String method = fixLogMethod.getValue().toString();
  315                   log.info("FIX logging method = " + method);
  316   
  317                   if (FIXConstants.FILE_BASED_MESSAGE_LOGGING.equals(method)) {
  318                       logFactory = new FileLogFactory(settings);
  319                   } else if (FIXConstants.JDBC_BASED_MESSAGE_LOGGING.equals(method)) {
  320                       logFactory = new JdbcLogFactory(settings);
  321                   } else if (FIXConstants.CONSOLE_BASED_MESSAGE_LOGGING.equals(method)) {
  322                       logFactory = new ScreenLogFactory();
  323                   } else {
  324                       log.warn("Invalid acceptor log method " + method + ". Using defaults.");
  325                   }
  326           }
  327           return logFactory;
  328       }
  329   
  330       /**
  331        * Creates a Quickfix MessageStoreFactory for storing FIX messages as specified in the services.xml
  332        * and the FIX configuration file. Default is FileStoreFactory.
  333        *
  334        * @param service the AxisService
  335        * @param settings SessionSettings to be used with the service
  336        * @param acceptor a boolean value indicating the type of the FIX application
  337        * @return a MessageStoreFactory for the FIX application
  338        */
  339       private MessageStoreFactory getMessageStoreFactory(AxisService service, SessionSettings settings,
  340                                                          boolean acceptor) {
  341   
  342           MessageStoreFactory storeFactory = new MemoryStoreFactory();
  343           Parameter msgLogMethod;
  344   
  345           //Read the parameter from the services.xml
  346           if (acceptor) {
  347               msgLogMethod = service.getParameter(FIXConstants.FIX_ACCEPTOR_MESSAGE_STORE_PARAM);
  348           } else {
  349               msgLogMethod = service.getParameter(FIXConstants.FIX_INITIATOR_MESSAGE_STORE_PARAM);
  350           }
  351   
  352           if (msgLogMethod != null) {
  353               String method = msgLogMethod.getValue().toString();
  354               log.info("FIX message logging method = " + method);
  355   
  356               if (FIXConstants.JDBC_BASED_MESSAGE_STORE.equals(method)) {
  357                   storeFactory = new JdbcStoreFactory(settings);
  358               } else if (FIXConstants.SLEEPYCAT_BASED_MESSAGE_STORE.equals(method)) {
  359                   storeFactory = new SleepycatStoreFactory(settings);
  360               } else if (FIXConstants.FILE_BASED_MESSAGE_STORE.equals(method)) {
  361                   storeFactory = new FileStoreFactory(settings);
  362               } else if (!FIXConstants.MEMORY_BASED_MESSAGE_STORE.equals(method)) {
  363                   log.warn("Invalid message store " + method + ". Using defaults.");
  364               }
  365           }
  366   
  367           return storeFactory;
  368       }
  369       
  370       public Application getApplication(String fixEPR) {
  371           return applicationStore.get(fixEPR);
  372       }
  373   }

Home » synapse-1.2-src » org.apache.synapse.transport.fix » [javadoc | source]