Home » synapse-1.2-src » org.apache.synapse.mediators.eip.aggregator » [javadoc | source]

    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one
    3    *  or more contributor license agreements.  See the NOTICE file
    4    *  distributed with this work for additional information
    5    *  regarding copyright ownership.  The ASF licenses this file
    6    *  to you under the Apache License, Version 2.0 (the
    7    *  "License"); you may not use this file except in compliance
    8    *  with the License.  You may obtain a copy of the License at
    9    *
   10    *   http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    *  Unless required by applicable law or agreed to in writing,
   13    *  software distributed under the License is distributed on an
   14    *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    *  KIND, either express or implied.  See the License for the
   16    *  specific language governing permissions and limitations
   17    *  under the License.
   18    */
   19   
   20   package org.apache.synapse.mediators.eip.aggregator;
   21   
   22   import org.apache.synapse.MessageContext;
   23   import org.apache.synapse.SynapseConstants;
   24   import org.apache.synapse.mediators.eip.EIPConstants;
   25   import org.apache.commons.logging.Log;
   26   import org.apache.commons.logging.LogFactory;
   27   
   28   import java.util.List;
   29   import java.util.ArrayList;
   30   import java.util.TimerTask;
   31   
   32   /**
   33    * An instance of this class is created to manage each aggregation group, and it holds
   34    * the aggregation properties and the messages collected during aggregation. This class also
   35    * times out itself after the timeout expires it
   36    */
   37   public class Aggregate extends TimerTask {
   38   
   39       private static final Log log = LogFactory.getLog(Aggregate.class);
   40       private static final Log trace = LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
   41   
   42       private long timeoutMillis = 0;
   43       /** The time in millis at which this aggregation should be considered as expired */
   44       private long expiryTimeMillis = 0;
   45       /** The minimum number of messages to be collected to consider this aggregation as complete */
   46       private int minCount = -1;
   47       /** The maximum number of messages that should be collected by this aggregation */
   48       private int maxCount = -1;
   49       private String correlation = null;
   50       /** The AggregateMediator that should be invoked on completion of the aggregation */
   51       private AggregateMediator aggregateMediator = null;
   52       private List<MessageContext> messages = new ArrayList<MessageContext>();
   53       private boolean locked = false;
   54       private boolean completed = false;
   55   
   56       /**
   57        * Save aggregation properties and timeout
   58        *
   59        * @param corelation representing the corelation name of the messages in the aggregate
   60        * @param timeoutMillis the timeout duration in milliseconds
   61        * @param min the minimum number of messages to be aggregated
   62        * @param max the maximum number of messages to be aggregated
   63        * @param mediator
   64        */
   65       public Aggregate(String corelation, long timeoutMillis, int min,
   66           int max, AggregateMediator mediator) {
   67           
   68           this.correlation = corelation;
   69           if (timeoutMillis > 0) {
   70               expiryTimeMillis = System.currentTimeMillis() + timeoutMillis;
   71           }
   72           if (min > 0) {
   73               minCount = min;
   74           }
   75           if (max > 0) {
   76               maxCount = max;
   77           }
   78           this.aggregateMediator = mediator;
   79       }
   80   
   81       /**
   82        * Add a message to the interlan message list
   83        *
   84        * @param synCtx message to be added into this aggregation group
   85        * @return true if the message was added or false if not
   86        */
   87       public synchronized boolean addMessage(MessageContext synCtx) {
   88           if (maxCount <= 0 || (maxCount > 0 && messages.size() < maxCount)) {
   89               messages.add(synCtx);
   90               return true;
   91           } else {
   92               return false;
   93           }
   94       }
   95   
   96       /**
   97        * Has this aggregation group completed?
   98        *
   99        * @param traceOn is tracing on
  100        * @param traceOrDebugOn is trace or debug on
  101        * @param trace trace log to be used
  102        * @param log log to be used
  103        *
  104        * @return boolean true if aggregation is complete
  105        */
  106       public synchronized boolean isComplete(boolean traceOn, boolean traceOrDebugOn,
  107           Log trace, Log log) {
  108   
  109           if (!completed) {
  110   
  111               // if any messages have been collected, check if the completion criteria is met
  112               if (!messages.isEmpty()) {
  113   
  114                   // get total messages for this group, from the first message we have collected
  115                   MessageContext mc = messages.get(0);
  116                   Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE);
  117               
  118                   if (prop != null && prop instanceof String) {
  119                       String[] msgSequence = prop.toString().split(
  120                               EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
  121                       int total = Integer.parseInt(msgSequence[1]);
  122   
  123                       if (traceOrDebugOn) {
  124                           traceOrDebug(traceOn, trace, log, messages.size() +
  125                                   " messages of " + total + " collected in current aggregation");
  126                       }
  127   
  128                       if (messages.size() >= total) {
  129                           if (traceOrDebugOn) {
  130                               traceOrDebug(traceOn, trace, log, "Aggregation complete");
  131                           }
  132                           return true;
  133                       }
  134                   }
  135               } else {
  136                   if (traceOrDebugOn) {
  137                       traceOrDebug(traceOn, trace, log, "No messages collected in current aggregation");
  138                   }
  139               }
  140   
  141               // if the minimum number of messages has been reached, its complete
  142               if (minCount > 0 && messages.size() >= minCount) {
  143                   if (traceOrDebugOn) {
  144                       traceOrDebug(traceOn, trace, log,
  145                               "Aggregation complete - the minimum : " + minCount
  146                                       + " messages has been reached");
  147                   }
  148                   return true;
  149               }
  150   
  151               if (maxCount > 0 && messages.size() >= maxCount) {
  152                   if (traceOrDebugOn) {
  153                       traceOrDebug(traceOn, trace, log,
  154                               "Aggregation complete - the maximum : " + maxCount
  155                                       + " messages has been reached");
  156                   }
  157   
  158                   return true;
  159               }
  160   
  161               // else, has this aggregation reached its timeout?
  162               if (expiryTimeMillis > 0 && System.currentTimeMillis() >= expiryTimeMillis) {
  163                   if (traceOrDebugOn) {
  164                       traceOrDebug(traceOn, trace, log,
  165                               "Aggregation complete - the aggregation has timed out");
  166                   }
  167   
  168                   return true;
  169               }
  170           } else {
  171               if (traceOrDebugOn) {
  172                   traceOrDebug(traceOn, trace, log,
  173                           "Aggregation already completed - this message will not be processed in aggregation");
  174               }
  175           }
  176           
  177           return false;
  178       }
  179   
  180       private void traceOrDebug(boolean traceOn, Log trace, Log log, String msg) {
  181           if (traceOn) {
  182               trace.info(msg);
  183           }
  184           if (log.isDebugEnabled()) {
  185               log.debug(msg);
  186           }
  187       }
  188   
  189       public long getTimeoutMillis() {
  190           return timeoutMillis;
  191       }
  192   
  193       public void setTimeoutMillis(long timeoutMillis) {
  194           this.timeoutMillis = timeoutMillis;
  195       }
  196   
  197       public int getMinCount() {
  198           return minCount;
  199       }
  200   
  201       public void setMinCount(int minCount) {
  202           this.minCount = minCount;
  203       }
  204   
  205       public int getMaxCount() {
  206           return maxCount;
  207       }
  208   
  209       public void setMaxCount(int maxCount) {
  210           this.maxCount = maxCount;
  211       }
  212   
  213       public String getCorrelation() {
  214           return correlation;
  215       }
  216   
  217       public void setCorrelation(String correlation) {
  218           this.correlation = correlation;
  219       }
  220   
  221       public List<MessageContext> getMessages() {
  222           return messages;
  223       }
  224   
  225       public void setMessages(List<MessageContext> messages) {
  226           this.messages = messages;
  227       }
  228   
  229       public long getExpiryTimeMillis() {
  230           return expiryTimeMillis;
  231       }
  232   
  233       public void setExpiryTimeMillis(long expiryTimeMillis) {
  234           this.expiryTimeMillis = expiryTimeMillis;
  235       }
  236   
  237       public void run() {
  238           while (true) {
  239               if (completed) {
  240                   break;
  241               }
  242               if (getLock()) {
  243                   if (log.isDebugEnabled()) {
  244                       log.debug("Time : " + System.currentTimeMillis() + " and this aggregator " +
  245                               "expired at : " + expiryTimeMillis);
  246                   }
  247                   aggregateMediator.completeAggregate(this);
  248                   break;
  249               }
  250           }
  251       }
  252   
  253       public synchronized boolean getLock() {
  254           return !locked;
  255       }
  256   
  257       public void releaseLock() {
  258           locked = false;
  259       }
  260   
  261       public boolean isCompleted() {
  262           return completed;
  263       }
  264   
  265       public void setCompleted(boolean completed) {
  266           this.completed = completed;
  267       }
  268   }

Home » synapse-1.2-src » org.apache.synapse.mediators.eip.aggregator » [javadoc | source]