Home » synapse-1.2-src » org.apache.synapse.mediators.eip.aggregator » [javadoc | source]

    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one
    3    *  or more contributor license agreements.  See the NOTICE file
    4    *  distributed with this work for additional information
    5    *  regarding copyright ownership.  The ASF licenses this file
    6    *  to you under the Apache License, Version 2.0 (the
    7    *  "License"); you may not use this file except in compliance
    8    *  with the License.  You may obtain a copy of the License at
    9    *
   10    *   http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    *  Unless required by applicable law or agreed to in writing,
   13    *  software distributed under the License is distributed on an
   14    *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    *  KIND, either express or implied.  See the License for the
   16    *  specific language governing permissions and limitations
   17    *  under the License.
   18    */
   19   
   20   package org.apache.synapse.mediators.eip.aggregator;
   21   
   22   import org.apache.axiom.soap.SOAP11Constants;
   23   import org.apache.axiom.soap.SOAP12Constants;
   24   import org.apache.commons.logging.Log;
   25   import org.apache.commons.logging.LogFactory;
   26   import org.apache.synapse.MessageContext;
   27   import org.apache.synapse.SynapseConstants;
   28   import org.apache.synapse.mediators.AbstractMediator;
   29   import org.apache.synapse.mediators.base.SequenceMediator;
   30   import org.apache.synapse.mediators.eip.EIPConstants;
   31   import org.apache.synapse.mediators.eip.EIPUtils;
   32   import org.apache.synapse.util.xpath.SynapseXPath;
   33   import org.jaxen.JaxenException;
   34   
   35   import java.util.Collections;
   36   import java.util.HashMap;
   37   import java.util.Map;
   38   
   39   /**
   40    * Aggregate a number of messages that are determined to be for a particular group, and combine
   41    * them to form a single message which is then processed through the 'onComplete' sequence. Thus
   42    * an aggregator acts like a filter, and may look at a correlation XPath expression to select
   43    * messages for aggregation - or look at messageSequence number properties for aggregation or
   44    * let any other (i.e. non aggregatable) messages flow through
   45    * An instance of this mediator will register with a Timer to be notified after a specified timeout,
   46    * so that aggregations that never would complete could be timed out and cleared from memory and
   47    * any fault conditions handled
   48    */
   49   public class AggregateMediator extends AbstractMediator {
   50   
   51       private static final Log log = LogFactory.getLog(AggregateMediator.class);
   52       private static final Log trace = LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
   53   
   54       /** The duration as a number of milliseconds for this aggregation to complete */
   55       private long completionTimeoutMillis = 0;
   56       /** The minimum number of messages required to complete aggregation */
   57       private int minMessagesToComplete = -1;
   58       /** The maximum number of messages required to complete aggregation */
   59       private int maxMessagesToComplete = -1;
   60   
   61       /**
   62        * XPath that specifies a correlation expression that can be used to combine messages. An
   63        * example maybe //department@id="11"
   64        */
   65       private SynapseXPath correlateExpression = null;
   66       /**
   67        * An XPath expression that may specify a selected element to be aggregated from a group of
   68        * messages to create the aggregated message
   69        * e.g. //getQuote/return would pick up and aggregate the //getQuote/return elements from a
   70        * bunch of matching messages into one aggregated message
   71        */
   72       private SynapseXPath aggregationExpression = null;
   73   
   74       /** This holds the reference sequence name of the */
   75       private String onCompleteSequenceRef = null;
   76       /** Inline sequence definition holder that holds the onComplete sequence */
   77       private SequenceMediator onCompleteSequence = null;
   78   
   79       /** The active aggregates currently being processd */
   80       private Map<String, Aggregate> activeAggregates =
   81           Collections.synchronizedMap(new HashMap<String, Aggregate>());
   82   
   83       /** Lock object to provide the synchronized access to the activeAggregates on checking */
   84       private final Object lock = new Object();
   85   
   86       public AggregateMediator() {
   87           try {
   88               aggregationExpression = new SynapseXPath("s11:Body/child::*[position()=1] | " +
   89                   "s12:Body/child::*[position()=1]");
   90               aggregationExpression.addNamespace("s11", SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI);
   91               aggregationExpression.addNamespace("s12", SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI);
   92           } catch (JaxenException e) {
   93               if (log.isDebugEnabled()) {
   94                   handleException("Unable to set the default " +
   95                       "aggregationExpression for the aggregation", e, null);
   96               }
   97           }
   98       }
   99   
  100       /**
  101        * Aggregate messages flowing through this mediator according to the correlation criteria
  102        * and the aggregation algorithm specified to it
  103        *
  104        * @param synCtx - MessageContext to be mediated and aggregated
  105        * @return boolean true if the complete condition for the particular aggregate is validated
  106        */
  107       public boolean mediate(MessageContext synCtx) {
  108   
  109           boolean traceOn = isTraceOn(synCtx);
  110           boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
  111   
  112           if (traceOrDebugOn) {
  113               traceOrDebug(traceOn, "Start : Aggregate mediator");
  114   
  115               if (traceOn && trace.isTraceEnabled()) {
  116                   trace.trace("Message : " + synCtx.getEnvelope());
  117               }
  118           }
  119   
  120           try {
  121               Aggregate aggregate = null;
  122   
  123               // if a correlateExpression is provided and there is a coresponding
  124               // element in the current message prepare to correlate the messages on that
  125               if (correlateExpression != null
  126                       && correlateExpression.evaluate(synCtx) != null) {
  127   
  128                   while (aggregate == null) {
  129   
  130                       synchronized (lock) {
  131   
  132                           if (activeAggregates.containsKey(correlateExpression.toString())) {
  133   
  134                               aggregate = activeAggregates.get(correlateExpression.toString());
  135                               if (aggregate != null) {
  136                                   if (!aggregate.getLock()) {
  137                                       aggregate = null;
  138                                   }
  139                               }
  140   
  141                           } else {
  142   
  143                               if (traceOrDebugOn) {
  144                                   traceOrDebug(traceOn, "Creating new Aggregator - " +
  145                                           (completionTimeoutMillis > 0 ? "expires in : "
  146                                                   + (completionTimeoutMillis / 1000) + "secs" :
  147                                                   "without expiry time"));
  148                               }
  149   
  150                               aggregate = new Aggregate(
  151                                       correlateExpression.toString(),
  152                                       completionTimeoutMillis,
  153                                       minMessagesToComplete,
  154                                       maxMessagesToComplete, this);
  155   
  156                               if (completionTimeoutMillis > 0) {
  157                                   synCtx.getConfiguration().getSynapseTimer().
  158                                           schedule(aggregate, completionTimeoutMillis);
  159                               }
  160                               aggregate.getLock();
  161                               activeAggregates.put(correlateExpression.toString(), aggregate);
  162                           }
  163                       }
  164                   }
  165   
  166               } else if (synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION) != null) {
  167                   // if the correlattion cannot be found using the correlateExpression then
  168                   // try the default which is through the AGGREGATE_CORRELATION message property
  169                   // which is the unique original message id of a split or iterate operation and
  170                   // which thus can be used to uniquely group messages into aggregates
  171   
  172                   Object o = synCtx.getProperty(EIPConstants.AGGREGATE_CORRELATION);
  173                   String correlation;
  174   
  175                   if (o != null && o instanceof String) {
  176                       correlation = (String) o;
  177                       while (aggregate == null) {
  178                           synchronized (lock) {
  179                               if (activeAggregates.containsKey(correlation)) {
  180                                   aggregate = activeAggregates.get(correlation);
  181                                   if (aggregate != null) {
  182                                       if (!aggregate.getLock()) {
  183                                           aggregate = null;
  184                                       }
  185                                   } else {
  186                                       break;
  187                                   }
  188                               } else {
  189                                   if (traceOrDebugOn) {
  190                                       traceOrDebug(traceOn, "Creating new Aggregator - " +
  191                                               (completionTimeoutMillis > 0 ? "expires in : "
  192                                                       + (completionTimeoutMillis / 1000) + "secs" :
  193                                                       "without expiry time"));
  194                                   }
  195                           
  196                                   aggregate = new Aggregate(
  197                                           correlation,
  198                                           completionTimeoutMillis,
  199                                           minMessagesToComplete,
  200                                           maxMessagesToComplete, this);
  201   
  202                                   if (completionTimeoutMillis > 0) {
  203                                       synCtx.getConfiguration().getSynapseTimer().
  204                                               schedule(aggregate, completionTimeoutMillis);
  205                                   }
  206                                   aggregate.getLock();
  207                                   activeAggregates.put(correlation, aggregate);
  208                               }
  209                           }
  210                       }
  211                       
  212                   } else {
  213                       if (traceOrDebugOn) {
  214                           traceOrDebug(traceOn, "Unable to find aggrgation correlation property");
  215                       }
  216                       return true;
  217                   }
  218               } else {
  219                   if (traceOrDebugOn) {
  220                       traceOrDebug(traceOn,
  221                               "Unable to find aggrgation correlation XPath or property");
  222                   }
  223                   return true;
  224               }
  225   
  226               // if there is an aggregate continue on aggregation
  227               if (aggregate != null) {
  228                   boolean collected = aggregate.addMessage(synCtx);
  229                   if (traceOrDebugOn) {
  230                       if (collected) {
  231                           traceOrDebug(traceOn, "Collected a message during aggregation");
  232                           if (traceOn && trace.isTraceEnabled()) {
  233                               trace.trace("Collected message : " + synCtx);
  234                           }
  235                       }
  236                   }
  237                   
  238                   // check the completeness of the aggregate and if completed aggregate the messages
  239                   // if not completed return false and block the message sequence till it completes
  240   
  241                   if (aggregate.isComplete(traceOn, traceOrDebugOn, trace, log)) {
  242                       if (traceOrDebugOn) {
  243                           traceOrDebug(traceOn, "Aggregation completed - invoking onComplete");
  244                       }
  245                       completeAggregate(aggregate);
  246                       
  247                       if (traceOrDebugOn) {
  248                           traceOrDebug(traceOn, "End : Aggregate mediator");
  249                       }
  250                       return true;
  251                   } else {
  252                       aggregate.releaseLock();
  253                   }
  254   
  255               } else {
  256                   // if the aggregation correlation cannot be found then continue the message on the
  257                   // normal path by returning true
  258   
  259                   if (traceOrDebugOn) {
  260                       traceOrDebug(traceOn, "Unable to find an aggregate for this message - skip");
  261                   }
  262                   return true;
  263               }
  264   
  265           } catch (JaxenException e) {
  266               handleException("Unable to execute the XPATH over the message", e, synCtx);
  267           }
  268   
  269           if (traceOrDebugOn) {
  270               traceOrDebug(traceOn, "End : Aggregate mediator");
  271           }
  272   
  273           return false;
  274       }
  275   
  276       /**
  277        * Invoked by the Aggregate objects that are timed out, to signal timeout/completion of
  278        * itself
  279        * @param aggregate the timed out Aggregate that holds collected messages and properties
  280        */
  281       public synchronized void completeAggregate(Aggregate aggregate) {
  282   
  283           if (log.isDebugEnabled()) {
  284               log.debug("Aggregation completed or timed out");
  285           }
  286   
  287           // cancel the timer
  288           aggregate.cancel();
  289           aggregate.setCompleted(true);
  290   
  291           MessageContext newSynCtx = getAggregatedMessage(aggregate);
  292           if (newSynCtx == null) {
  293               log.warn("An aggregation of messages timed out with no aggregated messages", null);
  294               return;
  295           }
  296   
  297           activeAggregates.remove(aggregate.getCorrelation());
  298   
  299           if ((correlateExpression != null &&
  300               !correlateExpression.toString().equals(aggregate.getCorrelation())) ||
  301               correlateExpression == null) {
  302   
  303               if (onCompleteSequence != null) {
  304                   onCompleteSequence.mediate(newSynCtx);
  305   
  306               } else if (onCompleteSequenceRef != null
  307                   && newSynCtx.getSequence(onCompleteSequenceRef) != null) {
  308                   newSynCtx.getSequence(onCompleteSequenceRef).mediate(newSynCtx);
  309   
  310               } else {
  311                   handleException("Unable to find the sequence for the mediation " +
  312                       "of the aggregated message", newSynCtx);
  313               }
  314           }
  315       }
  316   
  317       /**
  318        * Get the aggregated message from the specified Aggregate instance
  319        *
  320        * @param aggregate the Aggregate object that holds collected messages and properties of the
  321        * aggregation
  322        * @return the aggregated message context
  323        */
  324       private MessageContext getAggregatedMessage(Aggregate aggregate) {
  325   
  326           MessageContext newCtx = null;
  327   
  328           for (MessageContext synCtx : aggregate.getMessages()) {
  329               
  330               if (newCtx == null) {
  331                   newCtx = synCtx;
  332   
  333                   if (log.isDebugEnabled()) {
  334                       log.debug("Generating Aggregated message from : " + newCtx.getEnvelope());
  335                   }
  336   
  337               } else {
  338                   try {
  339                       if (log.isDebugEnabled()) {
  340                           log.debug("Merging message : " + synCtx.getEnvelope() + " using XPath : " +
  341                                   aggregationExpression);
  342                       }
  343   
  344                       EIPUtils.enrichEnvelope(
  345                               newCtx.getEnvelope(), synCtx.getEnvelope(), aggregationExpression);
  346   
  347                       if (log.isDebugEnabled()) {
  348                           log.debug("Merged result : " + newCtx.getEnvelope());
  349                       }
  350   
  351                   } catch (JaxenException e) {
  352                       handleException("Error merging aggregation results using XPath : " +
  353                               aggregationExpression.toString(), e, synCtx);
  354                   }
  355               }
  356           }
  357           return newCtx;
  358       }
  359   
  360       public SynapseXPath getCorrelateExpression() {
  361           return correlateExpression;
  362       }
  363   
  364       public void setCorrelateExpression(SynapseXPath correlateExpression) {
  365           this.correlateExpression = correlateExpression;
  366       }
  367   
  368       public long getCompletionTimeoutMillis() {
  369           return completionTimeoutMillis;
  370       }
  371   
  372       public void setCompletionTimeoutMillis(long completionTimeoutMillis) {
  373           this.completionTimeoutMillis = completionTimeoutMillis;
  374       }
  375   
  376       public int getMinMessagesToComplete() {
  377           return minMessagesToComplete;
  378       }
  379   
  380       public void setMinMessagesToComplete(int minMessagesToComplete) {
  381           this.minMessagesToComplete = minMessagesToComplete;
  382       }
  383   
  384       public int getMaxMessagesToComplete() {
  385           return maxMessagesToComplete;
  386       }
  387   
  388       public void setMaxMessagesToComplete(int maxMessagesToComplete) {
  389           this.maxMessagesToComplete = maxMessagesToComplete;
  390       }
  391   
  392       public SynapseXPath getAggregationExpression() {
  393           return aggregationExpression;
  394       }
  395   
  396       public void setAggregationExpression(SynapseXPath aggregationExpression) {
  397           this.aggregationExpression = aggregationExpression;
  398       }
  399   
  400       public String getOnCompleteSequenceRef() {
  401           return onCompleteSequenceRef;
  402       }
  403   
  404       public void setOnCompleteSequenceRef(String onCompleteSequenceRef) {
  405           this.onCompleteSequenceRef = onCompleteSequenceRef;
  406       }
  407   
  408       public SequenceMediator getOnCompleteSequence() {
  409           return onCompleteSequence;
  410       }
  411   
  412       public void setOnCompleteSequence(SequenceMediator onCompleteSequence) {
  413           this.onCompleteSequence = onCompleteSequence;
  414       }
  415   
  416       public Map getActiveAggregates() {
  417           return activeAggregates;
  418       }
  419   }

Home » synapse-1.2-src » org.apache.synapse.mediators.eip.aggregator » [javadoc | source]