Home » synapse-1.2-src » org.apache.synapse.transport.fix » [javadoc | source]

    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one
    3    *   or more contributor license agreements.  See the NOTICE file
    4    *  distributed with this work for additional information
    5    *  regarding copyright ownership.  The ASF licenses this file
    6    *  to you under the Apache License, Version 2.0 (the
    7    *  "License"); you may not use this file except in compliance
    8    *  with the License.  You may obtain a copy of the License at
    9    *
   10    *   http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    *  Unless required by applicable law or agreed to in writing,
   13    *  software distributed under the License is distributed on an
   14    *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    *  KIND, either express or implied.  See the License for the
   16    *  specific language governing permissions and limitations
   17    *  under the License.
   18    */
   19   
   20   package org.apache.synapse.transport.fix;
   21   
import org.apache.axiom.attachments.ByteArrayDataSource;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.soap.SOAPBody;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPFactory;
import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import quickfix.*;
import quickfix.field.*;

import javax.activation.DataHandler;
import javax.activation.DataSource;
import javax.xml.namespace.QName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.*;
   43   
   44   public class FIXUtils {
   45   
   46       private static final Log log = LogFactory.getLog(FIXUtils.class);
   47       private static FIXUtils _instance = new FIXUtils();
   48   
   49       public static FIXUtils getInstance() {
   50           return _instance;
   51       }
   52   
   53       /**
   54        * FIX messages are non-XML. So convert them into XML using the AXIOM API.
   55        * Put the FIX message into an Axis2 MessageContext.The basic format of the
   56        * generated SOAP envelope;
   57        * <p/>
   58        * <soapEnvelope>
   59        * <soapBody>
   60        * <message>
   61        * <header> ....</header>
   62        * <body> .... </body>
   63        * <trailer> .... </trailer>
   64        * </message>
   65        * </soapBody>
   66        * </soapEnvelope>
   67        *
   68        * @param message   the FIX message
   69        * @param counter   application level sequence number of the message
   70        * @param sessionID the incoming session
   71        * @param msgCtx    the Axis2 MessageContext to hold the FIX message
   72        * @throws AxisFault the exception thrown when invalid soap envelopes are set to the msgCtx
   73        */
   74       public void setSOAPEnvelope(Message message, int counter, String sessionID,
   75                                   MessageContext msgCtx) throws AxisFault {
   76   
   77           if (log.isDebugEnabled()) {
   78               log.debug("Creating SOAP envelope for FIX message...");
   79           }
   80   
   81           SOAPFactory soapFactory = new SOAP11Factory();
   82           OMElement msg = soapFactory.createOMElement(FIXConstants.FIX_MESSAGE, null);
   83           msg.addAttribute(soapFactory.createOMAttribute(FIXConstants.FIX_MESSAGE_INCOMING_SESSION, null, sessionID));
   84           msg.addAttribute(soapFactory.createOMAttribute
   85                   (FIXConstants.FIX_MESSAGE_COUNTER, null, String.valueOf(counter)));
   86   
   87           OMElement header = soapFactory.createOMElement(FIXConstants.FIX_HEADER, null);
   88           OMElement body = soapFactory.createOMElement(FIXConstants.FIX_BODY, null);
   89           OMElement trailer = soapFactory.createOMElement(FIXConstants.FIX_TRAILER, null);
   90   
   91           //process FIX header
   92           Iterator<Field<?>> iter = message.getHeader().iterator();
   93           if (iter != null) {
   94               while (iter.hasNext()) {
   95                   Field<?> field = iter.next();
   96                   OMElement msgField = soapFactory.createOMElement(FIXConstants.FIX_FIELD, null);
   97                   msgField.addAttribute(soapFactory.
   98                           createOMAttribute(FIXConstants.FIX_FIELD_ID, null, String.valueOf(field.getTag())));
   99                   Object value = field.getObject();
  100   
  101                   if (value instanceof byte[]) {
  102                       DataSource dataSource = new ByteArrayDataSource((byte[]) value);
  103                       DataHandler dataHandler = new DataHandler(dataSource);
  104                       String contentID = msgCtx.addAttachment(dataHandler);
  105                       OMElement binaryData = soapFactory.createOMElement(FIXConstants.FIX_BINARY_FIELD, null);
  106                       String binaryCID = "cid:" + contentID;
  107                       binaryData.addAttribute(FIXConstants.FIX_MESSAGE_REFERENCE, binaryCID, null);
  108                       msgField.addChild(binaryData);
  109                   } else {
  110                       soapFactory.createOMText(msgField, value.toString(), OMElement.CDATA_SECTION_NODE);
  111                   }
  112                   header.addChild(msgField);
  113               }
  114           }
  115           //process FIX body
  116           iter = message.iterator();
  117           if (iter != null) {
  118               while (iter.hasNext()) {
  119                   Field<?> field = iter.next();
  120                   OMElement msgField = soapFactory.createOMElement(FIXConstants.FIX_FIELD, null);
  121                   msgField.addAttribute(soapFactory.
  122                           createOMAttribute(FIXConstants.FIX_FIELD_ID, null, String.valueOf(field.getTag())));
  123                   Object value = field.getObject();
  124                   if (value instanceof byte[]) {
  125                       DataSource dataSource = new ByteArrayDataSource((byte[]) value);
  126                       DataHandler dataHandler = new DataHandler(dataSource);
  127                       String contentID = msgCtx.addAttachment(dataHandler);
  128                       OMElement binaryData = soapFactory.createOMElement(FIXConstants.FIX_BINARY_FIELD, null);
  129                       String binaryCID = "cid:" + contentID;
  130                       binaryData.addAttribute(FIXConstants.FIX_MESSAGE_REFERENCE, binaryCID, null);
  131                       msgField.addChild(binaryData);
  132                   } else {
  133                       soapFactory.createOMText(msgField, value.toString(), OMElement.CDATA_SECTION_NODE);
  134                   }
  135                   body.addChild(msgField);
  136               }
  137           }
  138           //process FIX trailer
  139           iter = message.getTrailer().iterator();
  140           if (iter != null) {
  141               while (iter.hasNext()) {
  142                   Field<?> field = iter.next();
  143                   OMElement msgField = soapFactory.createOMElement(FIXConstants.FIX_FIELD, null);
  144                   msgField.addAttribute(soapFactory.
  145                           createOMAttribute(FIXConstants.FIX_FIELD_ID, null, String.valueOf(field.getTag())));
  146                   Object value = field.getObject();
  147   
  148                   if (value instanceof byte[]) {
  149                       DataSource dataSource = new ByteArrayDataSource((byte[]) value);
  150                       DataHandler dataHandler = new DataHandler(dataSource);
  151                       String contentID = msgCtx.addAttachment(dataHandler);
  152                       OMElement binaryData = soapFactory.createOMElement(FIXConstants.FIX_BINARY_FIELD, null);
  153                       String binaryCID = "cid:" + contentID;
  154                       binaryData.addAttribute(FIXConstants.FIX_MESSAGE_REFERENCE, binaryCID, null);
  155                       msgField.addChild(binaryData);
  156                   } else {
  157                       soapFactory.createOMText(msgField, value.toString(), OMElement.CDATA_SECTION_NODE);
  158                   }
  159                   trailer.addChild(msgField);
  160               }
  161           }
  162   
  163           msg.addChild(header);
  164           msg.addChild(body);
  165           msg.addChild(trailer);
  166           SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
  167           envelope.getBody().addChild(msg);
  168           msgCtx.setEnvelope(envelope);
  169       }
  170   
  171   
  172       /**
  173        * Extract the FIX message embedded in an Axis2 MessageContext
  174        *
  175        * @param msgCtx the Axis2 MessageContext
  176        * @return a FIX message
  177        * @throws java.io.IOException the exception thrown when handling erroneous binary content
  178        */
  179       public Message createFIXMessage(MessageContext msgCtx) throws IOException {
  180           if (log.isDebugEnabled()) {
  181               log.debug("Extracting FIX message from the message context (Message ID: " + msgCtx.getMessageID() + ")");
  182           }
  183   
  184           Message message = new Message();
  185           SOAPBody soapBody = msgCtx.getEnvelope().getBody();
  186           OMElement messageNode = soapBody.getFirstChildWithName(new QName(FIXConstants.FIX_MESSAGE));
  187           Iterator<OMElement> messageElements = messageNode.getChildElements();
  188   
  189           while (messageElements.hasNext()) {
  190               OMElement node = messageElements.next();
  191               //create FIX header
  192               if (node.getQName().getLocalPart().equals(FIXConstants.FIX_HEADER)) {
  193                   Iterator<OMElement> headerElements = node.getChildElements();
  194                   while (headerElements.hasNext()) {
  195                       OMElement headerNode = headerElements.next();
  196                       String tag = headerNode.getAttributeValue(new QName(FIXConstants.FIX_FIELD_ID));
  197                       String value = null;
  198   
  199                       OMElement child = headerNode.getFirstElement();
  200                       if (child != null) {
  201                           String href = headerNode.getFirstElement().
  202                                   getAttributeValue(new QName(FIXConstants.FIX_MESSAGE_REFERENCE));
  203                           if (href != null) {
  204                               DataHandler binaryDataHandler = msgCtx.getAttachment(href.substring(4));
  205                               ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
  206                               binaryDataHandler.writeTo(outputStream);
  207                               value = new String(outputStream.toByteArray());
  208                           }
  209                       } else {
  210                           value = headerNode.getText();
  211                       }
  212   
  213                       if (value != null) {
  214                           message.getHeader().setString(Integer.parseInt(tag), value);
  215                       }
  216                   }
  217   
  218               } else if (node.getQName().getLocalPart().equals(FIXConstants.FIX_BODY)) {
  219                   //create FIX body
  220                   Iterator<OMElement> bodyElements = node.getChildElements();
  221                   while (bodyElements.hasNext()) {
  222                       OMElement bodyNode = bodyElements.next();
  223                       String tag = bodyNode.getAttributeValue(new QName(FIXConstants.FIX_FIELD_ID));
  224                       String value = null;
  225   
  226                       OMElement child = bodyNode.getFirstElement();
  227                       if (child != null) {
  228                           String href = bodyNode.getFirstElement().
  229                                   getAttributeValue(new QName(FIXConstants.FIX_MESSAGE_REFERENCE));
  230                           if (href != null) {
  231                               DataHandler binaryDataHandler = msgCtx.getAttachment(href.substring(4));
  232                               ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
  233                               binaryDataHandler.writeTo(outputStream);
  234                               value = new String(outputStream.toByteArray());
  235                           }
  236                       } else {
  237                           value = bodyNode.getText();
  238                       }
  239   
  240                       if (value != null) {
  241                           message.setString(Integer.parseInt(tag), value);
  242                       }
  243                   }
  244               } else if (node.getQName().getLocalPart().equals(FIXConstants.FIX_TRAILER)) {
  245                   //create FIX trailer
  246                   Iterator<OMElement> trailerElements = node.getChildElements();
  247                   while (trailerElements.hasNext()) {
  248                       OMElement trailerNode = trailerElements.next();
  249                       String tag = trailerNode.getAttributeValue(new QName(FIXConstants.FIX_FIELD_ID));
  250                       String value = null;
  251   
  252                       OMElement child = trailerNode.getFirstElement();
  253                       if (child != null) {
  254                           String href = trailerNode.getFirstElement().
  255                                   getAttributeValue(new QName(FIXConstants.FIX_MESSAGE_REFERENCE));
  256                           if (href != null) {
  257                               DataHandler binaryDataHandler = msgCtx.getAttachment(href.substring(4));
  258                               ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
  259                               binaryDataHandler.writeTo(outputStream);
  260                               value = new String(outputStream.toByteArray());
  261                           }
  262                       } else {
  263                           value = trailerNode.getText();
  264                       }
  265   
  266                       if (value != null) {
  267                           message.getTrailer().setString(Integer.parseInt(tag), value);
  268                       }
  269                   }
  270               }
  271           }
  272           return message;
  273       }
  274   
  275   
  276       /**
  277        * Generate EPRs for the specified FIX service. A FIX end point can be uniquely
  278        * identified by a <host(IP), port> pair. Add some additional FIX session details
  279        * so the EPRs are more self descriptive.
  280        * A FIX EPR generated here looks like;
  281        * fix://10.100.1.80:9898?BeginString=FIX.4.4&SenderCompID=BANZAI&TargetCompID=EXEC&
  282        * SessionQualifier=mySession&Serviec=StockQuoteProxy
  283        *
  284        * @param acceptor    the SocketAcceptor associated with the service
  285        * @param serviceName the name of the service
  286        * @param ip          the IP address of the host
  287        * @return an array of EPRs for the specified service in String format
  288        */
  289       public static String[] generateEPRs(SocketAcceptor acceptor, String serviceName, String ip) {
  290           //Get all the addresses associated with the acceptor
  291           Map<SessionID, SocketAddress> socketAddresses = acceptor.getAcceptorAddresses();
  292           //Get all the sessions (SessionIDs) associated with the acceptor
  293           ArrayList<SessionID> sessions = acceptor.getSessions();
  294           String[] EPRList = new String[sessions.size()];
  295   
  296           //Generate an EPR for each session/socket address
  297           for (int i = 0; i < sessions.size(); i++) {
  298               SessionID sessionID = sessions.get(i);
  299               InetSocketAddress socketAddress = (InetSocketAddress) socketAddresses.get(sessionID);
  300               EPRList[i] = FIXConstants.FIX_PREFIX + ip + ":" + socketAddress.getPort() +
  301                       "?" + FIXConstants.BEGIN_STRING + "=" + sessionID.getBeginString() +
  302                       "&" + FIXConstants.SENDER_COMP_ID + "=" + sessionID.getTargetCompID() +
  303                       "&" + FIXConstants.TARGET_COMP_ID + "=" + sessionID.getSenderCompID();
  304   
  305               String sessionQualifier = sessionID.getSessionQualifier();
  306               if (sessionQualifier != null && !sessionQualifier.equals("")) {
  307                   EPRList[i] += "&" + FIXConstants.SESSION_QUALIFIER + "=" + sessionQualifier;
  308               }
  309   
  310               String senderSubID = sessionID.getSenderSubID();
  311               if (senderSubID != null && !senderSubID.equals("")) {
  312                   EPRList[i] += "&" + FIXConstants.SENDER_SUB_ID + "=" + senderSubID;
  313               }
  314   
  315               String targetSubID = sessionID.getTargetSubID();
  316               if (targetSubID != null && !targetSubID.equals("")) {
  317                   EPRList[i] += "&" + FIXConstants.TARGET_SUB_ID + "=" + targetSubID;
  318               }
  319   
  320               String senderLocationID = sessionID.getSenderLocationID();
  321               if (senderLocationID != null && !senderLocationID.equals("")) {
  322                   EPRList[i] += "&" + FIXConstants.SENDER_LOCATION_ID + "=" + senderLocationID;
  323               }
  324   
  325               String targetLocationID = sessionID.getTargetLocationID();
  326               if (targetLocationID != null && !targetLocationID.equals("")) {
  327                   EPRList[i] += "&" + FIXConstants.TARGET_LOCATION_ID + "=" + targetLocationID;
  328               }
  329   
  330               EPRList[i] += "&Service=" + serviceName;
  331           }
  332           return EPRList;
  333       }
  334   
  335       /**
  336        * Extracts parameters embedded in FIX EPRs
  337        *
  338        * @param url a valid FIX EPR
  339        * @return a Hashtable of FIX properties
  340        */
  341       public static Hashtable getProperties(String url) {
  342           Hashtable<String, String> h = new Hashtable<String, String>();
  343           int propPos = url.indexOf("?");
  344           if (propPos != -1) {
  345               StringTokenizer st = new StringTokenizer(url.substring(propPos + 1), "&");
  346               while (st.hasMoreTokens()) {
  347                   String token = st.nextToken();
  348                   int sep = token.indexOf("=");
  349                   if (sep != -1) {
  350                       h.put(token.substring(0, sep), token.substring(sep + 1));
  351                   }
  352               }
  353           }
  354           return h;
  355       }
  356   
  357       /*
  358        * This is here because AXIOM does not support removing CDATA tags yet. Given a String embedded in
  359        * CDATA tags this method will return the String element only.
  360        *
  361        * @param str the String with CDATA tags
  362        * @return String with CDATA tags stripped
  363        *
  364       private static String removeCDATA(String str) {
  365           if (str.indexOf("<![CDATA[") != -1) {
  366               str = str.split("CDATA")[1].split("]></field>")[0];
  367   		    str= str.substring(1, str.length()-1);
  368   		    return str;
  369           } else {
  370               return str;
  371           }
  372       }*/
  373   
  374       /**
  375        * Extracts the fields related to message forwarding (third party routing) from
  376        * the FIX header.
  377        *
  378        * @param message the FIX message
  379        * @return a Map of forwarding parameters
  380        */
  381       public static Map<String, String> getMessageForwardingParameters(Message message) {
  382   
  383           Map<String, String> map = new HashMap<String, String>();
  384           String value = getHeaderFieldValue(message, BeginString.FIELD);
  385           map.put(FIXConstants.BEGIN_STRING, value);
  386           value = getHeaderFieldValue(message, SenderCompID.FIELD);
  387           map.put(FIXConstants.SENDER_COMP_ID, value);
  388           value = getHeaderFieldValue(message, SenderSubID.FIELD);
  389           map.put(FIXConstants.SENDER_SUB_ID, value);
  390           value = getHeaderFieldValue(message, SenderLocationID.FIELD);
  391           map.put(FIXConstants.SENDER_LOCATION_ID, value);
  392           value = getHeaderFieldValue(message, TargetCompID.FIELD);
  393           map.put(FIXConstants.TARGET_COMP_ID, value);
  394           value = getHeaderFieldValue(message, DeliverToCompID.FIELD);
  395           map.put(FIXConstants.DELIVER_TO_COMP_ID, value);
  396           value = getHeaderFieldValue(message, DeliverToSubID.FIELD);
  397           map.put(FIXConstants.DELIVER_TO_SUB_ID, value);
  398           value = getHeaderFieldValue(message, DeliverToLocationID.FIELD);
  399           map.put(FIXConstants.DELIVER_TO_LOCATION_ID, value);
  400           value = getHeaderFieldValue(message, OnBehalfOfCompID.FIELD);
  401           map.put(FIXConstants.ON_BEHALF_OF_COMP_ID, value);
  402           value = getHeaderFieldValue(message, OnBehalfOfSubID.FIELD);
  403           map.put(FIXConstants.ON_BEHALF_OF_SUB_ID, value);
  404           value = getHeaderFieldValue(message, OnBehalfOfLocationID.FIELD);
  405           map.put(FIXConstants.ON_BEHALF_OF_LOCATION_ID, value);
  406           return map;
  407       }
  408   
  409       private static String getHeaderFieldValue(Message message, int tag) {
  410           try {
  411               return message.getHeader().getString(tag);
  412           } catch (FieldNotFound fieldNotFound) {
  413               return null;
  414           }
  415       }
  416   
  417       /**
  418        * Extracts the name of the service which processed the message from the MessageContext
  419        *
  420        * @param msgCtx Axis2 MessageContext of a message
  421        * @return name of the AxisService
  422        * @throws org.apache.axis2.AxisFault on error
  423        */
  424       public static String getServiceName(MessageContext msgCtx) throws AxisFault {
  425   
  426           Object serviceParam = msgCtx.getProperty(FIXConstants.FIX_SERVICE_NAME);
  427           if (serviceParam != null) {
  428               String serviceName = serviceParam.toString();
  429               if (serviceName != null && !serviceName.equals("")) {
  430                   return serviceName;
  431               }
  432           }
  433   
  434           Map<String, String> trpHeaders = (Map) msgCtx.getProperty(MessageContext.TRANSPORT_HEADERS);
  435           //try to get the service from the transport headers
  436           if (trpHeaders != null) {
  437               String serviceName = (trpHeaders.get(FIXConstants.FIX_MESSAGE_SERVICE));
  438               if (serviceName != null) {
  439                   return serviceName;
  440               }
  441           }
  442           throw new AxisFault("Unable to find a valid service for the message");
  443       }
  444   
  445       /**
  446        * Extracts the application type for the message from the message context
  447        *
  448        * @param msgCtx Axis2 Message Context
  449        * @return application type of the message
  450        */
  451       public static String getFixApplication(MessageContext msgCtx) {
  452           Map<String, String> trpHeaders = (Map) msgCtx.getProperty(MessageContext.TRANSPORT_HEADERS);
  453           //try to get the application type from the transport headers
  454           String fixApplication = null;
  455           if (trpHeaders != null) {
  456               fixApplication = trpHeaders.get(FIXConstants.FIX_MESSAGE_APPLICATION);
  457           }
  458           return fixApplication;
  459       }
  460   
  461       /**
  462        * Creates a Map of transport headers for a message
  463        *
  464        * @param serviceName    name of the service to which the message belongs to
  465        * @param fixApplication FIX application type
  466        * @return a Map of transport headers
  467        */
  468       public static Map<String, String> getTransportHeaders(String serviceName, String fixApplication) {
  469           Map<String, String> trpHeaders = new HashMap<String, String>();
  470           trpHeaders.put(FIXConstants.FIX_MESSAGE_SERVICE, serviceName);
  471           trpHeaders.put(FIXConstants.FIX_MESSAGE_APPLICATION, fixApplication);
  472           return trpHeaders;
  473       }
  474   
  475       /**
  476        * Reads a FIX EPR and returns the host and port on a String array
  477        *
  478        * @param fixEPR a FIX EPR
  479        * @return an array of Strings containing addressing elements
  480        * @throws AxisFault on error
  481        */
  482       public static String[] getSocketAddressElements(String fixEPR) throws AxisFault {
  483           int propPos = fixEPR.indexOf("?");
  484           if (propPos != -1 && fixEPR.startsWith(FIXConstants.FIX_PREFIX)) {
  485               String address = fixEPR.substring(FIXConstants.FIX_PREFIX.length(), propPos);
  486               String[] socketAddressElemets = address.split(":");
  487               if (socketAddressElemets.length == 2) {
  488                   return socketAddressElemets;
  489               }
  490           }
  491           throw new AxisFault("Malformed FIX EPR: " + fixEPR);
  492       }
  493   
  494       /**
  495        * Reads the SOAP body of a message and attempts to retreive the application level sequence number
  496        *
  497        * @param msgCtx Axis2 MessageContext
  498        * @return application level sequence number or -1
  499        */
  500       public static int getSequenceNumber(MessageContext msgCtx) {
  501           SOAPBody body = msgCtx.getEnvelope().getBody();
  502           OMElement messageNode = body.getFirstChildWithName(new QName(FIXConstants.FIX_MESSAGE));
  503           String value = messageNode.getAttributeValue(new QName(FIXConstants.FIX_MESSAGE_COUNTER));
  504           if (value != null) {
  505               return Integer.parseInt(value);
  506           } else {
  507               return -1;
  508           }
  509       }
  510   
  511       /**
  512        * Reads the SOAP body of a message and attempts to retreive the session identifier string
  513        *
  514        * @param msgCtx Axis2 MessageContext
  515        * @return a String uniquely identifying a session or null
  516        */
  517       public static String getSourceSession(MessageContext msgCtx) {
  518           SOAPBody body = msgCtx.getEnvelope().getBody();
  519           OMElement messageNode = body.getFirstChildWithName(new QName(FIXConstants.FIX_MESSAGE));
  520           return messageNode.getAttributeValue(new QName(FIXConstants.FIX_MESSAGE_INCOMING_SESSION));
  521       }
  522   }

Home » synapse-1.2-src » org.apache.synapse.transport.fix » [javadoc | source]