Home » synapse-1.2-src » org.apache.synapse.transport.fix » [javadoc | source]

    1   /*
    2   *  Licensed to the Apache Software Foundation (ASF) under one
    3   *  or more contributor license agreements.  See the NOTICE file
    4   *  distributed with this work for additional information
    5   *  regarding copyright ownership.  The ASF licenses this file
    6   *  to you under the Apache License, Version 2.0 (the
    7   *  "License"); you may not use this file except in compliance
    8   *  with the License.  You may obtain a copy of the License at
    9   *
   10   *   http://www.apache.org/licenses/LICENSE-2.0
   11   *
   12   *  Unless required by applicable law or agreed to in writing,
   13   *  software distributed under the License is distributed on an
   14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15   *  KIND, either express or implied.  See the License for the
   16   *  specific language governing permissions and limitations
   17   *  under the License.
   18   */
   19   
   20   package org.apache.synapse.transport.fix;
   21   
   22   import org.apache.axis2.AxisFault;
   23   import org.apache.axis2.Constants;
   24   import org.apache.axis2.context.ConfigurationContext;
   25   import org.apache.axis2.context.MessageContext;
   26   import org.apache.axis2.description.AxisOperation;
   27   import org.apache.axis2.description.AxisService;
   28   import org.apache.axis2.description.Parameter;
   29   import org.apache.commons.logging.Log;
   30   import org.apache.commons.logging.LogFactory;
   31   import org.apache.synapse.transport.base.AbstractTransportListener;
   32   import org.apache.synapse.transport.base.AbstractTransportSender;
   33   import org.apache.synapse.transport.base.BaseConstants;
   34   import org.apache.synapse.transport.base.BaseUtils;
   35   import org.apache.synapse.transport.base.threads.WorkerPool;
   36   import quickfix.*;
   37   import quickfix.field.MsgSeqNum;
   38   import quickfix.field.MsgType;
   39   import quickfix.field.SenderCompID;
   40   import quickfix.field.TargetCompID;
   41   
   42   import javax.xml.namespace.QName;
   43   import java.util.HashMap;
   44   import java.util.Map;
   45   import java.util.Queue;
   46   import java.util.concurrent.LinkedBlockingQueue;
   47   import java.util.concurrent.Semaphore;
   48   
   49   /**
   50    * FIXIncomingMessageHandler is responsible for handling all incoming FIX messages. This is where the
   51    * Quickfix/J engine meets Synapse core. Admin level FIX messages are handled by Quickfix/J itself.
   52    * All the application level messages are handed over to the Synapse core.
   53    */
   54   public class FIXIncomingMessageHandler implements Application {
   55   
   56       private ConfigurationContext cfgCtx;
   57       /** A thread pool used to process incoming FIX messages */
   58       private WorkerPool workerPool;
   59       /** AxisService to which this FIX application is bound to */
   60       private AxisService service;
   61       private Log log;
   62       /** A boolean value indicating the type of the FIX application */
   63       private boolean acceptor;
   64       /** A Map of counters with one counter per session */
   65       private Map<SessionID, Integer> countersMap;
   66       private Queue<MessageContext> outgoingMessages;
   67       private boolean allNewApproach;
   68       private Semaphore semaphore;
   69   
   70       public FIXIncomingMessageHandler(ConfigurationContext cfgCtx, WorkerPool workerPool,
   71                                AxisService service, boolean acceptor) {
   72           this.cfgCtx = cfgCtx;
   73           this.workerPool = workerPool;
   74           this.service = service;
   75           this.log = LogFactory.getLog(this.getClass());
   76           this.acceptor = acceptor;
   77           countersMap = new HashMap<SessionID, Integer>();
   78           outgoingMessages = new LinkedBlockingQueue<MessageContext>();
   79           semaphore = new Semaphore(0);
   80           getResponseHandlingApproach();
   81       }
   82   
   83       private void getResponseHandlingApproach() {
   84           Parameter param = service.getParameter(FIXConstants.FIX_RESPONSE_HANDLER_APPROACH);
   85           if (param != null) {
   86               if ("false".equals(param.getValue().toString())) {
   87                   allNewApproach = false;
   88                   return;
   89               }
   90           }
   91           allNewApproach = true;
   92       }
   93   
   94       public void setOutgoingMessageContext(MessageContext msgCtx) {
   95           if (!allNewApproach) {
   96               outgoingMessages.offer(msgCtx);
   97           }
   98       }
   99   
  100       public void acquire() throws InterruptedException {
  101           semaphore.acquire();
  102       }
  103   
  104       private void handleException(String msg, Exception e) {
  105           log.error(msg, e);
  106           throw new AxisFIXException(msg, e);
  107       }
  108   
  109       /**
  110        * This method is called when quickfix creates a new session. A session
  111        * comes into and remains in existence for the life of the application.
  112        * Sessions exist whether or not a counter party is connected to it. As soon
  113        * as a session is created, the application can begin sending messages to it. If no one
  114        * is logged on, the messages will be sent at the time a connection is
  115        * established with the counterparty.
  116        *
  117        * @param sessionID QuickFIX session ID
  118        */
  119       public void onCreate(SessionID sessionID) {
  120           log.info("New FIX session created: " + sessionID.toString());
  121       }
  122   
  123       /**
  124        * This callback notifies when a valid logon has been established with a
  125        * counter party. This is called when a connection has been established and
  126        * the FIX logon process has completed with both parties exchanging valid
  127        * logon messages.
  128        *
  129        * @param sessionID QuickFIX session ID
  130        */
  131       public void onLogon(SessionID sessionID) {
  132           countersMap.put(sessionID, 0);
  133           log.info("FIX session logged on: " + sessionID.toString());
  134           semaphore.release();
  135       }
  136   
  137       /**
  138        * This callback notifies when a FIX session is no longer online. This
  139        * could happen during a normal logout exchange or because of a forced
  140        * termination or a loss of network connection.
  141        *
  142        * @param sessionID QuickFIX session ID
  143        */
  144       public void onLogout(SessionID sessionID) {
  145           countersMap.put(sessionID, 0);
  146           FIXTransportSender trpSender = (FIXTransportSender) cfgCtx.getAxisConfiguration().
  147                   getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
  148           trpSender.logOutIncomingSession(sessionID);
  149           log.info("FIX session logged out: " + sessionID.toString());
  150       }
  151   
  152       /**
  153        * This callback provides Synapse with a peek at the administrative messages
  154        * that are being sent from your FIX engine to the counter party. This is
  155        * normally not useful for an application however it is provided for any
  156        * logging one may wish to do.
  157        *
  158        * @param message QuickFIX message
  159        * @param sessionID QuickFIX session ID
  160        */
  161       public void toAdmin(Message message, SessionID sessionID) {
  162           if (log.isDebugEnabled()) {
  163               StringBuffer sb = new StringBuffer();
  164               try {
  165                   sb.append("Sending admin level FIX message to ").append(message.getHeader().getField(new TargetCompID()).getValue());
  166                   sb.append("\nMessage Type: ").append(message.getHeader().getField(new MsgType()).getValue());
  167                   sb.append("\nMessage Sequence Number: ").append(message.getHeader().getField(new MsgSeqNum()).getValue());
  168                   sb.append("\nSender ID: ").append(message.getHeader().getField(new SenderCompID()).getValue());
  169               } catch (FieldNotFound e) {
  170                   sb.append("Sending admin level FIX message...");
  171                   log.warn("One or more required fields are not found in the response message", e);
  172               }
  173               log.debug(sb.toString());
  174               if (log.isTraceEnabled()) {
  175                   log.trace("Message: " + message.toString());
  176               }
  177           }
  178       }
  179   
  180       /**
  181        * This callback notifies when an administrative message is sent from a
  182        * counterparty to the FIX engine.
  183        *
  184        * @param message QuickFIX message
  185        * @param sessionID QuickFIX session ID
  186        * @throws FieldNotFound
  187        * @throws IncorrectDataFormat
  188        * @throws IncorrectTagValue
  189        * @throws RejectLogon causes a logon reject
  190        */
  191       public void fromAdmin(Message message, SessionID sessionID) throws FieldNotFound,
  192               IncorrectDataFormat, IncorrectTagValue, RejectLogon {
  193   
  194           if (log.isDebugEnabled()) {
  195               StringBuffer sb = new StringBuffer();
  196               sb.append("Received admin level FIX message from ").append(message.getHeader().getField(new SenderCompID()).getValue());
  197               sb.append("\nMessage Type: ").append(message.getHeader().getField(new MsgType()).getValue());
  198               sb.append("\nMessage Sequence Number: ").append(message.getHeader().getField(new MsgSeqNum()).getValue());
  199               sb.append("\nReceiver ID: ").append(message.getHeader().getField(new TargetCompID()).getValue());
  200               log.debug(sb.toString());
  201               if (log.isTraceEnabled()) {
  202                   log.trace("Message: " + message.toString());
  203               }
  204           }
  205       }
  206   
  207       /**
  208        * This is a callback for application messages that are being sent to a
  209        * counterparty.
  210        *
  211        * @param message QuickFIX message
  212        * @param sessionID QuickFIX session ID
  213        * @throws DoNotSend This exception aborts message transmission
  214        */
  215       public void toApp(Message message, SessionID sessionID) throws DoNotSend {
  216             if (log.isDebugEnabled()) {
  217               StringBuffer sb = new StringBuffer();
  218               try {
  219                   sb.append("Sending application level FIX message to ").append(message.getHeader().getField(new TargetCompID()).getValue());
  220                   sb.append("\nMessage Type: ").append(message.getHeader().getField(new MsgType()).getValue());
  221                   sb.append("\nMessage Sequence Number: ").append(message.getHeader().getField(new MsgSeqNum()).getValue());
  222                   sb.append("\nSender ID: ").append(message.getHeader().getField(new SenderCompID()).getValue());
  223               } catch (FieldNotFound e) {
  224                   sb.append("Sending application level FIX message...");
  225                   log.warn("One or more required fields are not found in the response message", e);
  226               }
  227               log.debug(sb.toString());
  228               if (log.isTraceEnabled()) {
  229                   log.trace("Message: " + message.toString());
  230               }
  231           }
  232       }
  233   
  234       /**
  235        * This callback receives messages for the application. This is one of the
  236        * core entry points for the FIX application. Every application level
  237        * request will come through here. A new thread will be spawned from the
  238        * thread pool for each incoming message.
  239        *
  240        * @param message QuickFIX message
  241        * @param sessionID QuickFIX session ID
  242        * @throws FieldNotFound
  243        * @throws IncorrectDataFormat
  244        * @throws IncorrectTagValue
  245        * @throws UnsupportedMessageType
  246        */
  247       public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat,
  248               IncorrectTagValue, UnsupportedMessageType {
  249           if (log.isDebugEnabled()) {
  250               StringBuffer sb = new StringBuffer();
  251               sb.append("Received FIX message from ").append(message.getHeader().getField(new SenderCompID()).getValue());
  252               sb.append("\nMessage Sequence Number: ").append(message.getHeader().getField(new MsgSeqNum()).getValue());
  253               sb.append("\nReceiver ID: ").append(message.getHeader().getField(new TargetCompID()).getValue());
  254               log.debug(sb.toString());
  255               if (log.isTraceEnabled()) {
  256                   log.trace("Message: " + message.toString());
  257               }
  258           }
  259   
  260           int counter = countersMap.get(sessionID);
  261           counter++;
  262           countersMap.put(sessionID, counter);
  263   
  264           workerPool.execute(new FIXWorkerThread(message, sessionID, counter));
  265       }
  266   
  267       /**
  268        * This Runnable class can be used when it is required to process each incoming message
  269        * using separate threads.
  270        */
  271       class FIXWorkerThread implements Runnable {
  272   
  273           private Message message;
  274           private SessionID sessionID;
  275           private int counter;
  276   
  277           public FIXWorkerThread(Message message, SessionID sessionID, int counter) {
  278               this.message = message;
  279               this.sessionID = sessionID;
  280               this.counter = counter;
  281           }
  282   
  283           private void handleIncomingRequest() {
  284               //Create message context for the incmong message
  285               AbstractTransportListener trpListener = (AbstractTransportListener) cfgCtx.getAxisConfiguration().
  286                       getTransportIn(FIXConstants.TRANSPORT_NAME).getReceiver();
  287   
  288               MessageContext msgCtx = trpListener.createMessageContext();
  289               msgCtx.setProperty(Constants.OUT_TRANSPORT_INFO, new FIXOutTransportInfo(sessionID));
  290   
  291               if (service != null) {
  292                   // Set the service for which the message is intended to
  293                   msgCtx.setAxisService(service);
  294                   // find the operation for the message, or default to one
  295                   Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
  296                   QName operationQName = (
  297                       operationParam != null ?
  298                           BaseUtils.getQNameFromString(operationParam.getValue()) :
  299                           BaseConstants.DEFAULT_OPERATION);
  300   
  301                   AxisOperation operation = service.getOperation(operationQName);
  302                   if (operation != null) {
  303                       msgCtx.setAxisOperation(operation);
  304                       msgCtx.setSoapAction("urn:" + operation.getName().getLocalPart());
  305                   }
  306               }
  307   
  308               String fixApplication = FIXConstants.FIX_INITIATOR;
  309               if (acceptor) {
  310                   fixApplication = FIXConstants.FIX_ACCEPTOR;
  311               }
  312               else {
  313                   msgCtx.setProperty("synapse.isresponse", true);
  314               }
  315   
  316               try {
  317                   //Put the FIX message in a SOAPEnvelope
  318                   FIXUtils.getInstance().setSOAPEnvelope(message, counter, sessionID.toString(), msgCtx);
  319                   trpListener.handleIncomingMessage(
  320                           msgCtx,
  321                           FIXUtils.getTransportHeaders(service.getName(), fixApplication),
  322                           null,
  323                           FIXConstants.FIX_DEFAULT_CONTENT_TYPE
  324                   );
  325               } catch (AxisFault e) {
  326                   handleException("Error while processing FIX message", e);
  327               }
  328           }
  329   
  330           private void handleIncomingResponse(MessageContext outMsgCtx) {
  331               AbstractTransportSender trpSender = (AbstractTransportSender) cfgCtx.getAxisConfiguration().
  332                           getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
  333   
  334               MessageContext msgCtx = trpSender.createResponseMessageContext(outMsgCtx);
  335   
  336               try {
  337                   //Put the FIX message in a SOAPEnvelope
  338                   FIXUtils.getInstance().setSOAPEnvelope(message, counter, sessionID.toString(), msgCtx);
  339                   msgCtx.setServerSide(true);
  340                   trpSender.handleIncomingMessage(
  341                           msgCtx,
  342                           FIXUtils.getTransportHeaders(service.getName(), FIXConstants.FIX_INITIATOR),
  343                           null,
  344                           FIXConstants.FIX_DEFAULT_CONTENT_TYPE
  345                   );
  346               } catch (AxisFault e) {
  347                   handleException("Error while processing response FIX message", e);
  348               }
  349           }
  350   
  351           public void run() {
  352   
  353               if (allNewApproach) {
  354                   //treat all messages (including responses) as new messages
  355                   handleIncomingRequest();
  356               }
  357               else {
  358                   if (acceptor) {
  359                       //treat messages coming from an acceptor as new request messages
  360                       handleIncomingRequest();
  361                   }
  362                   else {
  363                       MessageContext outMsgCtx = outgoingMessages.poll();
  364                       if (outMsgCtx != null) {
  365                           //handle as a response to an outgoing message
  366                           handleIncomingResponse(outMsgCtx);
  367                       }
  368                       else {
  369                           //handle as a new request message
  370                           handleIncomingRequest();
  371                       }
  372                   }
  373               }
  374           }
  375   
  376       }
  377   
  378   }

Home » synapse-1.2-src » org.apache.synapse.transport.fix » [javadoc | source]