Home » synapse-1.2-src » org.apache.synapse.mediators.throttle » [javadoc | source]

    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one
    3    *  or more contributor license agreements.  See the NOTICE file
    4    *  distributed with this work for additional information
    5    *  regarding copyright ownership.  The ASF licenses this file
    6    *  to you under the Apache License, Version 2.0 (the
    7    *  "License"); you may not use this file except in compliance
    8    *  with the License.  You may obtain a copy of the License at
    9    *
   10    *   http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    *  Unless required by applicable law or agreed to in writing,
   13    *  software distributed under the License is distributed on an
   14    *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    *  KIND, either express or implied.  See the License for the
   16    *  specific language governing permissions and limitations
   17    *  under the License.
   18    */
   19   package org.apache.synapse.mediators.throttle;
   20   
   21   import org.apache.axiom.om.OMElement;
   22   import org.apache.neethi.PolicyEngine;
   23   import org.apache.synapse.Mediator;
   24   import org.apache.synapse.MessageContext;
   25   import org.apache.synapse.transport.nhttp.NhttpConstants;
   26   import org.apache.synapse.config.Entry;
   27   import org.apache.synapse.core.axis2.Axis2MessageContext;
   28   import org.apache.synapse.mediators.AbstractMediator;
   29   import org.apache.axis2.context.ConfigurationContext;
   30   import org.apache.axis2.clustering.context.Replicator;
   31   import org.apache.axis2.clustering.ClusteringFault;
   32   import org.apache.axis2.clustering.ClusterManager;
   33   import org.wso2.throttle;
   34   
   35   
   36   /**
   37    * The Mediator for throttling - throttling will occur according to the WS-Policy
   38    * which is specified either as a key for lookup from the registry or as an inline policy.
   39    * Supports concurrency throttling as well as domain-name based and IP based access-rate throttling.
   40    */
   41   
   42   public class ThrottleMediator extends AbstractMediator {
   43   
   44       /* The key for getting the throttling policy - key refers to a/an [registry] entry    */
   45       private String policyKey = null;
   46       /* InLine policy object - XML  */
   47       private OMElement inLinePolicy = null;
   48       /* The reference to the sequence which will execute when access is denied   */
   49       private String onRejectSeqKey = null;
   50       /* The in-line sequence which will execute when access is denied */
   51       private Mediator onRejectMediator = null;
   52       /* The reference to the sequence which will execute when access is allowed  */
   53       private String onAcceptSeqKey = null;
   54       /* The in-line sequence which will execute when access is allowed */
   55       private Mediator onAcceptMediator = null;
   56       /* The concurrect access control group id */
   57       private String id;
   58       /* Access rate controller - limit the remote caller access*/
   59       private AccessRateController accessControler;
   60       /* ConcurrentAccessController - limit the remote calleres concurrent access */
   61       private ConcurrentAccessController concurrentAccessController = null;
   62       /* The property key that used when the ConcurrentAccessController
   63          look up from ConfigurationContext */
   64       private String key;
   65       /* Is this env. support clustering*/
   66       private boolean isClusteringEnable = false;
   67       /* The Throttle object - holds all runtime and configuration data */
   68       private Throttle throttle;
   69       /* Lock used to ensure thread-safe creation of the throttle */
   70       private final Object throttleLock = new Object();
   71   
   72       public ThrottleMediator() {
   73           this.accessControler = new AccessRateController();
   74       }
   75   
   76       public boolean mediate(MessageContext synCtx) {
   77   
   78           boolean traceOn = isTraceOn(synCtx);
   79           boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
   80           boolean isResponse = synCtx.isResponse();
   81           ConfigurationContext cc;
   82           org.apache.axis2.context.MessageContext axisMC;
   83   
   84           if (traceOrDebugOn) {
   85               traceOrDebug(traceOn, "Start : Throttle mediator");
   86   
   87               if (traceOn && trace.isTraceEnabled()) {
   88                   trace.trace("Message : " + synCtx.getEnvelope());
   89               }
   90           }
   91           // To ensure the creation of throttle is thread safe ? It is possible create same throttle
   92           // object multiple times  by multiple threads.
   93   
   94           synchronized (throttleLock) {
   95   
   96               // get Axis2 MessageContext and ConfigurationContext
   97               axisMC = ((Axis2MessageContext) synCtx).getAxis2MessageContext();
   98               cc = axisMC.getConfigurationContext();
   99   
  100               //To ensure check for clustering environment only happens one time
  101               if ((throttle == null && !isResponse) || (isResponse
  102                   && concurrentAccessController == null)) {
  103                   ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
  104                   if (clusterManager != null &&
  105                       clusterManager.getContextManager() != null) {
  106                       isClusteringEnable = true;
  107                   }
  108               }
  109   
  110               // Throttle only will be created ,if the massage flow is IN
  111               if (!isResponse) {
  112                   //check the availability of the ConcurrentAccessControler
  113                   //if this is a clustered environment
  114                   if (isClusteringEnable) {
  115                       concurrentAccessController =
  116                           (ConcurrentAccessController) cc.getProperty(key);
  117                   }
  118                   // for request messages, read the policy for throttling and initialize
  119                   if (inLinePolicy != null) {
  120                       // this uses a static policy
  121                       if (throttle == null) {  // only one time creation
  122   
  123                           if (traceOn && trace.isTraceEnabled()) {
  124                               trace.trace("Initializing using static throttling policy : "
  125                                   + inLinePolicy);
  126                           }
  127                           try {
  128                               // process the policy
  129                               throttle = ThrottlePolicyProcessor.processPolicy(
  130                                   PolicyEngine.getPolicy(inLinePolicy));
  131   
  132                               //At this point concurrent access controller definitely 'null'
  133                               // f the clustering is disable.
  134                               //For a clustered environment,it is 'null' ,
  135                               //if this is the first instance on the cluster ,
  136                               // that message mediation has occurred through this mediator.
  137                               if (throttle != null && concurrentAccessController == null) {
  138                                   concurrentAccessController =
  139                                       throttle.getConcurrentAccessController();
  140                                   if (concurrentAccessController != null) {
  141                                       cc.setProperty(key, concurrentAccessController);
  142                                   }
  143                               }
  144                           } catch (ThrottleException e) {
  145                               handleException("Error processing the throttling policy", e, synCtx);
  146                           }
  147                       }
  148   
  149                   } else if (policyKey != null) {
  150   
  151                       // If the policy has specified as a registry key.
  152                       // load or re-load policy from registry or local entry if not already available
  153   
  154                       Entry entry = synCtx.getConfiguration().getEntryDefinition(policyKey);
  155                       if (entry == null) {
  156                           handleException("Cannot find throttling policy using key : "
  157                               + policyKey, synCtx);
  158   
  159                       } else {
  160                           boolean reCreate = false;
  161                           // if the key refers to a dynamic resource
  162                           if (entry.isDynamic()) {
  163                               if (!entry.isCached() || entry.isExpired()) {
  164                                   reCreate = true;
  165                               }
  166                           }
  167                           if (reCreate || throttle == null) {
  168                               Object entryValue = synCtx.getEntry(policyKey);
  169                               if (entryValue == null) {
  170                                   handleException(
  171                                       "Null throttling policy returned by Entry : "
  172                                           + policyKey, synCtx);
  173   
  174                               } else {
  175                                   if (!(entryValue instanceof OMElement)) {
  176                                       handleException("Policy returned from key : " + policyKey +
  177                                           " is not an OMElement", synCtx);
  178   
  179                                   } else {
  180                                       //Check for reload in a cluster environment ?
  181                                       // For clustered environment ,if the concurrent access controller
  182                                       // is not null and throttle is not null , then must reload.
  183                                       if (isClusteringEnable && concurrentAccessController != null
  184                                           && throttle != null) {
  185                                           concurrentAccessController = null; // set null ,
  186                                           // because need reload
  187                                       }
  188   
  189                                       try {
  190                                           // Creates the throttle from the policy
  191                                           throttle = ThrottlePolicyProcessor.processPolicy(
  192                                               PolicyEngine.getPolicy((OMElement) entryValue));
  193   
  194                                           //For non-clustered  environment , must re-initiates
  195                                           //For  clustered  environment,
  196                                           //concurrent access controller is null ,
  197                                           //then must re-initiates
  198                                           if (throttle != null && (concurrentAccessController == null
  199                                               || !isClusteringEnable)) {
  200                                               concurrentAccessController =
  201                                                   throttle.getConcurrentAccessController();
  202                                               if (concurrentAccessController != null) {
  203                                                   cc.setProperty(key, concurrentAccessController);
  204                                               } else {
  205                                                   cc.removeProperty(key);
  206                                               }
  207                                           }
  208                                       } catch (ThrottleException e) {
  209                                           handleException("Error processing the throttling policy",
  210                                               e, synCtx);
  211                                       }
  212                                   }
  213                               }
  214                           }
  215                       }
  216                   }
  217               } else {
  218                   // if the message flow path is OUT , then must lookp from ConfigurationContext -
  219                   // never create ,just get the existing one
  220                   concurrentAccessController =
  221                       (ConcurrentAccessController) cc.getProperty(key);
  222               }
  223           }
  224           //perform concurrency throttling
  225           boolean canAccess = doThrottleByConcurrency(isResponse, traceOrDebugOn, traceOn);
  226   
  227           //if the access is success through concurrency throttle and if this is a request message
  228           //then do access rate based throttling
  229           if (throttle != null && !isResponse && canAccess) {
  230               canAccess = throttleByAccessRate(synCtx, axisMC, cc, traceOrDebugOn, traceOn);
  231           }
  232           // all the replication functionality of the access rate based throttling handles by itself 
  233           // Just replicate the current state of ConcurrentAccessController
  234           if (isClusteringEnable && concurrentAccessController != null) {
  235               if (cc != null) {
  236                   try {
  237                       if (traceOrDebugOn) {
  238                           traceOrDebug(traceOn, "Going to replicates the  " +
  239                               "states of the ConcurrentAccessController with key : " + key);
  240                       }
  241                       Replicator.replicate(cc);
  242                   } catch (ClusteringFault clusteringFault) {
  243                       handleException("Error during the replicating  states ",
  244                           clusteringFault, synCtx);
  245                   }
  246               }
  247           }
  248           if (canAccess) {
  249               if (onAcceptSeqKey != null) {
  250                   Mediator mediator = synCtx.getSequence(onAcceptSeqKey);
  251                   if (mediator != null) {
  252                       return mediator.mediate(synCtx);
  253                   } else {
  254                       handleException("Unable to find onAccept sequence with key : "
  255                           + onAcceptSeqKey, synCtx);
  256                   }
  257               } else if (onAcceptMediator != null) {
  258                   return onAcceptMediator.mediate(synCtx);
  259               } else {
  260                   return true;
  261               }
  262   
  263           } else {
  264               if (onRejectSeqKey != null) {
  265                   Mediator mediator = synCtx.getSequence(onRejectSeqKey);
  266                   if (mediator != null) {
  267                       return mediator.mediate(synCtx);
  268                   } else {
  269                       handleException("Unable to find onReject sequence with key : "
  270                           + onRejectSeqKey, synCtx);
  271                   }
  272               } else if (onRejectMediator != null) {
  273                   return onRejectMediator.mediate(synCtx);
  274               } else {
  275                   return false;
  276               }
  277           }
  278   
  279           if (traceOrDebugOn) {
  280               traceOrDebug(traceOn, "End : Throttle mediator");
  281           }
  282           return canAccess;
  283       }
  284   
  285       /**
  286        * Helper method that handles the concurrent access through throttle
  287        *
  288        * @param isResponse     Current Message is response or not
  289        * @param traceOrDebugOn is trace or debug on?
  290        * @param traceOn        is trace on?
  291        * @return true if the caller can access ,o.w. false
  292        */
  293       private boolean doThrottleByConcurrency(boolean isResponse, boolean traceOrDebugOn, boolean traceOn) {
  294           boolean canAcess = true;
  295           if (concurrentAccessController != null) {
  296               // do the concurrecy throttling
  297               int concurrentLimit = concurrentAccessController.getLimit();
  298               if (traceOrDebugOn) {
  299                   traceOrDebug(traceOn, "Concurrent access controller for ID : " + id +
  300                       " allows : " + concurrentLimit + " concurrent accesses");
  301               }
  302               int available;
  303               if (!isResponse) {
  304                   available = concurrentAccessController.getAndDecrement();
  305                   canAcess = available > 0;
  306                   if (traceOrDebugOn) {
  307                       traceOrDebug(traceOn, "Concurrency Throttle : Access " +
  308                           (canAcess ? "allowed" : "denied") + " :: " + available
  309                           + " of available of " + concurrentLimit + " connections");
  310                   }
  311               } else {
  312                   available = concurrentAccessController.incrementAndGet();
  313                   if (traceOrDebugOn) {
  314                       traceOrDebug(traceOn, "Concurrency Throttle : Connection returned" + " :: " +
  315                           available + " of available of " + concurrentLimit + " connections");
  316                   }
  317               }
  318           }
  319           return canAcess;
  320       }
  321   
  322       /**
  323        * Helper method that handles the access-rate based throttling
  324        *
  325        * @param synCtx         MessageContext(Synapse)
  326        * @param axisMC         MessageContext(Axis2)
  327        * @param cc             ConfigurationContext
  328        * @param traceOrDebugOn is trace or debug on?
  329        * @param traceOn        is trace on?
  330        * @return ue if the caller can access ,o.w. false
  331        */
  332       private boolean throttleByAccessRate(MessageContext synCtx, org.apache.axis2.context.MessageContext axisMC, ConfigurationContext cc, boolean traceOrDebugOn, boolean traceOn) {
  333   
  334           String callerId = null;
  335           boolean canAccess = true;
  336           //remote ip of the caller
  337           String remoteIP = (String) axisMC.getPropertyNonReplicable(
  338               org.apache.axis2.context.MessageContext.REMOTE_ADDR);
  339           //domain name of the caller
  340           String domainName = (String) axisMC.getPropertyNonReplicable(NhttpConstants.REMOTE_HOST);
  341   
  342           //Using remote caller domain name , If there is a throttle configuration for
  343           // this domain name ,then throttling will occur according to that configuration
  344           if (domainName != null) {
  345               // do the domain based throttling
  346               if (traceOrDebugOn) {
  347                   traceOrDebug(traceOn, "The Domain Name of the caller is :" + domainName);
  348               }
  349               // loads the DomainBasedThrottleContext
  350               ThrottleContext context
  351                   = throttle.getThrottleContext(ThrottleConstants.DOMAIN_BASED_THROTTLE_KEY);
  352               if (context != null) {
  353                   //loads the DomainBasedThrottleConfiguration
  354                   ThrottleConfiguration config = context.getThrottleConfiguration();
  355                   if (config != null) {
  356                       //checks the availability of a policy configuration for  this domain name
  357                       callerId = config.getConfigurationKeyOfCaller(domainName);
  358                       if (callerId != null) {  // there is configuration for this domain name
  359   
  360                           //If this is a clusterred env.
  361                           if (isClusteringEnable) {
  362                               context.setConfigurationContext(cc);
  363                               context.setThrottleId(id);
  364                           }
  365   
  366                           try {
  367                               //Checks for access state
  368                               canAccess = accessControler.canAccess(context,
  369                                   callerId, ThrottleConstants.DOMAIN_BASE);
  370   
  371                               if (traceOrDebugOn) {
  372                                   traceOrDebug(traceOn, "Access " + (canAccess ? "allowed" : "denied")
  373                                       + " for Domain Name : " + domainName);
  374                               }
  375   
  376                               //In the case of both of concurrency throttling and
  377                               //rate based throttling have enabled ,
  378                               //if the access rate less than maximum concurrent access ,
  379                               //then it is possible to occur death situation.To avoid that reset,
  380                               //if the access has denied by rate based throttling
  381                               if (!canAccess && concurrentAccessController != null) {
  382                                   concurrentAccessController.incrementAndGet();
  383                                   if (isClusteringEnable) {
  384                                       cc.setProperty(key, concurrentAccessController);
  385                                   }
  386                               }
  387                           } catch (ThrottleException e) {
  388                               handleException("Error occurd during throttling", e, synCtx);
  389                           }
  390                       }
  391                   }
  392               }
  393           } else {
  394               if (traceOrDebugOn) {
  395                   traceOrDebug(traceOn, "The Domain name of the caller cannot be found");
  396               }
  397           }
  398   
  399           //At this point , any configuration for the remote caller hasn't found ,
  400           //therefore trying to find a configuration policy based on remote caller ip
  401           if (callerId == null) {
  402               //do the IP-based throttling
  403               if (remoteIP == null) {
  404                   if (traceOrDebugOn) {
  405                       traceOrDebug(traceOn, "The IP address of the caller cannot be found");
  406                   }
  407                   canAccess = true;
  408   
  409               } else {
  410                   if (traceOrDebugOn) {
  411                       traceOrDebug(traceOn, "The IP Address of the caller is :" + remoteIP);
  412                   }
  413                   try {
  414                       // Loads the IPBasedThrottleContext
  415                       ThrottleContext context =
  416                           throttle.getThrottleContext(ThrottleConstants.IP_BASED_THROTTLE_KEY);
  417                       if (context != null) {
  418                           //Loads the IPBasedThrottleConfiguration
  419                           ThrottleConfiguration config = context.getThrottleConfiguration();
  420                           if (config != null) {
  421                               //Checks the availability of a policy configuration for  this ip
  422                               callerId = config.getConfigurationKeyOfCaller(remoteIP);
  423                               if (callerId != null) {   // there is configuration for this ip
  424   
  425                                   //For clustered env.
  426                                   if (isClusteringEnable) {
  427                                       context.setConfigurationContext(cc);
  428                                       context.setThrottleId(id);
  429                                   }
  430                                   //Checks access state
  431                                   canAccess = accessControler.canAccess(context,
  432                                       callerId, ThrottleConstants.IP_BASE);
  433   
  434                                   if (traceOrDebugOn) {
  435                                       traceOrDebug(traceOn, "Access " +
  436                                           (canAccess ? "allowed" : "denied")
  437                                           + " for IP : " + remoteIP);
  438                                   }
  439                                   //In the case of both of concurrency throttling and
  440                                   //rate based throttling have enabled ,
  441                                   //if the access rate less than maximum concurrent access ,
  442                                   //then it is possible to occur death situation.To avoid that reset,
  443                                   //if the access has denied by rate based throttling
  444                                   if (!canAccess && concurrentAccessController != null) {
  445                                       concurrentAccessController.incrementAndGet();
  446                                       if (isClusteringEnable) {
  447                                           cc.setProperty(key, concurrentAccessController);
  448                                       }
  449                                   }
  450                               }
  451                           }
  452                       }
  453                   } catch (ThrottleException e) {
  454                       handleException("Error occurd during throttling", e, synCtx);
  455                   }
  456               }
  457           }
  458           return canAccess;
  459       }
  460   
  461       public String getType() {
  462           return ThrottleMediator.class.getName();
  463       }
  464   
  465       /**
  466        * To get the policy key - The key for which will used to lookup policy from the registry
  467        *
  468        * @return String
  469        */
  470   
  471       public String getPolicyKey() {
  472           return policyKey;
  473       }
  474   
  475       /**
  476        * To set the policy key - The key for which lookup from the registry
  477        *
  478        * @param policyKey Key for picking policy from the registry
  479        */
  480       public void setPolicyKey(String policyKey) {
  481           this.policyKey = policyKey;
  482       }
  483   
  484       /**
  485        * getting throttle policy which has defined as InLineXML
  486        *
  487        * @return InLine Throttle Policy
  488        */
  489       public OMElement getInLinePolicy() {
  490           return inLinePolicy;
  491       }
  492   
  493       /**
  494        * setting throttle policy which has defined as InLineXML
  495        *
  496        * @param inLinePolicy Inline policy
  497        */
  498       public void setInLinePolicy(OMElement inLinePolicy) {
  499           this.inLinePolicy = inLinePolicy;
  500       }
  501   
  502       public String getOnRejectSeqKey() {
  503           return onRejectSeqKey;
  504       }
  505   
  506       public void setOnRejectSeqKey(String onRejectSeqKey) {
  507           this.onRejectSeqKey = onRejectSeqKey;
  508       }
  509   
  510       public Mediator getOnRejectMediator() {
  511           return onRejectMediator;
  512       }
  513   
  514       public void setOnRejectMediator(Mediator onRejectMediator) {
  515           this.onRejectMediator = onRejectMediator;
  516       }
  517   
  518       public String getOnAcceptSeqKey() {
  519           return onAcceptSeqKey;
  520       }
  521   
  522       public void setOnAcceptSeqKey(String onAcceptSeqKey) {
  523           this.onAcceptSeqKey = onAcceptSeqKey;
  524       }
  525   
  526       public Mediator getOnAcceptMediator() {
  527           return onAcceptMediator;
  528       }
  529   
  530       public void setOnAcceptMediator(Mediator onAcceptMediator) {
  531           this.onAcceptMediator = onAcceptMediator;
  532       }
  533   
  534       public String getId() {
  535           return id;
  536       }
  537   
  538       public void setId(String id) {
  539           this.id = id;
  540           this.key = ThrottleConstants.THROTTLE_PROPERTY_PREFIX + id + ThrottleConstants.CAC_SUFFIX;
  541       }
  542   }

Home » synapse-1.2-src » org.apache.synapse.mediators.throttle » [javadoc | source]