1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 package org.apache.synapse.transport.fix; 21 22 import org.apache.axiom.attachments.ByteArrayDataSource; 23 import org.apache.axiom.om.OMElement; 24 import org.apache.axiom.soap.SOAPBody; 25 import org.apache.axiom.soap.SOAPEnvelope; 26 import org.apache.axiom.soap.SOAPFactory; 27 import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory; 28 import org.apache.axis2.AxisFault; 29 import org.apache.axis2.context.MessageContext; 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 import quickfix; 33 import quickfix.field; 34 35 import javax.activation.DataHandler; 36 import javax.activation.DataSource; 37 import javax.xml.namespace.QName; 38 import java.io.ByteArrayOutputStream; 39 import java.io.IOException; 40 import java.net.InetSocketAddress; 41 import java.net.SocketAddress; 42 import java.util; 43 44 public class FIXUtils { 45 46 private static final Log log = LogFactory.getLog(FIXUtils.class); 47 private static FIXUtils _instance = new FIXUtils(); 48 49 public static FIXUtils getInstance() { 50 return _instance; 51 } 52 53 /** 54 * FIX messages are non-XML. So convert them into XML using the AXIOM API. 55 * Put the FIX message into an Axis2 MessageContext.The basic format of the 56 * generated SOAP envelope; 57 * <p/> 58 * <soapEnvelope> 59 * <soapBody> 60 * <message> 61 * <header> ....</header> 62 * <body> .... </body> 63 * <trailer> .... </trailer> 64 * </message> 65 * </soapBody> 66 * </soapEnvelope> 67 * 68 * @param message the FIX message 69 * @param counter application level sequence number of the message 70 * @param sessionID the incoming session 71 * @param msgCtx the Axis2 MessageContext to hold the FIX message 72 * @throws AxisFault the exception thrown when invalid soap envelopes are set to the msgCtx 73 */ 74 public void setSOAPEnvelope(Message message, int counter, String sessionID, 75 MessageContext msgCtx) throws AxisFault { 76 77 if (log.isDebugEnabled()) { 78 log.debug("Creating SOAP envelope for FIX message..."); 79 } 80 81 SOAPFactory soapFactory = new SOAP11Factory(); 82 OMElement msg = soapFactory.createOMElement(FIXConstants.FIX_MESSAGE, null); 83 msg.addAttribute(soapFactory.createOMAttribute(FIXConstants.FIX_MESSAGE_INCOMING_SESSION, null, sessionID)); 84 msg.addAttribute(soapFactory.createOMAttribute 85 (FIXConstants.FIX_MESSAGE_COUNTER, null, String.valueOf(counter))); 86 87 OMElement header = soapFactory.createOMElement(FIXConstants.FIX_HEADER, null); 88 OMElement body = soapFactory.createOMElement(FIXConstants.FIX_BODY, null); 89 OMElement trailer = soapFactory.createOMElement(FIXConstants.FIX_TRAILER, null); 90 91 //process FIX header 92 Iterator<Field<?>> iter = message.getHeader().iterator(); 93 if (iter != null) { 94 while (iter.hasNext()) { 95 Field<?> field = iter.next(); 96 OMElement msgField = soapFactory.createOMElement(FIXConstants.FIX_FIELD, null); 97 msgField.addAttribute(soapFactory. 98 createOMAttribute(FIXConstants.FIX_FIELD_ID, null, String.valueOf(field.getTag()))); 99 Object value = field.getObject(); 100 101 if (value instanceof byte[]) { 102 DataSource dataSource = new ByteArrayDataSource((byte[]) value); 103 DataHandler dataHandler = new DataHandler(dataSource); 104 String contentID = msgCtx.addAttachment(dataHandler); 105 OMElement binaryData = soapFactory.createOMElement(FIXConstants.FIX_BINARY_FIELD, null); 106 String binaryCID = "cid:" + contentID; 107 binaryData.addAttribute(FIXConstants.FIX_MESSAGE_REFERENCE, binaryCID, null); 108 msgField.addChild(binaryData); 109 } else { 110 soapFactory.createOMText(msgField, value.toString(), OMElement.CDATA_SECTION_NODE); 111 } 112 header.addChild(msgField); 113 } 114 } 115 //process FIX body 116 iter = message.iterator(); 117 if (iter != null) { 118 while (iter.hasNext()) { 119 Field<?> field = iter.next(); 120 OMElement msgField = soapFactory.createOMElement(FIXConstants.FIX_FIELD, null); 121 msgField.addAttribute(soapFactory. 122 createOMAttribute(FIXConstants.FIX_FIELD_ID, null, String.valueOf(field.getTag()))); 123 Object value = field.getObject(); 124 if (value instanceof byte[]) { 125 DataSource dataSource = new ByteArrayDataSource((byte[]) value); 126 DataHandler dataHandler = new DataHandler(dataSource); 127 String contentID = msgCtx.addAttachment(dataHandler); 128 OMElement binaryData = soapFactory.createOMElement(FIXConstants.FIX_BINARY_FIELD, null); 129 String binaryCID = "cid:" + contentID; 130 binaryData.addAttribute(FIXConstants.FIX_MESSAGE_REFERENCE, binaryCID, null); 131 msgField.addChild(binaryData); 132 } else { 133 soapFactory.createOMText(msgField, value.toString(), OMElement.CDATA_SECTION_NODE); 134 } 135 body.addChild(msgField); 136 } 137 } 138 //process FIX trailer 139 iter = message.getTrailer().iterator(); 140 if (iter != null) { 141 while (iter.hasNext()) { 142 Field<?> field = iter.next(); 143 OMElement msgField = soapFactory.createOMElement(FIXConstants.FIX_FIELD, null); 144 msgField.addAttribute(soapFactory. 145 createOMAttribute(FIXConstants.FIX_FIELD_ID, null, String.valueOf(field.getTag()))); 146 Object value = field.getObject(); 147 148 if (value instanceof byte[]) { 149 DataSource dataSource = new ByteArrayDataSource((byte[]) value); 150 DataHandler dataHandler = new DataHandler(dataSource); 151 String contentID = msgCtx.addAttachment(dataHandler); 152 OMElement binaryData = soapFactory.createOMElement(FIXConstants.FIX_BINARY_FIELD, null); 153 String binaryCID = "cid:" + contentID; 154 binaryData.addAttribute(FIXConstants.FIX_MESSAGE_REFERENCE, binaryCID, null); 155 msgField.addChild(binaryData); 156 } else { 157 soapFactory.createOMText(msgField, value.toString(), OMElement.CDATA_SECTION_NODE); 158 } 159 trailer.addChild(msgField); 160 } 161 } 162 163 msg.addChild(header); 164 msg.addChild(body); 165 msg.addChild(trailer); 166 SOAPEnvelope envelope = soapFactory.getDefaultEnvelope(); 167 envelope.getBody().addChild(msg); 168 msgCtx.setEnvelope(envelope); 169 } 170 171 172 /** 173 * Extract the FIX message embedded in an Axis2 MessageContext 174 * 175 * @param msgCtx the Axis2 MessageContext 176 * @return a FIX message 177 * @throws java.io.IOException the exception thrown when handling erroneous binary content 178 */ 179 public Message createFIXMessage(MessageContext msgCtx) throws IOException { 180 if (log.isDebugEnabled()) { 181 log.debug("Extracting FIX message from the message context (Message ID: " + msgCtx.getMessageID() + ")"); 182 } 183 184 Message message = new Message(); 185 SOAPBody soapBody = msgCtx.getEnvelope().getBody(); 186 OMElement messageNode = soapBody.getFirstChildWithName(new QName(FIXConstants.FIX_MESSAGE)); 187 Iterator<OMElement> messageElements = messageNode.getChildElements(); 188 189 while (messageElements.hasNext()) { 190 OMElement node = messageElements.next(); 191 //create FIX header 192 if (node.getQName().getLocalPart().equals(FIXConstants.FIX_HEADER)) { 193 Iterator<OMElement> headerElements = node.getChildElements(); 194 while (headerElements.hasNext()) { 195 OMElement headerNode = headerElements.next(); 196 String tag = headerNode.getAttributeValue(new QName(FIXConstants.FIX_FIELD_ID)); 197 String value = null; 198 199 OMElement child = headerNode.getFirstElement(); 200 if (child != null) { 201 String href = headerNode.getFirstElement(). 202 getAttributeValue(new QName(FIXConstants.FIX_MESSAGE_REFERENCE)); 203 if (href != null) { 204 DataHandler binaryDataHandler = msgCtx.getAttachment(href.substring(4)); 205 ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); 206 binaryDataHandler.writeTo(outputStream); 207 value = new String(outputStream.toByteArray()); 208 } 209 } else { 210 value = headerNode.getText(); 211 } 212 213 if (value != null) { 214 message.getHeader().setString(Integer.parseInt(tag), value); 215 } 216 } 217 218 } else if (node.getQName().getLocalPart().equals(FIXConstants.FIX_BODY)) { 219 //create FIX body 220 Iterator<OMElement> bodyElements = node.getChildElements(); 221 while (bodyElements.hasNext()) { 222 OMElement bodyNode = bodyElements.next(); 223 String tag = bodyNode.getAttributeValue(new QName(FIXConstants.FIX_FIELD_ID)); 224 String value = null; 225 226 OMElement child = bodyNode.getFirstElement(); 227 if (child != null) { 228 String href = bodyNode.getFirstElement(). 229 getAttributeValue(new QName(FIXConstants.FIX_MESSAGE_REFERENCE)); 230 if (href != null) { 231 DataHandler binaryDataHandler = msgCtx.getAttachment(href.substring(4)); 232 ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); 233 binaryDataHandler.writeTo(outputStream); 234 value = new String(outputStream.toByteArray()); 235 } 236 } else { 237 value = bodyNode.getText(); 238 } 239 240 if (value != null) { 241 message.setString(Integer.parseInt(tag), value); 242 } 243 } 244 } else if (node.getQName().getLocalPart().equals(FIXConstants.FIX_TRAILER)) { 245 //create FIX trailer 246 Iterator<OMElement> trailerElements = node.getChildElements(); 247 while (trailerElements.hasNext()) { 248 OMElement trailerNode = trailerElements.next(); 249 String tag = trailerNode.getAttributeValue(new QName(FIXConstants.FIX_FIELD_ID)); 250 String value = null; 251 252 OMElement child = trailerNode.getFirstElement(); 253 if (child != null) { 254 String href = trailerNode.getFirstElement(). 255 getAttributeValue(new QName(FIXConstants.FIX_MESSAGE_REFERENCE)); 256 if (href != null) { 257 DataHandler binaryDataHandler = msgCtx.getAttachment(href.substring(4)); 258 ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); 259 binaryDataHandler.writeTo(outputStream); 260 value = new String(outputStream.toByteArray()); 261 } 262 } else { 263 value = trailerNode.getText(); 264 } 265 266 if (value != null) { 267 message.getTrailer().setString(Integer.parseInt(tag), value); 268 } 269 } 270 } 271 } 272 return message; 273 } 274 275 276 /** 277 * Generate EPRs for the specified FIX service. A FIX end point can be uniquely 278 * identified by a <host(IP), port> pair. Add some additional FIX session details 279 * so the EPRs are more self descriptive. 280 * A FIX EPR generated here looks like; 281 * fix://10.100.1.80:9898?BeginString=FIX.4.4&SenderCompID=BANZAI&TargetCompID=EXEC& 282 * SessionQualifier=mySession&Serviec=StockQuoteProxy 283 * 284 * @param acceptor the SocketAcceptor associated with the service 285 * @param serviceName the name of the service 286 * @param ip the IP address of the host 287 * @return an array of EPRs for the specified service in String format 288 */ 289 public static String[] generateEPRs(SocketAcceptor acceptor, String serviceName, String ip) { 290 //Get all the addresses associated with the acceptor 291 Map<SessionID, SocketAddress> socketAddresses = acceptor.getAcceptorAddresses(); 292 //Get all the sessions (SessionIDs) associated with the acceptor 293 ArrayList<SessionID> sessions = acceptor.getSessions(); 294 String[] EPRList = new String[sessions.size()]; 295 296 //Generate an EPR for each session/socket address 297 for (int i = 0; i < sessions.size(); i++) { 298 SessionID sessionID = sessions.get(i); 299 InetSocketAddress socketAddress = (InetSocketAddress) socketAddresses.get(sessionID); 300 EPRList[i] = FIXConstants.FIX_PREFIX + ip + ":" + socketAddress.getPort() + 301 "?" + FIXConstants.BEGIN_STRING + "=" + sessionID.getBeginString() + 302 "&" + FIXConstants.SENDER_COMP_ID + "=" + sessionID.getTargetCompID() + 303 "&" + FIXConstants.TARGET_COMP_ID + "=" + sessionID.getSenderCompID(); 304 305 String sessionQualifier = sessionID.getSessionQualifier(); 306 if (sessionQualifier != null && !sessionQualifier.equals("")) { 307 EPRList[i] += "&" + FIXConstants.SESSION_QUALIFIER + "=" + sessionQualifier; 308 } 309 310 String senderSubID = sessionID.getSenderSubID(); 311 if (senderSubID != null && !senderSubID.equals("")) { 312 EPRList[i] += "&" + FIXConstants.SENDER_SUB_ID + "=" + senderSubID; 313 } 314 315 String targetSubID = sessionID.getTargetSubID(); 316 if (targetSubID != null && !targetSubID.equals("")) { 317 EPRList[i] += "&" + FIXConstants.TARGET_SUB_ID + "=" + targetSubID; 318 } 319 320 String senderLocationID = sessionID.getSenderLocationID(); 321 if (senderLocationID != null && !senderLocationID.equals("")) { 322 EPRList[i] += "&" + FIXConstants.SENDER_LOCATION_ID + "=" + senderLocationID; 323 } 324 325 String targetLocationID = sessionID.getTargetLocationID(); 326 if (targetLocationID != null && !targetLocationID.equals("")) { 327 EPRList[i] += "&" + FIXConstants.TARGET_LOCATION_ID + "=" + targetLocationID; 328 } 329 330 EPRList[i] += "&Service=" + serviceName; 331 } 332 return EPRList; 333 } 334 335 /** 336 * Extracts parameters embedded in FIX EPRs 337 * 338 * @param url a valid FIX EPR 339 * @return a Hashtable of FIX properties 340 */ 341 public static Hashtable getProperties(String url) { 342 Hashtable<String, String> h = new Hashtable<String, String>(); 343 int propPos = url.indexOf("?"); 344 if (propPos != -1) { 345 StringTokenizer st = new StringTokenizer(url.substring(propPos + 1), "&"); 346 while (st.hasMoreTokens()) { 347 String token = st.nextToken(); 348 int sep = token.indexOf("="); 349 if (sep != -1) { 350 h.put(token.substring(0, sep), token.substring(sep + 1)); 351 } 352 } 353 } 354 return h; 355 } 356 357 /* 358 * This is here because AXIOM does not support removing CDATA tags yet. Given a String embedded in 359 * CDATA tags this method will return the String element only. 360 * 361 * @param str the String with CDATA tags 362 * @return String with CDATA tags stripped 363 * 364 private static String removeCDATA(String str) { 365 if (str.indexOf("<![CDATA[") != -1) { 366 str = str.split("CDATA")[1].split("]></field>")[0]; 367 str= str.substring(1, str.length()-1); 368 return str; 369 } else { 370 return str; 371 } 372 }*/ 373 374 /** 375 * Extracts the fields related to message forwarding (third party routing) from 376 * the FIX header. 377 * 378 * @param message the FIX message 379 * @return a Map of forwarding parameters 380 */ 381 public static Map<String, String> getMessageForwardingParameters(Message message) { 382 383 Map<String, String> map = new HashMap<String, String>(); 384 String value = getHeaderFieldValue(message, BeginString.FIELD); 385 map.put(FIXConstants.BEGIN_STRING, value); 386 value = getHeaderFieldValue(message, SenderCompID.FIELD); 387 map.put(FIXConstants.SENDER_COMP_ID, value); 388 value = getHeaderFieldValue(message, SenderSubID.FIELD); 389 map.put(FIXConstants.SENDER_SUB_ID, value); 390 value = getHeaderFieldValue(message, SenderLocationID.FIELD); 391 map.put(FIXConstants.SENDER_LOCATION_ID, value); 392 value = getHeaderFieldValue(message, TargetCompID.FIELD); 393 map.put(FIXConstants.TARGET_COMP_ID, value); 394 value = getHeaderFieldValue(message, DeliverToCompID.FIELD); 395 map.put(FIXConstants.DELIVER_TO_COMP_ID, value); 396 value = getHeaderFieldValue(message, DeliverToSubID.FIELD); 397 map.put(FIXConstants.DELIVER_TO_SUB_ID, value); 398 value = getHeaderFieldValue(message, DeliverToLocationID.FIELD); 399 map.put(FIXConstants.DELIVER_TO_LOCATION_ID, value); 400 value = getHeaderFieldValue(message, OnBehalfOfCompID.FIELD); 401 map.put(FIXConstants.ON_BEHALF_OF_COMP_ID, value); 402 value = getHeaderFieldValue(message, OnBehalfOfSubID.FIELD); 403 map.put(FIXConstants.ON_BEHALF_OF_SUB_ID, value); 404 value = getHeaderFieldValue(message, OnBehalfOfLocationID.FIELD); 405 map.put(FIXConstants.ON_BEHALF_OF_LOCATION_ID, value); 406 return map; 407 } 408 409 private static String getHeaderFieldValue(Message message, int tag) { 410 try { 411 return message.getHeader().getString(tag); 412 } catch (FieldNotFound fieldNotFound) { 413 return null; 414 } 415 } 416 417 /** 418 * Extracts the name of the service which processed the message from the MessageContext 419 * 420 * @param msgCtx Axis2 MessageContext of a message 421 * @return name of the AxisService 422 * @throws org.apache.axis2.AxisFault on error 423 */ 424 public static String getServiceName(MessageContext msgCtx) throws AxisFault { 425 426 Object serviceParam = msgCtx.getProperty(FIXConstants.FIX_SERVICE_NAME); 427 if (serviceParam != null) { 428 String serviceName = serviceParam.toString(); 429 if (serviceName != null && !serviceName.equals("")) { 430 return serviceName; 431 } 432 } 433 434 Map<String, String> trpHeaders = (Map) msgCtx.getProperty(MessageContext.TRANSPORT_HEADERS); 435 //try to get the service from the transport headers 436 if (trpHeaders != null) { 437 String serviceName = (trpHeaders.get(FIXConstants.FIX_MESSAGE_SERVICE)); 438 if (serviceName != null) { 439 return serviceName; 440 } 441 } 442 throw new AxisFault("Unable to find a valid service for the message"); 443 } 444 445 /** 446 * Extracts the application type for the message from the message context 447 * 448 * @param msgCtx Axis2 Message Context 449 * @return application type of the message 450 */ 451 public static String getFixApplication(MessageContext msgCtx) { 452 Map<String, String> trpHeaders = (Map) msgCtx.getProperty(MessageContext.TRANSPORT_HEADERS); 453 //try to get the application type from the transport headers 454 String fixApplication = null; 455 if (trpHeaders != null) { 456 fixApplication = trpHeaders.get(FIXConstants.FIX_MESSAGE_APPLICATION); 457 } 458 return fixApplication; 459 } 460 461 /** 462 * Creates a Map of transport headers for a message 463 * 464 * @param serviceName name of the service to which the message belongs to 465 * @param fixApplication FIX application type 466 * @return a Map of transport headers 467 */ 468 public static Map<String, String> getTransportHeaders(String serviceName, String fixApplication) { 469 Map<String, String> trpHeaders = new HashMap<String, String>(); 470 trpHeaders.put(FIXConstants.FIX_MESSAGE_SERVICE, serviceName); 471 trpHeaders.put(FIXConstants.FIX_MESSAGE_APPLICATION, fixApplication); 472 return trpHeaders; 473 } 474 475 /** 476 * Reads a FIX EPR and returns the host and port on a String array 477 * 478 * @param fixEPR a FIX EPR 479 * @return an array of Strings containing addressing elements 480 * @throws AxisFault on error 481 */ 482 public static String[] getSocketAddressElements(String fixEPR) throws AxisFault { 483 int propPos = fixEPR.indexOf("?"); 484 if (propPos != -1 && fixEPR.startsWith(FIXConstants.FIX_PREFIX)) { 485 String address = fixEPR.substring(FIXConstants.FIX_PREFIX.length(), propPos); 486 String[] socketAddressElemets = address.split(":"); 487 if (socketAddressElemets.length == 2) { 488 return socketAddressElemets; 489 } 490 } 491 throw new AxisFault("Malformed FIX EPR: " + fixEPR); 492 } 493 494 /** 495 * Reads the SOAP body of a message and attempts to retreive the application level sequence number 496 * 497 * @param msgCtx Axis2 MessageContext 498 * @return application level sequence number or -1 499 */ 500 public static int getSequenceNumber(MessageContext msgCtx) { 501 SOAPBody body = msgCtx.getEnvelope().getBody(); 502 OMElement messageNode = body.getFirstChildWithName(new QName(FIXConstants.FIX_MESSAGE)); 503 String value = messageNode.getAttributeValue(new QName(FIXConstants.FIX_MESSAGE_COUNTER)); 504 if (value != null) { 505 return Integer.parseInt(value); 506 } else { 507 return -1; 508 } 509 } 510 511 /** 512 * Reads the SOAP body of a message and attempts to retreive the session identifier string 513 * 514 * @param msgCtx Axis2 MessageContext 515 * @return a String uniquely identifying a session or null 516 */ 517 public static String getSourceSession(MessageContext msgCtx) { 518 SOAPBody body = msgCtx.getEnvelope().getBody(); 519 OMElement messageNode = body.getFirstChildWithName(new QName(FIXConstants.FIX_MESSAGE)); 520 return messageNode.getAttributeValue(new QName(FIXConstants.FIX_MESSAGE_INCOMING_SESSION)); 521 } 522 }