1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 package org.apache.synapse.transport.fix; 21 22 import org.apache.axis2.AxisFault; 23 import org.apache.axis2.Constants; 24 import org.apache.axis2.context.ConfigurationContext; 25 import org.apache.axis2.context.MessageContext; 26 import org.apache.axis2.description.AxisOperation; 27 import org.apache.axis2.description.AxisService; 28 import org.apache.axis2.description.Parameter; 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 import org.apache.synapse.transport.base.AbstractTransportListener; 32 import org.apache.synapse.transport.base.AbstractTransportSender; 33 import org.apache.synapse.transport.base.BaseConstants; 34 import org.apache.synapse.transport.base.BaseUtils; 35 import org.apache.synapse.transport.base.threads.WorkerPool; 36 import quickfix; 37 import quickfix.field.MsgSeqNum; 38 import quickfix.field.MsgType; 39 import quickfix.field.SenderCompID; 40 import quickfix.field.TargetCompID; 41 42 import javax.xml.namespace.QName; 43 import java.util.HashMap; 44 import java.util.Map; 45 import java.util.Queue; 46 import java.util.concurrent.LinkedBlockingQueue; 47 import java.util.concurrent.Semaphore; 48 49 /** 50 * FIXIncomingMessageHandler is responsible for handling all incoming FIX messages. This is where the 51 * Quickfix/J engine meets Synapse core. Admin level FIX messages are handled by Quickfix/J itself. 52 * All the application level messages are handed over to the Synapse core. 53 */ 54 public class FIXIncomingMessageHandler implements Application { 55 56 private ConfigurationContext cfgCtx; 57 /** A thread pool used to process incoming FIX messages */ 58 private WorkerPool workerPool; 59 /** AxisService to which this FIX application is bound to */ 60 private AxisService service; 61 private Log log; 62 /** A boolean value indicating the type of the FIX application */ 63 private boolean acceptor; 64 /** A Map of counters with one counter per session */ 65 private Map<SessionID, Integer> countersMap; 66 private Queue<MessageContext> outgoingMessages; 67 private boolean allNewApproach; 68 private Semaphore semaphore; 69 70 public FIXIncomingMessageHandler(ConfigurationContext cfgCtx, WorkerPool workerPool, 71 AxisService service, boolean acceptor) { 72 this.cfgCtx = cfgCtx; 73 this.workerPool = workerPool; 74 this.service = service; 75 this.log = LogFactory.getLog(this.getClass()); 76 this.acceptor = acceptor; 77 countersMap = new HashMap<SessionID, Integer>(); 78 outgoingMessages = new LinkedBlockingQueue<MessageContext>(); 79 semaphore = new Semaphore(0); 80 getResponseHandlingApproach(); 81 } 82 83 private void getResponseHandlingApproach() { 84 Parameter param = service.getParameter(FIXConstants.FIX_RESPONSE_HANDLER_APPROACH); 85 if (param != null) { 86 if ("false".equals(param.getValue().toString())) { 87 allNewApproach = false; 88 return; 89 } 90 } 91 allNewApproach = true; 92 } 93 94 public void setOutgoingMessageContext(MessageContext msgCtx) { 95 if (!allNewApproach) { 96 outgoingMessages.offer(msgCtx); 97 } 98 } 99 100 public void acquire() throws InterruptedException { 101 semaphore.acquire(); 102 } 103 104 private void handleException(String msg, Exception e) { 105 log.error(msg, e); 106 throw new AxisFIXException(msg, e); 107 } 108 109 /** 110 * This method is called when quickfix creates a new session. A session 111 * comes into and remains in existence for the life of the application. 112 * Sessions exist whether or not a counter party is connected to it. As soon 113 * as a session is created, the application can begin sending messages to it. If no one 114 * is logged on, the messages will be sent at the time a connection is 115 * established with the counterparty. 116 * 117 * @param sessionID QuickFIX session ID 118 */ 119 public void onCreate(SessionID sessionID) { 120 log.info("New FIX session created: " + sessionID.toString()); 121 } 122 123 /** 124 * This callback notifies when a valid logon has been established with a 125 * counter party. This is called when a connection has been established and 126 * the FIX logon process has completed with both parties exchanging valid 127 * logon messages. 128 * 129 * @param sessionID QuickFIX session ID 130 */ 131 public void onLogon(SessionID sessionID) { 132 countersMap.put(sessionID, 0); 133 log.info("FIX session logged on: " + sessionID.toString()); 134 semaphore.release(); 135 } 136 137 /** 138 * This callback notifies when a FIX session is no longer online. This 139 * could happen during a normal logout exchange or because of a forced 140 * termination or a loss of network connection. 141 * 142 * @param sessionID QuickFIX session ID 143 */ 144 public void onLogout(SessionID sessionID) { 145 countersMap.put(sessionID, 0); 146 FIXTransportSender trpSender = (FIXTransportSender) cfgCtx.getAxisConfiguration(). 147 getTransportOut(FIXConstants.TRANSPORT_NAME).getSender(); 148 trpSender.logOutIncomingSession(sessionID); 149 log.info("FIX session logged out: " + sessionID.toString()); 150 } 151 152 /** 153 * This callback provides Synapse with a peek at the administrative messages 154 * that are being sent from your FIX engine to the counter party. This is 155 * normally not useful for an application however it is provided for any 156 * logging one may wish to do. 157 * 158 * @param message QuickFIX message 159 * @param sessionID QuickFIX session ID 160 */ 161 public void toAdmin(Message message, SessionID sessionID) { 162 if (log.isDebugEnabled()) { 163 StringBuffer sb = new StringBuffer(); 164 try { 165 sb.append("Sending admin level FIX message to ").append(message.getHeader().getField(new TargetCompID()).getValue()); 166 sb.append("\nMessage Type: ").append(message.getHeader().getField(new MsgType()).getValue()); 167 sb.append("\nMessage Sequence Number: ").append(message.getHeader().getField(new MsgSeqNum()).getValue()); 168 sb.append("\nSender ID: ").append(message.getHeader().getField(new SenderCompID()).getValue()); 169 } catch (FieldNotFound e) { 170 sb.append("Sending admin level FIX message..."); 171 log.warn("One or more required fields are not found in the response message", e); 172 } 173 log.debug(sb.toString()); 174 if (log.isTraceEnabled()) { 175 log.trace("Message: " + message.toString()); 176 } 177 } 178 } 179 180 /** 181 * This callback notifies when an administrative message is sent from a 182 * counterparty to the FIX engine. 183 * 184 * @param message QuickFIX message 185 * @param sessionID QuickFIX session ID 186 * @throws FieldNotFound 187 * @throws IncorrectDataFormat 188 * @throws IncorrectTagValue 189 * @throws RejectLogon causes a logon reject 190 */ 191 public void fromAdmin(Message message, SessionID sessionID) throws FieldNotFound, 192 IncorrectDataFormat, IncorrectTagValue, RejectLogon { 193 194 if (log.isDebugEnabled()) { 195 StringBuffer sb = new StringBuffer(); 196 sb.append("Received admin level FIX message from ").append(message.getHeader().getField(new SenderCompID()).getValue()); 197 sb.append("\nMessage Type: ").append(message.getHeader().getField(new MsgType()).getValue()); 198 sb.append("\nMessage Sequence Number: ").append(message.getHeader().getField(new MsgSeqNum()).getValue()); 199 sb.append("\nReceiver ID: ").append(message.getHeader().getField(new TargetCompID()).getValue()); 200 log.debug(sb.toString()); 201 if (log.isTraceEnabled()) { 202 log.trace("Message: " + message.toString()); 203 } 204 } 205 } 206 207 /** 208 * This is a callback for application messages that are being sent to a 209 * counterparty. 210 * 211 * @param message QuickFIX message 212 * @param sessionID QuickFIX session ID 213 * @throws DoNotSend This exception aborts message transmission 214 */ 215 public void toApp(Message message, SessionID sessionID) throws DoNotSend { 216 if (log.isDebugEnabled()) { 217 StringBuffer sb = new StringBuffer(); 218 try { 219 sb.append("Sending application level FIX message to ").append(message.getHeader().getField(new TargetCompID()).getValue()); 220 sb.append("\nMessage Type: ").append(message.getHeader().getField(new MsgType()).getValue()); 221 sb.append("\nMessage Sequence Number: ").append(message.getHeader().getField(new MsgSeqNum()).getValue()); 222 sb.append("\nSender ID: ").append(message.getHeader().getField(new SenderCompID()).getValue()); 223 } catch (FieldNotFound e) { 224 sb.append("Sending application level FIX message..."); 225 log.warn("One or more required fields are not found in the response message", e); 226 } 227 log.debug(sb.toString()); 228 if (log.isTraceEnabled()) { 229 log.trace("Message: " + message.toString()); 230 } 231 } 232 } 233 234 /** 235 * This callback receives messages for the application. This is one of the 236 * core entry points for the FIX application. Every application level 237 * request will come through here. A new thread will be spawned from the 238 * thread pool for each incoming message. 239 * 240 * @param message QuickFIX message 241 * @param sessionID QuickFIX session ID 242 * @throws FieldNotFound 243 * @throws IncorrectDataFormat 244 * @throws IncorrectTagValue 245 * @throws UnsupportedMessageType 246 */ 247 public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, 248 IncorrectTagValue, UnsupportedMessageType { 249 if (log.isDebugEnabled()) { 250 StringBuffer sb = new StringBuffer(); 251 sb.append("Received FIX message from ").append(message.getHeader().getField(new SenderCompID()).getValue()); 252 sb.append("\nMessage Sequence Number: ").append(message.getHeader().getField(new MsgSeqNum()).getValue()); 253 sb.append("\nReceiver ID: ").append(message.getHeader().getField(new TargetCompID()).getValue()); 254 log.debug(sb.toString()); 255 if (log.isTraceEnabled()) { 256 log.trace("Message: " + message.toString()); 257 } 258 } 259 260 int counter = countersMap.get(sessionID); 261 counter++; 262 countersMap.put(sessionID, counter); 263 264 workerPool.execute(new FIXWorkerThread(message, sessionID, counter)); 265 } 266 267 /** 268 * This Runnable class can be used when it is required to process each incoming message 269 * using separate threads. 270 */ 271 class FIXWorkerThread implements Runnable { 272 273 private Message message; 274 private SessionID sessionID; 275 private int counter; 276 277 public FIXWorkerThread(Message message, SessionID sessionID, int counter) { 278 this.message = message; 279 this.sessionID = sessionID; 280 this.counter = counter; 281 } 282 283 private void handleIncomingRequest() { 284 //Create message context for the incmong message 285 AbstractTransportListener trpListener = (AbstractTransportListener) cfgCtx.getAxisConfiguration(). 286 getTransportIn(FIXConstants.TRANSPORT_NAME).getReceiver(); 287 288 MessageContext msgCtx = trpListener.createMessageContext(); 289 msgCtx.setProperty(Constants.OUT_TRANSPORT_INFO, new FIXOutTransportInfo(sessionID)); 290 291 if (service != null) { 292 // Set the service for which the message is intended to 293 msgCtx.setAxisService(service); 294 // find the operation for the message, or default to one 295 Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM); 296 QName operationQName = ( 297 operationParam != null ? 298 BaseUtils.getQNameFromString(operationParam.getValue()) : 299 BaseConstants.DEFAULT_OPERATION); 300 301 AxisOperation operation = service.getOperation(operationQName); 302 if (operation != null) { 303 msgCtx.setAxisOperation(operation); 304 msgCtx.setSoapAction("urn:" + operation.getName().getLocalPart()); 305 } 306 } 307 308 String fixApplication = FIXConstants.FIX_INITIATOR; 309 if (acceptor) { 310 fixApplication = FIXConstants.FIX_ACCEPTOR; 311 } 312 else { 313 msgCtx.setProperty("synapse.isresponse", true); 314 } 315 316 try { 317 //Put the FIX message in a SOAPEnvelope 318 FIXUtils.getInstance().setSOAPEnvelope(message, counter, sessionID.toString(), msgCtx); 319 trpListener.handleIncomingMessage( 320 msgCtx, 321 FIXUtils.getTransportHeaders(service.getName(), fixApplication), 322 null, 323 FIXConstants.FIX_DEFAULT_CONTENT_TYPE 324 ); 325 } catch (AxisFault e) { 326 handleException("Error while processing FIX message", e); 327 } 328 } 329 330 private void handleIncomingResponse(MessageContext outMsgCtx) { 331 AbstractTransportSender trpSender = (AbstractTransportSender) cfgCtx.getAxisConfiguration(). 332 getTransportOut(FIXConstants.TRANSPORT_NAME).getSender(); 333 334 MessageContext msgCtx = trpSender.createResponseMessageContext(outMsgCtx); 335 336 try { 337 //Put the FIX message in a SOAPEnvelope 338 FIXUtils.getInstance().setSOAPEnvelope(message, counter, sessionID.toString(), msgCtx); 339 msgCtx.setServerSide(true); 340 trpSender.handleIncomingMessage( 341 msgCtx, 342 FIXUtils.getTransportHeaders(service.getName(), FIXConstants.FIX_INITIATOR), 343 null, 344 FIXConstants.FIX_DEFAULT_CONTENT_TYPE 345 ); 346 } catch (AxisFault e) { 347 handleException("Error while processing response FIX message", e); 348 } 349 } 350 351 public void run() { 352 353 if (allNewApproach) { 354 //treat all messages (including responses) as new messages 355 handleIncomingRequest(); 356 } 357 else { 358 if (acceptor) { 359 //treat messages coming from an acceptor as new request messages 360 handleIncomingRequest(); 361 } 362 else { 363 MessageContext outMsgCtx = outgoingMessages.poll(); 364 if (outMsgCtx != null) { 365 //handle as a response to an outgoing message 366 handleIncomingResponse(outMsgCtx); 367 } 368 else { 369 //handle as a new request message 370 handleIncomingRequest(); 371 } 372 } 373 } 374 } 375 376 } 377 378 }