/**
 * Collects the given message into an aggregate and, once that aggregate reports
 * itself complete, hands it to {@code completeAggregate}.
 *
 * <p>The aggregate is looked up in {@code activeAggregates} (creating and registering a
 * new one when no entry exists) using one of two keys:
 * <ol>
 *   <li>the textual form of {@code correlateExpression}, when an expression is configured
 *       and evaluating it over the message yields a non-null result;</li>
 *   <li>otherwise the {@code EIPConstants.AGGREGATE_CORRELATION} message property, when it
 *       is set and is a {@code String}.</li>
 * </ol>
 *
 * <p>The per-aggregate lock obtained through {@code Aggregate.getLock()} is released when
 * the aggregate is not yet complete, and also when collecting the message or checking for
 * completeness fails with an exception, so that a failure does not leave the aggregate
 * locked. Once completion has started the lock is left to {@code completeAggregate}.
 *
 * @param synCtx the message to be collected
 * @return {@code true} when the message should continue on the normal path (no correlation
 *         could be determined, no aggregate was obtained, or the aggregation completed);
 *         {@code false} when the message was handed to an aggregate that is not complete yet
 */
public boolean mediate(MessageContext synCtx) {

    boolean traceOn = isTraceOn(synCtx);
    boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);

    if (traceOrDebugOn) {
        traceOrDebug(traceOn, "Start : Aggregate mediator");
        if (traceOn && trace.isTraceEnabled()) {
            trace.trace("Message : " + synCtx.getEnvelope());
        }
    }

    try {
        Aggregate aggregate = null;

        // if a correlateExpression is provided and there is a corresponding
        // element in the current message prepare to correlate the messages on that
        if (correlateExpression != null
                && correlateExpression.evaluate(synCtx) != null) {

            // NOTE(review): the map key is the expression's text, not the evaluated value,
            // exactly as before - confirm this is the intended grouping key
            String correlation = correlateExpression.toString();

            // keep trying until the aggregate's lock is obtained or a new aggregate is created
            // NOTE(review): a key that is present but mapped to null makes this branch loop
            // again, whereas the property based branch below breaks out - kept as is
            while (aggregate == null) {
                synchronized (lock) {
                    if (activeAggregates.containsKey(correlation)) {
                        aggregate = activeAggregates.get(correlation);
                        if (aggregate != null && !aggregate.getLock()) {
                            aggregate = null;
                        }
                    } else {
                        aggregate = createAndRegisterAggregate(
                                correlation, synCtx, traceOn, traceOrDebugOn);
                    }
                }
            }

        } else if (synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION) != null) {
            // if the correlation cannot be found using the correlateExpression then
            // try the default which is through the AGGREGATE_CORRELATION message property
            // which is the unique original message id of a split or iterate operation and
            // which thus can be used to uniquely group messages into aggregates
            Object o = synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION);

            // instanceof is false for null, so no separate null check is required
            if (o instanceof String) {
                String correlation = (String) o;

                while (aggregate == null) {
                    synchronized (lock) {
                        if (activeAggregates.containsKey(correlation)) {
                            aggregate = activeAggregates.get(correlation);
                            if (aggregate == null) {
                                // key present without an aggregate - give up, message is skipped
                                break;
                            }
                            if (!aggregate.getLock()) {
                                aggregate = null;
                            }
                        } else {
                            aggregate = createAndRegisterAggregate(
                                    correlation, synCtx, traceOn, traceOrDebugOn);
                        }
                    }
                }

            } else {
                if (traceOrDebugOn) {
                    traceOrDebug(traceOn, "Unable to find aggrgation correlation property");
                }
                return true;
            }

        } else {
            if (traceOrDebugOn) {
                traceOrDebug(traceOn,
                        "Unable to find aggrgation correlation XPath or property");
            }
            return true;
        }

        if (aggregate == null) {
            // if the aggregation correlation cannot be found then continue the message on the
            // normal path by returning true
            if (traceOrDebugOn) {
                traceOrDebug(traceOn, "Unable to find an aggregate for this message - skip");
            }
            return true;
        }

        // there is an aggregate - continue on aggregation
        boolean completionStarted = false;
        try {
            boolean collected = aggregate.addMessage(synCtx);
            if (traceOrDebugOn && collected) {
                traceOrDebug(traceOn, "Collected a message during aggregation");
                if (traceOn && trace.isTraceEnabled()) {
                    trace.trace("Collected message : " + synCtx);
                }
            }

            // check the completeness of the aggregate and if completed aggregate the messages
            // if not completed return false and block the message sequence till it completes
            if (aggregate.isComplete(traceOn, traceOrDebugOn, trace, log)) {
                completionStarted = true;
                if (traceOrDebugOn) {
                    traceOrDebug(traceOn, "Aggregation completed - invoking onComplete");
                }
                completeAggregate(aggregate);
                if (traceOrDebugOn) {
                    traceOrDebug(traceOn, "End : Aggregate mediator");
                }
                return true;
            }
        } finally {
            // release the lock when the aggregate is still incomplete, and also when
            // collecting or checking the message failed, so the aggregate is not left locked
            if (!completionStarted) {
                aggregate.releaseLock();
            }
        }

    } catch (JaxenException e) {
        handleException("Unable to execute the XPATH over the message", e, synCtx);
    }

    if (traceOrDebugOn) {
        traceOrDebug(traceOn, "End : Aggregate mediator");
    }
    return false;
}

/**
 * Creates a new aggregate for the given correlation, schedules it on the Synapse timer
 * when a completion timeout is configured, takes its lock and registers it in
 * {@code activeAggregates}. Callers in this class invoke it while holding {@code lock}.
 *
 * @param correlation    the key under which the aggregate is registered
 * @param synCtx         the current message, used to reach the Synapse timer
 * @param traceOn        whether tracing is on for the current message
 * @param traceOrDebugOn whether tracing or debug logging is on
 * @return the newly created and registered aggregate
 */
private Aggregate createAndRegisterAggregate(String correlation, MessageContext synCtx,
        boolean traceOn, boolean traceOrDebugOn) {

    if (traceOrDebugOn) {
        traceOrDebug(traceOn, "Creating new Aggregator - " +
                (completionTimeoutMillis > 0 ? "expires in : "
                        + (completionTimeoutMillis / 1000) + "secs" :
                        "without expiry time"));
    }

    Aggregate created = new Aggregate(
            correlation,
            completionTimeoutMillis,
            minMessagesToComplete,
            maxMessagesToComplete, this);

    if (completionTimeoutMillis > 0) {
        synCtx.getConfiguration().getSynapseTimer().
                schedule(created, completionTimeoutMillis);
    }

    created.getLock();
    activeAggregates.put(correlation, created);
    return created;
}
Aggregates messages flowing through this mediator according to the correlation criteria
and the aggregation algorithm specified for it |