/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 */

package org.apache.synapse.transport.fix;

import org.apache.axis2.context.MessageContext;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.SessionNotFound;

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * FIXOutgoingMessageHandler makes sure that messages are delivered in the order they were received by
 * a FIX acceptor. In case the message arrived over a different transport, this class will still try to
 * put the messages in the correct order based on the counter value of the message.
 * <p>
 * For every source session the handler tracks the next expected counter value (starting at 1) and a
 * queue of messages that arrived ahead of their turn. {@link #sendMessage} is synchronized on this
 * instance; {@link #cleanUpMessages} is not.
 */
public class FIXOutgoingMessageHandler {

    /** Next expected counter value, keyed by source session. */
    private final Map<String, Integer> countersMap;

    /** Out-of-order messages waiting for their turn, keyed by source session and then by counter. */
    private final Map<String, Map<Integer, QueuedMessage>> messagesMap;

    private FIXSessionFactory sessionFactory;

    public FIXOutgoingMessageHandler() {
        countersMap = new HashMap<String, Integer>();
        messagesMap = new HashMap<String, Map<Integer, QueuedMessage>>();
    }

    public void setSessionFactory(FIXSessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /**
     * Performs the actual send operation on the message. Tries to send the messages in the order they
     * arrived over the FIX transport.
     * <p>
     * When {@code sourceSession} is null or {@code counter} is -1 there is not enough information to
     * order the message and it is sent immediately. Otherwise the message is sent only if its counter
     * equals the next expected value for the source session; any other message is queued under its
     * counter until the expected one has been sent.
     *
     * @param message the FIX message to be sent
     * @param targetSession ID of the target FIXSession
     * @param sourceSession String that uniquely identifies the incoming session
     * @param counter application level sequence number of the message
     * @param msgCtx Axis2 MessageContext for the outgoing message
     * @param targetEPR the target EPR to forward the message
     *
     * @throws SessionNotFound on error
     */
    public synchronized void sendMessage(Message message, SessionID targetSession, String sourceSession,
                                         int counter, MessageContext msgCtx, String targetEPR) throws SessionNotFound {

        if (sourceSession == null || counter == -1) {
            // insufficient information to send the messages in order...
            // send it right away...
            sendToTarget(msgCtx, targetEPR, message, targetSession);
            return;
        }

        Integer storedCounter = countersMap.get(sourceSession);
        int expectedValue;
        if (storedCounter != null) {
            expectedValue = storedCounter;
        } else {
            // first message seen for this source session; counter starts at 1
            expectedValue = 1;
            countersMap.put(sourceSession, expectedValue);
        }

        Map<Integer, QueuedMessage> messages = messagesMap.get(sourceSession);
        if (messages == null) {
            // sorted by counter so the queue can always be drained in ascending order
            messages = new TreeMap<Integer, QueuedMessage>();
            messagesMap.put(sourceSession, messages);
        }

        if (expectedValue == counter) {
            sendToTarget(msgCtx, targetEPR, message, targetSession);
            // Advance the counter before flushing the queue so that it stays correct even if
            // one of the queued sends below fails with SessionNotFound.
            int nextExpected = expectedValue + 1;
            countersMap.put(sourceSession, nextExpected);
            sendQueuedMessages(nextExpected, sourceSession);
        } else {
            // save the message to be sent later...
            messages.put(counter, new QueuedMessage(message, targetSession, msgCtx, targetEPR));
        }
    }

    /**
     * Sends the FIX message to the given target session. If MessageContext and the target EPR
     * are not null then save the outgoing MessageContext in the FIX application to handle the
     * response.
     *
     * @param msgCtx the Axis2 MessageContext of the outgoing message
     * @param targetEPR the target EPR to send the message
     * @param message the FIX message
     * @param sessionID the ID of the target FIX session
     *
     * @throws SessionNotFound on error
     */
    private void sendToTarget(MessageContext msgCtx, String targetEPR, Message message,
                              SessionID sessionID) throws SessionNotFound {
        if (msgCtx != null && targetEPR != null) {
            // NOTE(review): assumes a session factory has been set and that it returns a
            // FIXIncomingMessageHandler for this EPR - confirm against callers.
            FIXIncomingMessageHandler messageHandler = (FIXIncomingMessageHandler) sessionFactory.
                    getApplication(targetEPR);
            messageHandler.setOutgoingMessageContext(msgCtx);
        }
        Session.sendToTarget(message, sessionID);
    }

    /**
     * Sends any messages in the queues. Maintains the order of the messages.
     * <p>
     * Starting at {@code expectedValue}, sends queued messages with consecutive counter values until
     * the first gap, then records the next expected counter for the session.
     *
     * @param expectedValue expected counter value
     * @param session source FIX session
     *
     * @throws SessionNotFound on error
     */
    private void sendQueuedMessages(int expectedValue, String session) throws SessionNotFound {
        Map<Integer, QueuedMessage> messages = messagesMap.get(session);
        int nextExpected = expectedValue;
        try {
            if (messages != null) {
                QueuedMessage queued = messages.get(nextExpected);
                while (queued != null) {
                    sendToTarget(queued.msgCtx, queued.targetEPR, queued.message, queued.sessionID);
                    messages.remove(nextExpected);
                    // move on to the following counter BEFORE looking it up; looking up the key
                    // that was just removed would end the loop after a single message
                    nextExpected++;
                    queued = messages.get(nextExpected);
                }
            }
        } finally {
            // record how far we actually got, even when a send failed part-way through
            countersMap.put(session, nextExpected);
        }
    }

    /**
     * Flushes and discards all state kept for the given source session. Every message still queued
     * for the session is sent directly to its target session in ascending counter order, regardless
     * of gaps in the counter sequence, and then the session's queue and counter are removed.
     * Sessions unknown to this handler are ignored.
     * <p>
     * NOTE(review): unlike {@link #sendMessage} this method is not synchronized although it touches
     * the same maps - confirm whether callers can invoke the two concurrently.
     *
     * @param session String that uniquely identifies the source session
     */
    public void cleanUpMessages(String session) {
        if (!countersMap.containsKey(session)) {
            return;
        }

        Map<Integer, QueuedMessage> messages = messagesMap.get(session);
        if (messages != null) {
            for (QueuedMessage queued : messages.values()) {
                try {
                    Session.sendToTarget(queued.message, queued.sessionID);
                } catch (SessionNotFound ignored) {
                    // best-effort flush: the target session is already gone, so this
                    // message cannot be delivered; carry on with the remaining ones
                }
            }
            messages.clear();
        }
        messagesMap.remove(session);
        countersMap.remove(session);
    }

    /**
     * Immutable holder for a message that arrived out of order, together with everything needed
     * to send it later.
     */
    private static final class QueuedMessage {

        private final Message message;
        private final SessionID sessionID;
        private final MessageContext msgCtx;
        private final String targetEPR;

        private QueuedMessage(Message message, SessionID sessionID, MessageContext msgCtx,
                              String targetEPR) {
            this.message = message;
            this.sessionID = sessionID;
            this.msgCtx = msgCtx;
            this.targetEPR = targetEPR;
        }
    }
}