Home » synapse-1.2-src » org.apache.synapse.transport.fix » [javadoc | source]

    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one
    3    *  or more contributor license agreements.  See the NOTICE file
    4    *  distributed with this work for additional information
    5    *  regarding copyright ownership.  The ASF licenses this file
    6    *  to you under the Apache License, Version 2.0 (the
    7    *  "License"); you may not use this file except in compliance
    8    *  with the License.  You may obtain a copy of the License at
    9    *
   10    *   http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    *  Unless required by applicable law or agreed to in writing,
   13    *  software distributed under the License is distributed on an
   14    *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    *  KIND, either express or implied.  See the License for the
   16    *  specific language governing permissions and limitations
   17    *  under the License.
   18    */
   19   
   20   package org.apache.synapse.transport.fix;
   21   
   22   import org.apache.axis2.AxisFault;
   23   import org.apache.axis2.context.ConfigurationContext;
   24   import org.apache.axis2.context.MessageContext;
   25   import org.apache.axis2.description.AxisService;
   26   import org.apache.axis2.description.Parameter;
   27   import org.apache.axis2.description.TransportOutDescription;
   28   import org.apache.axis2.transport.OutTransportInfo;
   29   import org.apache.commons.logging.LogFactory;
   30   import org.apache.synapse.transport.base.AbstractTransportSender;
   31   import quickfix;
   32   import quickfix.field;
   33   
   34   import java.io.IOException;
   35   import java.util.ArrayList;
   36   import java.util.Hashtable;
   37   import java.util.Map;
   38   
   39   /**
   40    * The FIX transport sender implementation. This implementation looks at the SOAPBody of the message
   41    * context to identify how the message was first received by Axis2 engine and also looks at some FIX
   42    * header fields to make the optimum routing decision.
   43    * <p/>
   44    * This transport sender implementation does not support forwarding FIX messages to sessions with
   45    * different BeginString values.When it performs a message forwarding it makes sure the forwarding
   46    * takes place according to the conditions specified in the 'Thirs Party Routing' section in the
   47    * FIX protocol specification.
   48    */
   49   public class FIXTransportSender extends AbstractTransportSender {
   50   
   51       private FIXSessionFactory sessionFactory;
   52       private FIXOutgoingMessageHandler messageSender;
   53   
   54       public FIXTransportSender() {
   55           this.log = LogFactory.getLog(this.getClass());
   56       }
   57   
   58   
   59       public void setSessionFactory(FIXSessionFactory sessionFactory) {
   60           this.sessionFactory = sessionFactory;
   61           this.messageSender.setSessionFactory(sessionFactory);
   62       }
   63   
   64       /**
   65        * @param cfgCtx       the axis2 configuration context
   66        * @param transportOut the Out Transport description
   67        * @throws AxisFault on error
   68        */
   69       public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
   70           setTransportName(FIXConstants.TRANSPORT_NAME);
   71           super.init(cfgCtx, transportOut);
   72           messageSender = new FIXOutgoingMessageHandler();
   73           log.info("FIX transport sender initialized...");
   74       }
   75   
   76       /**
   77        * Performs the actual sending of the message.
   78        *
   79        * @param msgCtx           the axis2 message context of the message to be sent
   80        * @param targetEPR        the EPR for which the message is to be sent
   81        * @param outTransportInfo the OutTransportInfo for the message
   82        * @throws AxisFault on error
   83        */
   84       public void sendMessage(MessageContext msgCtx, String targetEPR,
   85                               OutTransportInfo outTransportInfo) throws AxisFault {
   86   
   87           if (log.isDebugEnabled()) {
   88               log.debug("Attempting to send a FIX message, Message ID:" + msgCtx.getMessageID());
   89           }
   90           Message fixMessage = null;
   91           String serviceName = FIXUtils.getServiceName(msgCtx);
   92           String fixApplication = FIXUtils.getFixApplication(msgCtx);
   93           String sourceSession = FIXUtils.getSourceSession(msgCtx);
   94           int counter = FIXUtils.getSequenceNumber(msgCtx);
   95   
   96           try {
   97               fixMessage = FIXUtils.getInstance().createFIXMessage(msgCtx);
   98           } catch (IOException e) {
   99               handleException("Exception occured while creating the FIX message from SOAP Envelope", e);
  100           }
  101   
  102           if (FIXConstants.FIX_ACCEPTOR.equals(fixApplication)) {
  103               //A message came in through an acceptor bound to a service
  104               if (targetEPR != null) {
  105                   //Forward the message to the given EPR
  106                   sendUsingEPR(targetEPR, serviceName, fixMessage, sourceSession, counter, msgCtx);
  107               } else if (outTransportInfo != null && outTransportInfo instanceof FIXOutTransportInfo) {
  108                   //Send the message back to the sender
  109                   sendUsingTrpOutInfo(outTransportInfo, serviceName, fixMessage, sourceSession, counter);
  110               }
  111           } else if (FIXConstants.FIX_INITIATOR.equals(fixApplication)) {
  112   
  113               if (sendUsingAcceptorSession(serviceName, fixMessage, sourceSession, counter)) {
  114                   return;
  115               } else if (targetEPR != null) {
  116                   sendUsingEPR(targetEPR, serviceName, fixMessage, sourceSession, counter, msgCtx);
  117                   return;
  118               }
  119               handleException("Unable to find a session to send the message...");
  120   
  121           } else {
  122               //A message generated in Axis2 engine or a message arrived over a different transport
  123               if (targetEPR != null) {
  124                   sendUsingEPR(targetEPR, serviceName, fixMessage, sourceSession, counter, msgCtx);
  125               } else {
  126                   sendUsingAcceptorSession(serviceName, fixMessage, sourceSession, counter);
  127               }
  128           }
  129       }
  130   
  131       private boolean isTargetVald(Map<String, String> fieldValues, SessionID targetSession) {
  132           String beginString = fieldValues.get(FIXConstants.BEGIN_STRING);
  133           String deliverToCompID = fieldValues.get(FIXConstants.DELIVER_TO_COMP_ID);
  134           String deliverToSubID = fieldValues.get(FIXConstants.DELIVER_TO_SUB_ID);
  135           String deliverToLocationID = fieldValues.get(FIXConstants.DELIVER_TO_LOCATION_ID);
  136   
  137           if (!targetSession.getBeginString().equals(beginString)) {
  138               return false;
  139           } else if (!targetSession.getTargetCompID().equals(deliverToCompID)) {
  140               return false;
  141           } else if (deliverToSubID != null && !deliverToSubID.equals(targetSession.getTargetSubID())) {
  142               return false;
  143           } else if (deliverToLocationID != null && !deliverToLocationID.equals(targetSession.getTargetLocationID())) {
  144               return false;
  145           }
  146           return true;
  147       }
  148   
  149       /**
  150        * Prepares the message to be forwarded according to the conditions specified in the FIX protocol
  151        * specification.
  152        *
  153        * @param message     the FIX message to be forwarded
  154        * @param fieldValues a Map of field values for quick access
  155        */
  156       private void prepareToForwardMessage(Message message, Map<String, String> fieldValues) {
  157           //set OnBehalfOf* fields
  158           message.getHeader().setField(new OnBehalfOfCompID(fieldValues.get(FIXConstants.SENDER_COMP_ID)));
  159           if (fieldValues.get(FIXConstants.SENDER_SUB_ID) != null) {
  160               message.getHeader().setField(new OnBehalfOfSubID(fieldValues.get(FIXConstants.SENDER_SUB_ID)));
  161           }
  162   
  163           if (fieldValues.get(FIXConstants.SENDER_LOCATION_ID) != null) {
  164               message.getHeader().setField(new OnBehalfOfLocationID(fieldValues.get(FIXConstants.SENDER_LOCATION_ID)));
  165           }
  166   
  167           //remove additional Sender* fields and DeliverTo* fields
  168           message.getHeader().removeField(SenderSubID.FIELD);
  169           message.getHeader().removeField(SenderLocationID.FIELD);
  170           message.getHeader().removeField(DeliverToCompID.FIELD);
  171           message.getHeader().removeField(DeliverToSubID.FIELD);
  172           message.getHeader().removeField(DeliverToLocationID.FIELD);
  173       }
  174   
  175       /**
  176        * Puts DeliverToX fields in the message to enable the message to be forwarded at the destination.
  177        *
  178        * @param message   the FIX message to be forwarded
  179        * @param targetEPR the EPR to which the message will be sent
  180        */
  181       private void setDeliverToXFields(Message message, String targetEPR) {
  182           Hashtable<String, String> properties = FIXUtils.getProperties(targetEPR);
  183           String deliverTo = properties.get(FIXConstants.DELIVER_TO_COMP_ID);
  184           //If a DeliverToCompID field is given in EPR put the field in the message
  185           if (deliverTo != null) {
  186               message.getHeader().setField(new DeliverToCompID(deliverTo));
  187               deliverTo = properties.get(FIXConstants.DELIVER_TO_SUB_ID);
  188               if (deliverTo != null) {
  189                   message.getHeader().setField(new DeliverToSubID(deliverTo));
  190               }
  191   
  192               deliverTo = properties.get(FIXConstants.DELIVER_TO_LOCATION_ID);
  193               if (deliverTo != null) {
  194                   message.getHeader().setField(new DeliverToLocationID(deliverTo));
  195               }
  196           }
  197       }
  198   
  199       /**
  200        * Puts DeliverToX fields in the message to enable the message to be forwarded at the destination.
  201        *
  202        * @param message     the FIX message to be forwarded
  203        * @param fieldValues the Map of field values for quick access
  204        */
  205       private void setDeliverToXFields(Message message, Map<String, String> fieldValues) {
  206           //Use the fields of the message to set DeliverToX fields
  207           String onBehalf = fieldValues.get(FIXConstants.ON_BEHALF_OF_COMP_ID);
  208           if (onBehalf != null) {
  209               message.getHeader().setField(new DeliverToCompID(onBehalf));
  210               onBehalf = fieldValues.get(FIXConstants.ON_BEHALF_OF_SUB_ID);
  211               if (onBehalf != null) {
  212                   message.getHeader().setField(new DeliverToSubID(onBehalf));
  213               }
  214   
  215               onBehalf = fieldValues.get(FIXConstants.ON_BEHALF_OF_LOCATION_ID);
  216               if (onBehalf != null) {
  217                   message.getHeader().setField(new DeliverToLocationID(onBehalf));
  218               }
  219   
  220               message.getHeader().removeField(OnBehalfOfCompID.FIELD);
  221               message.getHeader().removeField(OnBehalfOfSubID.FIELD);
  222               message.getHeader().removeField(OnBehalfOfLocationID.FIELD);
  223           }
  224       }
  225   
  226       /**
  227        * Puts DeliverToX fields in the message to enable the message to be forwarded at the destination.
  228        * This method retireves the parameters from the services.xml and put them in the message as
  229        * DeliverToX fields. Should be used when a response message has to forwarded at the destination.
  230        *
  231        * @param message the FIX message to be forwarded
  232        * @param service the AxisService of the message
  233        */
  234       private void setDeliverToXFields(Message message, AxisService service) {
  235           Parameter param = service.getParameter(FIXConstants.FIX_RESPONSE_DELIVER_TO_COMP_ID_PARAM);
  236           if (param != null) {
  237               message.getHeader().setField(new DeliverToCompID(param.getValue().toString()));
  238               param = service.getParameter(FIXConstants.FIX_RESPONSE_DELIVER_TO_SUB_ID_PARAM);
  239               if (param != null) {
  240                   message.getHeader().setField(new DeliverToSubID(param.getValue().toString()));
  241               }
  242   
  243               param = service.getParameter(FIXConstants.FIX_RESPONSE_DELIVER_TO_LOCATION_ID_PARAM);
  244               if (param != null) {
  245                   message.getHeader().setField(new DeliverToLocationID(param.getValue().toString()));
  246               }
  247           }
  248       }
  249   
  250       /**
  251        * Sends a FIX message to the given EPR
  252        *
  253        * @param targetEPR   the EPR to which the message is sent to
  254        * @param serviceName name of the service which processed the message
  255        * @param fixMessage  the FIX message
  256        * @param srcSession  String uniquely identifying the incoming session
  257        * @param counter     application level sequence number of the message
  258        * @param msgCtx      the Axis2 MessageContext for the message
  259        * @return boolean value indicating the result
  260        * @throws AxisFault on error
  261        */
  262       private boolean sendUsingEPR(String targetEPR, String serviceName, Message fixMessage,
  263                                    String srcSession, int counter, MessageContext msgCtx) throws AxisFault {
  264   
  265           FIXOutTransportInfo fixOut = new FIXOutTransportInfo(targetEPR);
  266           SessionID sessionID = fixOut.getSessionID();
  267           Map<String, String> fieldValues = FIXUtils.getMessageForwardingParameters(fixMessage);
  268           String beginString = fieldValues.get(FIXConstants.BEGIN_STRING);
  269           String deliverToCompID = fieldValues.get(FIXConstants.DELIVER_TO_COMP_ID);
  270   
  271           //match BeginString values
  272           if (beginString != null && !beginString.equals(sessionID.getBeginString())) {
  273               handleException("Cannot forward messages to a session with a different BeginString");
  274           }
  275   
  276           if (deliverToCompID != null) {
  277               //message needs to be delivered
  278               if (!deliverToCompID.equals(sessionID.getTargetCompID())) {
  279                   handleException("Cannot forward messages that do not have a valid DeliverToCompID field");
  280               } else {
  281                   prepareToForwardMessage(fixMessage, fieldValues);
  282                   setDeliverToXFields(fixMessage, targetEPR);
  283               }
  284           }
  285   
  286           if (!Session.doesSessionExist(sessionID)) {
  287               //try to create initiator to send the message
  288               AxisService service = cfgCtx.getAxisConfiguration().getService(serviceName);
  289               sessionFactory.createFIXInitiator(targetEPR, service, sessionID);
  290           }
  291   
  292           try {
  293               messageSender.sendMessage(fixMessage, sessionID, srcSession, counter, msgCtx, targetEPR);
  294               return true;
  295           } catch (SessionNotFound e) {
  296               log.error("Error while sending the FIX message. Session " + sessionID.toString() + " does" +
  297                       " not exist", e);
  298               return false;
  299           }
  300       }
  301   
  302       /**
  303        * Sends a FIX message using the SessionID in the OutTransportInfo
  304        *
  305        * @param trpOutInfo  the TransportOutInfo for the message
  306        * @param fixMessage  the FIX message to be sent
  307        * @param srcSession  String uniquely identifying the incoming session
  308        * @param counter     application level sequence number of the message
  309        * @param serviceName name of the AxisSerivce for the message
  310        * @return boolean value indicating the result
  311        * @throws AxisFault on error
  312        */
  313       private boolean sendUsingTrpOutInfo(OutTransportInfo trpOutInfo, String serviceName,
  314                                           Message fixMessage, String srcSession, int counter) throws AxisFault {
  315   
  316           FIXOutTransportInfo fixOut = (FIXOutTransportInfo) trpOutInfo;
  317           SessionID sessionID = fixOut.getSessionID();
  318           Map<String, String> fieldValues = FIXUtils.getMessageForwardingParameters(fixMessage);
  319           String beginString = fieldValues.get(FIXConstants.BEGIN_STRING);
  320           String deliverToCompID = fieldValues.get(FIXConstants.DELIVER_TO_COMP_ID);
  321   
  322           //match BeginString values
  323           if (beginString != null && !beginString.equals(sessionID.getBeginString())) {
  324               handleException("Cannot forward messages to a session with a different BeginString");
  325           }
  326   
  327           if (deliverToCompID != null) {
  328               //message needs to be delivered to some other party
  329               if (!deliverToCompID.equals(sessionID.getTargetCompID())) {
  330                   handleException("Cannot forward messages that do not have a valid DeliverToCompID field");
  331               } else {
  332                   prepareToForwardMessage(fixMessage, fieldValues);
  333                   AxisService service = cfgCtx.getAxisConfiguration().getService(serviceName);
  334                   setDeliverToXFields(fixMessage, service);
  335               }
  336           } else {
  337               setDeliverToXFields(fixMessage, fieldValues);
  338           }
  339   
  340           try {
  341               messageSender.sendMessage(fixMessage, sessionID, srcSession, counter, null, null);
  342               return true;
  343           } catch (SessionNotFound e) {
  344               log.error("Error while sending the FIX message. Session " + sessionID.toString() + " does" +
  345                       " not exist", e);
  346               return false;
  347           }
  348       }
  349   
  350       /**
  351        * Send the message using a session in the aaceptor side
  352        *
  353        * @param serviceName the service of the message
  354        * @param fixMessage  the FIX message to be sent
  355        * @param srcSession  String uniquely identifying the incoming session
  356        * @param counter     the application level sequence number of the message
  357        * @return boolean value indicating the result
  358        * @throws AxisFault on error
  359        */
  360       private boolean sendUsingAcceptorSession(String serviceName, Message fixMessage, String srcSession,
  361                                                int counter) throws AxisFault {
  362   
  363           Map<String, String> fieldValues = FIXUtils.getMessageForwardingParameters(fixMessage);
  364           String deliverToCompID = fieldValues.get(FIXConstants.DELIVER_TO_COMP_ID);
  365   
  366           Acceptor acceptor = sessionFactory.getAccepter(serviceName);
  367           SessionID sessionID = null;
  368   
  369           if (acceptor != null) {
  370               ArrayList<SessionID> sessions = acceptor.getSessions();
  371               if (sessions.size() == 1) {
  372                   sessionID = sessions.get(0);
  373                   if (deliverToCompID != null && !isTargetVald(fieldValues, sessionID)) {
  374                       sessionID = null;
  375                   }
  376   
  377               } else if (sessions.size() > 1 && deliverToCompID != null) {
  378                   for (int i = 0; i < sessions.size(); i++) {
  379                       sessionID = sessions.get(i);
  380                       if (isTargetVald(fieldValues, sessionID)) {
  381                           break;
  382                       }
  383                   }
  384               }
  385           }
  386   
  387           if (sessionID != null) {
  388               //Found a valid session. Now forward the message...
  389               FIXOutTransportInfo fixOutInfo = new FIXOutTransportInfo(sessionID);
  390               return sendUsingTrpOutInfo(fixOutInfo, serviceName, fixMessage, srcSession, counter);
  391           }
  392           return false;
  393       }
  394   
  395       public void logOutIncomingSession(SessionID sessionID) {
  396           messageSender.cleanUpMessages(sessionID.toString());
  397       }
  398   
  399   }

Home » synapse-1.2-src » org.apache.synapse.transport.fix » [javadoc | source]