Home » synapse-1.2-src » org.apache.synapse.transport.fix » [javadoc | source]

    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one
    3    *  or more contributor license agreements.  See the NOTICE file
    4    *  distributed with this work for additional information
    5    *  regarding copyright ownership.  The ASF licenses this file
    6    *  to you under the Apache License, Version 2.0 (the
    7    *  "License"); you may not use this file except in compliance
    8    *  with the License.  You may obtain a copy of the License at
    9    *
   10    *   http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    *  Unless required by applicable law or agreed to in writing,
   13    *  software distributed under the License is distributed on an
   14    *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    *  KIND, either express or implied.  See the License for the
   16    *  specific language governing permissions and limitations
   17    *  under the License.
   18    */
   19   
   20   package org.apache.synapse.transport.fix;
   21   
   22   import org.apache.axis2.context.MessageContext;
   23   import quickfix.Message;
   24   import quickfix.Session;
   25   import quickfix.SessionID;
   26   import quickfix.SessionNotFound;
   27   
   28   import java.util.HashMap;
   29   import java.util.Map;
   30   
   31   /**
   32    * FIXOutgoingMessageHandler makes sure that messages are delivered in the order they were received by
   33    * a FIX acceptor. In case the message arrived over a different transport srill this class will try to
   34    * put the messages in correct order based on the counter value of the message.
   35    */
   36   public class FIXOutgoingMessageHandler {
   37   
   38       private Map<String, Integer> countersMap;
   39       private Map<String, Map<Integer,Object[]>> messagesMap;
   40       private FIXSessionFactory sessionFactory;
   41   
   42       public FIXOutgoingMessageHandler() {
   43           countersMap = new HashMap<String, Integer>();
   44           messagesMap = new HashMap<String, Map<Integer,Object[]>>();
   45       }
   46   
   47       public void setSessionFactory(FIXSessionFactory sessionFactory) {
   48           this.sessionFactory = sessionFactory;
   49       }
   50   
   51       /**
   52        * Performs the actual send operation on the message. Tries to send the messages in the order they
   53        * arrived over the FIX transport
   54        *
   55        * @param message the FIX message to be sent
   56        * @param targetSession ID of the target FIXSession
   57        * @param sourceSession String that uniquely identifies the incoming session
   58        * @param counter application level sequence number of the message
   59        * @param msgCtx Axis2 MessageContext for the outgoing message
   60        * @param targetEPR the target EPR to forward the message
   61        *
   62        * @throws SessionNotFound on error
   63        */
   64       public synchronized void sendMessage(Message message, SessionID targetSession, String sourceSession,
   65                               int counter, MessageContext msgCtx, String targetEPR) throws SessionNotFound {
   66   
   67           if (sourceSession != null && counter != -1) {
   68   
   69               int expectedValue;
   70               if (countersMap.containsKey(sourceSession)) {
   71                   expectedValue = countersMap.get(sourceSession);
   72               }
   73               else {
   74                   //create new entries in the respective Maps
   75                   //counter starts at 1
   76                   countersMap.put(sourceSession, 1);
   77                   messagesMap.put(sourceSession, new HashMap<Integer,Object[]>());
   78                   expectedValue = 1;
   79               }
   80   
   81               if (expectedValue == counter) {
   82                   sendToTarget(msgCtx, targetEPR, message, targetSession);
   83                   countersMap.put(sourceSession, expectedValue++);
   84                   sendQueuedMessages(expectedValue, sourceSession);
   85               }
   86               else {
   87                   //save the message to be sent later...
   88                   Map<Integer,Object[]> messages = messagesMap.get(sourceSession);
   89                   Object[] obj = new Object[4];
   90                   obj[0] = message;
   91                   obj[1] = targetSession;
   92                   obj[2] = msgCtx;
   93                   obj[3] = targetEPR;
   94                   messages.put(counter, obj);
   95                   messagesMap.put(sourceSession, messages);
   96               }
   97           }
   98           else {
   99               //insufficient information to send the messages in order...
  100               // send it right away...
  101               sendToTarget(msgCtx, targetEPR, message, targetSession);
  102           }
  103       }
  104   
  105       /**
  106        * Sends the FIX message to the given target session. If MessageContext and the target EPR
  107        * are not null then save the outgoing MessageContext in the FIX application to handle the
  108        * response.
  109        *
  110        * @param msgCtx the Axis2 MessageContext of the outgoing message
  111        * @param targetEPR the target EPR to send the message
  112        * @param message the FIX message
  113        * @param sessionID the ID of the target FIX session
  114        *
  115        * @throws SessionNotFound on error
  116        */
  117       private void sendToTarget(MessageContext msgCtx, String targetEPR, Message message,
  118                                 SessionID sessionID) throws SessionNotFound {
  119           if (msgCtx != null && targetEPR != null) {
  120               FIXIncomingMessageHandler messageHandler = (FIXIncomingMessageHandler) sessionFactory.
  121                       getApplication(targetEPR);
  122               messageHandler.setOutgoingMessageContext(msgCtx);
  123           }
  124           Session.sendToTarget(message, sessionID);
  125       }
  126   
  127       /**
  128        * Sends any messages in the queues. Maintains the order of the messages.
  129        *
  130        * @param expectedValue expected counter value
  131        * @param session source FIX session
  132        *
  133        * @throws SessionNotFound on error
  134        */
  135       private void sendQueuedMessages(int expectedValue, String session) throws SessionNotFound {
  136           Map<Integer, Object[]> messages = messagesMap.get(session);
  137           Object[] obj = messages.get(expectedValue);
  138           while (obj != null) {
  139               Message message = (Message) obj[0];
  140               SessionID sessionID = (SessionID) obj[1];
  141               MessageContext msgCtx = null;
  142               String targetEPR = null;
  143               if (obj[2] != null) {
  144                   msgCtx = (MessageContext) obj[2];
  145                   targetEPR = obj[3].toString();
  146               }
  147               sendToTarget(msgCtx, targetEPR, message, sessionID);
  148               messages.remove(expectedValue);
  149               obj = messages.get(expectedValue++);
  150           }
  151           messagesMap.put(session, messages);
  152           countersMap.put(session, expectedValue);
  153       }
  154   
  155       public void cleanUpMessages(String session) {
  156           if (countersMap.containsKey(session)) {
  157               int expectedValue = countersMap.get(session);
  158               Map<Integer,  Object[]> messages = messagesMap.get(session);
  159               while (!messages.isEmpty()) {
  160                   Object[] obj = messages.get(expectedValue);
  161                   if (obj != null) {
  162                       Message message = (Message) obj[0];
  163                       SessionID sessionID = (SessionID) obj[1];
  164                       try {
  165                           Session.sendToTarget(message, sessionID);
  166                       } catch (SessionNotFound ignore) { }
  167   
  168                       messages.remove(expectedValue);
  169                   }
  170                   expectedValue++;
  171               }
  172               messagesMap.remove(session);
  173               countersMap.remove(session);
  174           }
  175       }
  176   }

Home » synapse-1.2-src » org.apache.synapse.transport.fix » [javadoc | source]