/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 */

package org.apache.synapse.mediators.eip.aggregator;

import org.apache.axiom.soap.SOAP11Constants;
import org.apache.axiom.soap.SOAP12Constants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.mediators.AbstractMediator;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.mediators.eip.EIPConstants;
import org.apache.synapse.mediators.eip.EIPUtils;
import org.apache.synapse.util.xpath.SynapseXPath;
import org.jaxen.JaxenException;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Aggregate a number of messages that are determined to be for a particular group, and combine
 * them to form a single message which is then processed through the 'onComplete' sequence. Thus
 * an aggregator acts like a filter, and may look at a correlation XPath expression to select
 * messages for aggregation - or look at the aggregate correlation message property - or
 * let any other (i.e. non aggregatable) messages flow through.
 * <p>
 * When a completion timeout is configured, every newly created {@link Aggregate} is scheduled
 * on the Synapse timer obtained from the configuration, so that aggregations that would never
 * complete can be timed out and cleared from memory.
 */
public class AggregateMediator extends AbstractMediator {

    private static final Log log = LogFactory.getLog(AggregateMediator.class);
    private static final Log trace = LogFactory.getLog(SynapseConstants.TRACE_LOGGER);

    /** The duration as a number of milliseconds for this aggregation to complete */
    private long completionTimeoutMillis = 0;
    /** The minimum number of messages required to complete aggregation */
    private int minMessagesToComplete = -1;
    /** The maximum number of messages required to complete aggregation */
    private int maxMessagesToComplete = -1;

    /**
     * XPath that specifies a correlation expression that can be used to combine messages. An
     * example maybe //department@id="11"
     */
    private SynapseXPath correlateExpression = null;
    /**
     * An XPath expression that may specify a selected element to be aggregated from a group of
     * messages to create the aggregated message
     * e.g. //getQuote/return would pick up and aggregate the //getQuote/return elements from a
     * bunch of matching messages into one aggregated message
     */
    private SynapseXPath aggregationExpression = null;

    /** This holds the name of the referenced sequence to be used as the onComplete sequence */
    private String onCompleteSequenceRef = null;
    /** Inline sequence definition holder that holds the onComplete sequence */
    private SequenceMediator onCompleteSequence = null;

    /** The active aggregates currently being processed, keyed by their correlation */
    private final Map<String, Aggregate> activeAggregates =
        Collections.synchronizedMap(new HashMap<String, Aggregate>());

    /** Lock object to provide the synchronized access to the activeAggregates on checking */
    private final Object lock = new Object();

    /**
     * Creates the mediator with a default aggregation expression that selects the first child
     * element of the SOAP 1.1 or SOAP 1.2 Body.
     */
    public AggregateMediator() {
        try {
            aggregationExpression = new SynapseXPath("s11:Body/child::*[position()=1] | " +
                "s12:Body/child::*[position()=1]");
            aggregationExpression.addNamespace("s11", SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI);
            aggregationExpression.addNamespace("s12", SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI);
        } catch (JaxenException e) {
            // Report unconditionally: the failure must not depend on the debug log level,
            // otherwise a broken default expression would go completely unnoticed.
            log.error("Unable to set the default " +
                "aggregationExpression for the aggregation", e);
        }
    }

    /**
     * Aggregate messages flowing through this mediator according to the correlation criteria
     * and the aggregation algorithm specified to it
     *
     * @param synCtx - MessageContext to be mediated and aggregated
     * @return true if no correlation (XPath match or correlation property) or no aggregate
     *         could be found for the message, or if this message completed its aggregate;
     *         false if the message was handed to an aggregate that is not yet complete
     */
    public boolean mediate(MessageContext synCtx) {

        boolean traceOn = isTraceOn(synCtx);
        boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);

        if (traceOrDebugOn) {
            traceOrDebug(traceOn, "Start : Aggregate mediator");

            if (traceOn && trace.isTraceEnabled()) {
                trace.trace("Message : " + synCtx.getEnvelope());
            }
        }

        try {
            Aggregate aggregate = null;

            // if a correlateExpression is provided and there is a corresponding
            // element in the current message prepare to correlate the messages on that
            if (correlateExpression != null
                && correlateExpression.evaluate(synCtx) != null) {

                // the textual form of the expression is used as the key of the aggregate
                String correlation = correlateExpression.toString();

                // keep trying until an aggregate has been obtained and locked
                while (aggregate == null) {
                    synchronized (lock) {
                        if (activeAggregates.containsKey(correlation)) {
                            aggregate = activeAggregates.get(correlation);
                            if (aggregate != null && !aggregate.getLock()) {
                                // could not lock the aggregate, retry
                                aggregate = null;
                            }
                        } else {
                            aggregate = createAggregate(
                                synCtx, correlation, traceOn, traceOrDebugOn);
                        }
                    }
                }

            } else {
                // if the correlation cannot be found using the correlateExpression then
                // try the default which is through the AGGREGATE_CORRELATION message property
                // which is the unique original message id of a split or iterate operation and
                // which thus can be used to uniquely group messages into aggregates
                Object o = synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION);

                if (o == null) {
                    if (traceOrDebugOn) {
                        traceOrDebug(traceOn,
                            "Unable to find aggrgation correlation XPath or property");
                    }
                    return true;
                }

                if (!(o instanceof String)) {
                    if (traceOrDebugOn) {
                        traceOrDebug(traceOn, "Unable to find aggrgation correlation property");
                    }
                    return true;
                }

                String correlation = (String) o;
                while (aggregate == null) {
                    synchronized (lock) {
                        if (activeAggregates.containsKey(correlation)) {
                            aggregate = activeAggregates.get(correlation);
                            if (aggregate == null) {
                                // the entry vanished between the two look-ups; give up and
                                // let the message continue on the normal path
                                break;
                            }
                            if (!aggregate.getLock()) {
                                // could not lock the aggregate, retry
                                aggregate = null;
                            }
                        } else {
                            aggregate = createAggregate(
                                synCtx, correlation, traceOn, traceOrDebugOn);
                        }
                    }
                }
            }

            // if there is an aggregate continue on aggregation
            if (aggregate != null) {
                boolean collected = aggregate.addMessage(synCtx);
                if (traceOrDebugOn) {
                    if (collected) {
                        traceOrDebug(traceOn, "Collected a message during aggregation");
                        if (traceOn && trace.isTraceEnabled()) {
                            trace.trace("Collected message : " + synCtx);
                        }
                    }
                }

                // check the completeness of the aggregate and if completed aggregate the messages
                // if not completed return false and block the message sequence till it completes

                if (aggregate.isComplete(traceOn, traceOrDebugOn, trace, log)) {
                    if (traceOrDebugOn) {
                        traceOrDebug(traceOn, "Aggregation completed - invoking onComplete");
                    }
                    completeAggregate(aggregate);

                    if (traceOrDebugOn) {
                        traceOrDebug(traceOn, "End : Aggregate mediator");
                    }
                    return true;
                } else {
                    aggregate.releaseLock();
                }

            } else {
                // if the aggregation correlation cannot be found then continue the message on the
                // normal path by returning true

                if (traceOrDebugOn) {
                    traceOrDebug(traceOn, "Unable to find an aggregate for this message - skip");
                }
                return true;
            }

        } catch (JaxenException e) {
            handleException("Unable to execute the XPATH over the message", e, synCtx);
        }

        if (traceOrDebugOn) {
            traceOrDebug(traceOn, "End : Aggregate mediator");
        }

        return false;
    }

    /**
     * Creates a new Aggregate for the given correlation, schedules it on the Synapse timer when
     * a completion timeout is configured, acquires its lock and registers it as active.
     * Callers must hold {@link #lock}.
     *
     * @param synCtx the message context, used to reach the Synapse timer
     * @param correlation the key under which the aggregate is registered
     * @param traceOn whether tracing is on for the current message
     * @param traceOrDebugOn whether tracing or debug logging is on
     * @return the newly created and registered aggregate
     */
    private Aggregate createAggregate(MessageContext synCtx, String correlation,
        boolean traceOn, boolean traceOrDebugOn) {

        if (traceOrDebugOn) {
            traceOrDebug(traceOn, "Creating new Aggregator - " +
                (completionTimeoutMillis > 0 ? "expires in : "
                    + (completionTimeoutMillis / 1000) + "secs" :
                    "without expiry time"));
        }

        Aggregate aggregate = new Aggregate(
            correlation,
            completionTimeoutMillis,
            minMessagesToComplete,
            maxMessagesToComplete, this);

        if (completionTimeoutMillis > 0) {
            synCtx.getConfiguration().getSynapseTimer().
                schedule(aggregate, completionTimeoutMillis);
        }
        aggregate.getLock();
        activeAggregates.put(correlation, aggregate);
        return aggregate;
    }

    /**
     * Completes the given aggregate: cancels its timer task, marks it completed, removes it from
     * the active aggregates and, when it holds messages, mediates the merged message through the
     * onComplete sequence. Invoked from {@link #mediate(MessageContext)} when an aggregate is
     * complete, and by Aggregate objects that are timed out, to signal timeout/completion.
     *
     * @param aggregate the Aggregate that holds collected messages and properties
     */
    public synchronized void completeAggregate(Aggregate aggregate) {

        if (log.isDebugEnabled()) {
            log.debug("Aggregation completed or timed out");
        }

        // cancel the timer
        aggregate.cancel();
        aggregate.setCompleted(true);

        MessageContext newSynCtx = getAggregatedMessage(aggregate);

        // Always unregister the aggregate, even when it is empty: a completed aggregate left in
        // the map would be picked up again by later messages with the same correlation.
        activeAggregates.remove(aggregate.getCorrelation());

        if (newSynCtx == null) {
            log.warn("An aggregation of messages timed out with no aggregated messages");
            return;
        }

        // NOTE(review): when the aggregate was keyed by the correlate expression's own text the
        // onComplete sequence is skipped entirely - confirm that this is intended.
        if (correlateExpression == null ||
            !correlateExpression.toString().equals(aggregate.getCorrelation())) {

            if (onCompleteSequence != null) {
                onCompleteSequence.mediate(newSynCtx);

            } else if (onCompleteSequenceRef != null
                && newSynCtx.getSequence(onCompleteSequenceRef) != null) {
                newSynCtx.getSequence(onCompleteSequenceRef).mediate(newSynCtx);

            } else {
                handleException("Unable to find the sequence for the mediation " +
                    "of the aggregated message", newSynCtx);
            }
        }
    }

    /**
     * Get the aggregated message from the specified Aggregate instance. The first collected
     * message becomes the result and the envelope of every further message is merged into it
     * using the aggregation expression.
     *
     * @param aggregate the Aggregate object that holds collected messages and properties of the
     * aggregation
     * @return the aggregated message context, or null if the aggregate holds no messages
     */
    private MessageContext getAggregatedMessage(Aggregate aggregate) {

        MessageContext newCtx = null;

        for (MessageContext synCtx : aggregate.getMessages()) {

            if (newCtx == null) {
                // the first message is the base into which all others are merged
                newCtx = synCtx;

                if (log.isDebugEnabled()) {
                    log.debug("Generating Aggregated message from : " + newCtx.getEnvelope());
                }

            } else {
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Merging message : " + synCtx.getEnvelope() + " using XPath : " +
                            aggregationExpression);
                    }

                    EIPUtils.enrichEnvelope(
                        newCtx.getEnvelope(), synCtx.getEnvelope(), aggregationExpression);

                    if (log.isDebugEnabled()) {
                        log.debug("Merged result : " + newCtx.getEnvelope());
                    }

                } catch (JaxenException e) {
                    handleException("Error merging aggregation results using XPath : " +
                        aggregationExpression.toString(), e, synCtx);
                }
            }
        }
        return newCtx;
    }

    public SynapseXPath getCorrelateExpression() {
        return correlateExpression;
    }

    public void setCorrelateExpression(SynapseXPath correlateExpression) {
        this.correlateExpression = correlateExpression;
    }

    public long getCompletionTimeoutMillis() {
        return completionTimeoutMillis;
    }

    public void setCompletionTimeoutMillis(long completionTimeoutMillis) {
        this.completionTimeoutMillis = completionTimeoutMillis;
    }

    public int getMinMessagesToComplete() {
        return minMessagesToComplete;
    }

    public void setMinMessagesToComplete(int minMessagesToComplete) {
        this.minMessagesToComplete = minMessagesToComplete;
    }

    public int getMaxMessagesToComplete() {
        return maxMessagesToComplete;
    }

    public void setMaxMessagesToComplete(int maxMessagesToComplete) {
        this.maxMessagesToComplete = maxMessagesToComplete;
    }

    public SynapseXPath getAggregationExpression() {
        return aggregationExpression;
    }

    public void setAggregationExpression(SynapseXPath aggregationExpression) {
        this.aggregationExpression = aggregationExpression;
    }

    public String getOnCompleteSequenceRef() {
        return onCompleteSequenceRef;
    }

    public void setOnCompleteSequenceRef(String onCompleteSequenceRef) {
        this.onCompleteSequenceRef = onCompleteSequenceRef;
    }

    public SequenceMediator getOnCompleteSequence() {
        return onCompleteSequence;
    }

    public void setOnCompleteSequence(SequenceMediator onCompleteSequence) {
        this.onCompleteSequence = onCompleteSequence;
    }

    /**
     * Returns the live, synchronized map of active aggregates (correlation to Aggregate); it is
     * not a copy.
     */
    public Map getActiveAggregates() {
        return activeAggregates;
    }
}