public synchronized boolean isComplete(boolean traceOn,
boolean traceOrDebugOn,
Log trace,
Log log) {
if (!completed) {
// if any messages have been collected, check if the completion criteria is met
if (!messages.isEmpty()) {
// get total messages for this group, from the first message we have collected
MessageContext mc = messages.get(0);
Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE);
if (prop != null && prop instanceof String) {
String[] msgSequence = prop.toString().split(
EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
int total = Integer.parseInt(msgSequence[1]);
if (traceOrDebugOn) {
traceOrDebug(traceOn, trace, log, messages.size() +
" messages of " + total + " collected in current aggregation");
}
if (messages.size() >= total) {
if (traceOrDebugOn) {
traceOrDebug(traceOn, trace, log, "Aggregation complete");
}
return true;
}
}
} else {
if (traceOrDebugOn) {
traceOrDebug(traceOn, trace, log, "No messages collected in current aggregation");
}
}
// if the minimum number of messages has been reached, its complete
if (minCount > 0 && messages.size() >= minCount) {
if (traceOrDebugOn) {
traceOrDebug(traceOn, trace, log,
"Aggregation complete - the minimum : " + minCount
+ " messages has been reached");
}
return true;
}
if (maxCount > 0 && messages.size() >= maxCount) {
if (traceOrDebugOn) {
traceOrDebug(traceOn, trace, log,
"Aggregation complete - the maximum : " + maxCount
+ " messages has been reached");
}
return true;
}
// else, has this aggregation reached its timeout?
if (expiryTimeMillis > 0 && System.currentTimeMillis() >= expiryTimeMillis) {
if (traceOrDebugOn) {
traceOrDebug(traceOn, trace, log,
"Aggregation complete - the aggregation has timed out");
}
return true;
}
} else {
if (traceOrDebugOn) {
traceOrDebug(traceOn, trace, log,
"Aggregation already completed - this message will not be processed in aggregation");
}
}
return false;
}
Has this aggregation group completed? |