1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 package org.apache.synapse.transport.fix; 21 22 import org.apache.axis2.AxisFault; 23 import org.apache.axis2.description.AxisService; 24 import org.apache.axis2.description.Parameter; 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 import quickfix; 28 29 import java.io.IOException; 30 import java.io.InputStream; 31 import java.net.MalformedURLException; 32 import java.net.URL; 33 import java.util.HashMap; 34 import java.util.Hashtable; 35 import java.util.Iterator; 36 import java.util.Map; 37 38 /** 39 * The FIXSessionFactory is responsible for creating and managing FIX sessions. A FIX session can be 40 * initiated in one of two modes, namely the acceptor mode and the initiator mode. FIX sessions 41 * requested by the transport listener at service deployment are created in acceptor mode. When 42 * the transport sender is about to send a FIX message it will check whether a valid FIX session exists. 43 * If not it will request the FIXSessionFactory to create a new session in the initiator mode. 44 * <p/> 45 * To create a new FIX session (in either mode) the FIXSessionFactory has to create a LogFactory (nullable), 46 * and a MessageStoreFactroy. By default this implementation attempts to pass null as the LogFactory and a 47 * MemoryStoreFactory as the MessageStoreFactory. These can be configured in the services.xml as follows. 48 * <p/> 49 * <parameter name="transport.fix.AcceptorLogger">file</parameter> 50 * (acceptable values: console, file, jdbc) 51 * <p/> 52 * <parameter name="transport.fix.AcceptorMessageStore">file</parameter> 53 * (acceptable values: file, jdbc, memory, sleepycat) 54 * <p/> 55 * The configuraion details related to these factories has to be specified in the FIX configuration file 56 * as requested by the Quickfix/J API. 57 */ 58 public class FIXSessionFactory { 59 60 /** A Map Containing all the FIX Acceptors created by this factory, keyed by the service name */ 61 private Map<String,Acceptor> acceptorStore; 62 /** A Map containing all the FIX Initiators created by this factory, keyed by FIX EPR */ 63 private Map<String,Initiator> initiatorStore; 64 /** A Map containing all the FIX application created for initiators, keyed by FIX EPR */ 65 private Map<String, Application> applicationStore; 66 /** An ApplicationFactory handles creating FIX Applications (FIXIncomingMessageHandler Objects) */ 67 private FIXApplicationFactory applicationFactory; 68 69 private Log log; 70 71 public FIXSessionFactory(FIXApplicationFactory applicationFactory) { 72 this.applicationFactory = applicationFactory; 73 this.log = LogFactory.getLog(this.getClass()); 74 this.acceptorStore = new HashMap<String,Acceptor>(); 75 this.initiatorStore = new HashMap<String, Initiator>(); 76 this.applicationStore = new HashMap<String, Application>(); 77 } 78 79 /** 80 * Get the FIX configuration settings and initialize a new FIX session for the specified 81 * service. Create an Acceptor and a new FIX Application. Put the Acceptor into the 82 * acceptorStore keyed by the service name and start it. 83 * 84 * @param service the AxisService 85 */ 86 public void createFIXAcceptor(AxisService service) { 87 88 //Try to get an InputStream to the FIX configuration file 89 InputStream fixConfigStream = getFIXConfigAsStream(service, true); 90 91 if (fixConfigStream != null) { 92 try { 93 if (log.isDebugEnabled()) { 94 log.debug ("Initializing a new FIX session for the service " + service.getName()); 95 } 96 97 SessionSettings settings = new SessionSettings(fixConfigStream); 98 MessageStoreFactory storeFactory = getMessageStoreFactory(service, settings, true); 99 MessageFactory messageFactory = new DefaultMessageFactory(); 100 quickfix.LogFactory logFactory = getLogFactory(service, settings, true); 101 //Get a new FIX Application 102 Application messageHandler = applicationFactory.getFIXApplication(service, true); 103 //Create a new FIX Acceptor 104 Acceptor acceptor = new SocketAcceptor( 105 messageHandler, 106 storeFactory, 107 settings, 108 logFactory, 109 messageFactory); 110 111 acceptorStore.put(service.getName(),acceptor); 112 acceptor.start(); 113 return; 114 } catch (ConfigError e) { 115 log.error("Error in the specified FIX configuration. Unable to initialize a " + 116 "FIX session for the service " + service.getName(), e); 117 } 118 } 119 log.error("Unable to initialize a FIX session for the service " + service.getName()); 120 } 121 122 /** 123 * Extract the parameters embedded in the given EPR and initialize a new FIX session. 124 * Create a new FIX initiator and a new FIX Application.Put the initiator into the 125 * initiatorStore keyed by the EPR and start the initiator. 126 * 127 * @param fixEPR the EPR to send FIX messages 128 * @param service the AxisService 129 * @param sessionID the SessionID of the session created 130 * @throws org.apache.axis2.AxisFault Exception thrown 131 */ 132 public void createFIXInitiator(String fixEPR, AxisService service, SessionID sessionID) throws AxisFault { 133 134 if (log.isDebugEnabled()) { 135 log.debug("Initializing a new FIX initiator for the service " + service.getName()); 136 } 137 SessionSettings settings; 138 InputStream fixConfigStream = getFIXConfigAsStream(service, false); 139 140 if (fixConfigStream == null) { 141 settings = new SessionSettings(); 142 settings.setLong(sessionID, FIXConstants.HEART_BY_INT, FIXConstants.DEFAULT_HEART_BT_INT_VALUE); 143 settings.setString(sessionID, FIXConstants.START_TIME, FIXConstants.DEFAULT_START_TIME_VALUE); 144 settings.setString(sessionID, FIXConstants.END_TIME, FIXConstants.DEFAULT_END_TIME_VALUE); 145 } else { 146 try { 147 settings = new SessionSettings(fixConfigStream); 148 } catch (ConfigError e) { 149 throw new AxisFault("Error in the specified FIX configuration. Unable to initialize a " + 150 "FIX session for the service " + service.getName(), e); 151 } 152 } 153 154 Hashtable<String,String> properties = FIXUtils.getProperties(fixEPR); 155 Iterator<String> keys = properties.keySet().iterator(); 156 while (keys.hasNext()) { 157 String currentKey = keys.next(); 158 settings.setString(sessionID, currentKey, properties.get(currentKey)); 159 } 160 161 String[] socketAddressElements = FIXUtils.getSocketAddressElements(fixEPR); 162 settings.setString(sessionID, FIXConstants.CONNECTION_TYPE, FIXConstants.FIX_INITIATOR); 163 settings.setString(sessionID, FIXConstants.SOCKET_CONNECT_HOST, socketAddressElements[0]); 164 settings.setString(sessionID, FIXConstants.SOCKET_CONNECT_PORT, socketAddressElements[1]); 165 166 quickfix.LogFactory logFactory = getLogFactory(service, settings, false); 167 MessageStoreFactory storeFactory = getMessageStoreFactory(service, settings, false); 168 MessageFactory messageFactory = new DefaultMessageFactory(); 169 //Get a new FIX application 170 Application messageHandler = applicationFactory.getFIXApplication(service, false); 171 172 try { 173 //Create a new FIX initiator 174 Initiator initiator = new SocketInitiator( 175 messageHandler, 176 storeFactory, 177 settings, 178 logFactory, 179 messageFactory); 180 181 initiatorStore.put(fixEPR, initiator); 182 applicationStore.put(fixEPR, messageHandler); 183 initiator.start(); 184 185 FIXIncomingMessageHandler fixMessageHandler = (FIXIncomingMessageHandler) messageHandler; 186 log.info("Waiting for logon procedure to complete..."); 187 fixMessageHandler.acquire(); 188 189 } catch (ConfigError e) { 190 throw new AxisFault("Error in the specified FIX configuration. Unable to initialize a " + 191 "FIX initiator.", e); 192 } catch (InterruptedException ignore) { } 193 } 194 195 /** 196 * Get the FIX Acceptor for the specified service from the sessionStore Map and 197 * stop it. Then remove the Acceptor from the Map. 198 * 199 * @param service the AxisService 200 */ 201 public void disposeFIXAcceptor(AxisService service) { 202 if (log.isDebugEnabled()) { 203 log.debug("Stopping the FIX acceptor for the service " + service.getName()); 204 } 205 //Get the Acceptor for the service 206 Acceptor acceptor = acceptorStore.get(service.getName()); 207 if (acceptor != null) { 208 //Stop the Acceptor 209 acceptor.stop(); 210 log.info("FIX session for service " + service.getName() + " terminated..."); 211 //Remove the Acceptor from the store 212 acceptorStore.remove(service.getName()); 213 } 214 } 215 216 /** 217 * Returns an array of Strings representing EPRs for the specified service 218 * 219 * @param serviceName the name of the service 220 * @param ip the IP address of the host 221 * @return an array of EPRs for the specified service 222 */ 223 public String[] getServiceEPRs(String serviceName, String ip) { 224 if (log.isDebugEnabled()) { 225 log.debug("Getting EPRs for the service " + serviceName); 226 } 227 //Get the acceptpr for the specified service 228 SocketAcceptor acceptor = (SocketAcceptor) acceptorStore.get(serviceName); 229 230 if (acceptor != null) { 231 return FIXUtils.generateEPRs(acceptor, serviceName, ip); 232 } else 233 return null; 234 } 235 236 /** 237 * Finds a FIX Acceptor for the specified service from the acceptorStore 238 * 239 * @param serviceName the name of the AxisService 240 * @return a FIX Acceptor for the service 241 */ 242 public Acceptor getAccepter(String serviceName) { 243 return acceptorStore.get(serviceName); 244 } 245 246 /** 247 * Finds a FIX initiator for the specified EPR from the initiatorStore 248 * 249 * @param fixEPR a valid FIX EPR 250 * @return a FIX initiator for the EPR 251 */ 252 public Initiator getInitiator(String fixEPR) { 253 return initiatorStore.get(fixEPR); 254 } 255 256 /** 257 * Get the FIX configuration URL from the services.xml. 258 * 259 * @param service the AxisService 260 * @param acceptor boolean value indicating the FIX application type 261 * @return an InputStream to the FIX configuration file/resource 262 */ 263 private InputStream getFIXConfigAsStream(AxisService service, boolean acceptor) { 264 InputStream fixConfigStream = null; 265 Parameter fixConfigURLParam; 266 267 if (acceptor) { 268 fixConfigURLParam = service.getParameter(FIXConstants.FIX_ACCEPTOR_CONFIG_URL_PARAM); 269 } else { 270 fixConfigURLParam = service.getParameter(FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM); 271 } 272 273 if (fixConfigURLParam != null) { 274 String fixConfigURLValue = fixConfigURLParam.getValue().toString(); 275 try { 276 URL url = new URL(fixConfigURLValue); 277 fixConfigStream = url.openStream(); 278 } catch (MalformedURLException e) { 279 log.error("The FIX configuration URL " + fixConfigURLValue + " is" + 280 " malformed.", e); 281 } catch (IOException e) { 282 log.error("Error while reading from the URL " + fixConfigURLValue, e); 283 } 284 } else { 285 log.error("FIX configuration URL is not specified for the service " + service.getName()); 286 } 287 288 return fixConfigStream; 289 } 290 291 /** 292 * Creates a Quickfix LogFactory object for logging as specified in the services.xml and 293 * the FIX configuration file. Default is null. 294 * 295 * @param service the AxisService 296 * @param settings SessionSettings to be used with the service 297 * @param acceptor a boolean value indicating the type of the FIX application 298 * @return a LogFactory for the FIX application 299 */ 300 private quickfix.LogFactory getLogFactory(AxisService service, SessionSettings settings, 301 boolean acceptor) { 302 303 quickfix.LogFactory logFactory = null; 304 Parameter fixLogMethod; 305 306 //Read the parameter from the services.xml 307 if (acceptor) { 308 fixLogMethod = service.getParameter(FIXConstants.FIX_ACCEPTOR_LOGGER_PARAM); 309 } else { 310 fixLogMethod = service.getParameter(FIXConstants.FIX_INITIATOR_LOGGER_PARAM); 311 } 312 313 if (fixLogMethod != null) { 314 String method = fixLogMethod.getValue().toString(); 315 log.info("FIX logging method = " + method); 316 317 if (FIXConstants.FILE_BASED_MESSAGE_LOGGING.equals(method)) { 318 logFactory = new FileLogFactory(settings); 319 } else if (FIXConstants.JDBC_BASED_MESSAGE_LOGGING.equals(method)) { 320 logFactory = new JdbcLogFactory(settings); 321 } else if (FIXConstants.CONSOLE_BASED_MESSAGE_LOGGING.equals(method)) { 322 logFactory = new ScreenLogFactory(); 323 } else { 324 log.warn("Invalid acceptor log method " + method + ". Using defaults."); 325 } 326 } 327 return logFactory; 328 } 329 330 /** 331 * Creates a Quickfix MessageStoreFactory for storing FIX messages as specified in the services.xml 332 * and the FIX configuration file. Default is FileStoreFactory. 333 * 334 * @param service the AxisService 335 * @param settings SessionSettings to be used with the service 336 * @param acceptor a boolean value indicating the type of the FIX application 337 * @return a MessageStoreFactory for the FIX application 338 */ 339 private MessageStoreFactory getMessageStoreFactory(AxisService service, SessionSettings settings, 340 boolean acceptor) { 341 342 MessageStoreFactory storeFactory = new MemoryStoreFactory(); 343 Parameter msgLogMethod; 344 345 //Read the parameter from the services.xml 346 if (acceptor) { 347 msgLogMethod = service.getParameter(FIXConstants.FIX_ACCEPTOR_MESSAGE_STORE_PARAM); 348 } else { 349 msgLogMethod = service.getParameter(FIXConstants.FIX_INITIATOR_MESSAGE_STORE_PARAM); 350 } 351 352 if (msgLogMethod != null) { 353 String method = msgLogMethod.getValue().toString(); 354 log.info("FIX message logging method = " + method); 355 356 if (FIXConstants.JDBC_BASED_MESSAGE_STORE.equals(method)) { 357 storeFactory = new JdbcStoreFactory(settings); 358 } else if (FIXConstants.SLEEPYCAT_BASED_MESSAGE_STORE.equals(method)) { 359 storeFactory = new SleepycatStoreFactory(settings); 360 } else if (FIXConstants.FILE_BASED_MESSAGE_STORE.equals(method)) { 361 storeFactory = new FileStoreFactory(settings); 362 } else if (!FIXConstants.MEMORY_BASED_MESSAGE_STORE.equals(method)) { 363 log.warn("Invalid message store " + method + ". Using defaults."); 364 } 365 } 366 367 return storeFactory; 368 } 369 370 public Application getApplication(String fixEPR) { 371 return applicationStore.get(fixEPR); 372 } 373 }