Home » synapse-1.2-src » org.apache.synapse.endpoints » [javadoc | source]

    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one
    3    *  or more contributor license agreements.  See the NOTICE file
    4    *  distributed with this work for additional information
    5    *  regarding copyright ownership.  The ASF licenses this file
    6    *  to you under the Apache License, Version 2.0 (the
    7    *  "License"); you may not use this file except in compliance
    8    *  with the License.  You may obtain a copy of the License at
    9    *
   10    *   http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    *  Unless required by applicable law or agreed to in writing,
   13    *  software distributed under the License is distributed on an
   14    *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    *  KIND, either express or implied.  See the License for the
   16    *  specific language governing permissions and limitations
   17    *  under the License.
   18    */
   19   
   20   package org.apache.synapse.endpoints;
   21   
   22   import org.apache.axis2.clustering.ClusterManager;
   23   import org.apache.axis2.context.ConfigurationContext;
   24   import org.apache.axis2.context.OperationContext;
   25   import org.apache.commons.logging.Log;
   26   import org.apache.commons.logging.LogFactory;
   27   import org.apache.synapse.FaultHandler;
   28   import org.apache.synapse.MessageContext;
   29   import org.apache.synapse.SynapseConstants;
   30   import org.apache.synapse.core.axis2.Axis2MessageContext;
   31   import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
   32   import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
   33   import org.apache.synapse.endpoints.dispatch.Dispatcher;
   34   import org.apache.synapse.endpoints.dispatch.DispatcherContext;
   35   
   36   import java.util.ArrayList;
   37   import java.util.List;
   38   
   39   /**
   40    * SALoadbalanceEndpoint supports session affinity based load balancing. Each of this endpoint
   41    * maintains a list of dispatchers. These dispatchers will be updated for both request (for client
   42    * initiated sessions) and response (for server initiated sessions). Once updated, each dispatcher
   43    * will check if has already encountered that session. If not, it will update the
   44    * session -> endpoint map. To update sessions for response messages, all SALoadbalanceEndpoint
   45    * objects are kept in a global property. When a message passes through SALoadbalanceEndpoints, each
   46    * endpoint appends its "Synapse unique ID" to the operation context. Once the response for that
   47    * message arrives, response sender checks first endpoint of the endpoint sequence from the
   48    * operation context and get that endpoint from the above mentioned global property. Then it will
   49    * invoke updateSession(...) method of that endpoint. After that, each endpoint will call
   50    * updateSession(...) method of their appropriate child endpoint, so that all the sending endpoints
   51    * for the session will be updated.
   52    * <p/>
   53    * This endpoint gets the target endpoint first from the dispatch manager, which will ask all listed
   54    * dispatchers for a matching session. If a matching session is found it will just invoke the
   55    * send(...) method of that endpoint. If not it will find an endpoint using the load balancing
   56    * policy and send to that endpoint.
   57    */
   58   public class SALoadbalanceEndpoint implements Endpoint {
   59   
   60       private static final Log log = LogFactory.getLog(SALoadbalanceEndpoint.class);
   61   
   62       private static final String FIRST_MESSAGE_IN_SESSION = "first_message_in_session";
   63       public static final String ENDPOINT_LIST = "endpointList";
   64       public static final String ROOT_ENDPOINT = "rootendpoint";
   65       public static final String ENDPOINT_NAME_LIST = "endpointNameList";
   66       public static final String WARN_MESSAGE = "In a clustering environment, the endpoint " +
   67               "name should be specified even for anonymous endpoints. Otherwise the clustering " +
   68               "would not function properly, if there are more than one anonymous endpoints.";
   69   
   70       /**
   71        * Name of the endpoint. Used for named endpoints which can be referred using the key attribute
   72        * of indirect endpoints.
   73        */
   74       private String name = null;
   75   
   76       /**
   77        * List of endpoints among which the load is distributed. Any object implementing the Endpoint
   78        * interface could be used.
   79        */
   80       private List<Endpoint> endpoints = null;
   81   
   82       /**
   83        * Algorithm used for selecting the next endpoint to direct the first request of sessions.
   84        * Default is RoundRobin.
   85        */
   86       private LoadbalanceAlgorithm algorithm = null;
   87   
   88       /**
   89        * Parent endpoint of this endpoint if this used inside another endpoint. Although any endpoint
   90        * can be the parent, only SALoadbalanceEndpoint should be used here. Use of any other endpoint
   91        * would invalidate the session.
   92        */
   93       private Endpoint parentEndpoint = null;
   94   
   95       /**
   96        * Dispatcher used for session affinity.
   97        */
   98       private Dispatcher dispatcher = null;
   99   
  100       /**
  101        * The dispatcher context, place holder for keeping any runtime states that are used when
  102        * finding endpoint for the session
  103        */
  104       private final DispatcherContext dispatcherContext = new DispatcherContext();
  105   
  106       /**
  107        * The endpoint context, place holder for keeping any runtime states related to the endpoint
  108        */
  109       private final EndpointContext endpointContext = new EndpointContext();
  110   
  111       /**
  112        * The algorithm context, place holder for keeping any runtime states related to the load
  113        * balance algorithm
  114        */
  115       private final AlgorithmContext algorithmContext = new AlgorithmContext();
  116   
  117   
  118       public void send(MessageContext synMessageContext) {
  119   
  120           if (log.isDebugEnabled()) {
  121               log.debug("Start : Session Affinity Load-balance Endpoint " + name);
  122           }
  123   
  124           boolean isClusteringEnable = false;
  125           // get Axis2 MessageContext and ConfigurationContext
  126           org.apache.axis2.context.MessageContext axisMC =
  127                   ((Axis2MessageContext) synMessageContext).getAxis2MessageContext();
  128           ConfigurationContext cc = axisMC.getConfigurationContext();
  129   
  130           //The check for clustering environment
  131           ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
  132           if (clusterManager != null &&
  133                   clusterManager.getContextManager() != null) {
  134               isClusteringEnable = true;
  135           }
  136   
  137           String endpointName = this.getName();
  138           if (endpointName == null) {
  139               if (isClusteringEnable) {
  140                   log.warn(WARN_MESSAGE);
  141               }
  142               if (log.isDebugEnabled()) {
  143                   log.debug("Using the name for the anonymous endpoint as : '"
  144                           + SynapseConstants.ANONYMOUS_ENDPOINT + "'");
  145               }
  146               endpointName = SynapseConstants.ANONYMOUS_ENDPOINT;
  147           }
  148   
  149           if (isClusteringEnable) {
  150   
  151               // if this is a cluster environment, then set configuration context to endpoint context
  152               if (endpointContext.getConfigurationContext() == null) {
  153   
  154                   if (log.isDebugEnabled()) {
  155                       log.debug("Setting the ConfigurationContext to " +
  156                               "the EndpointContext with the name " + endpointName +
  157                               " for replicating data on the cluster");
  158                   }
  159                   endpointContext.setConfigurationContext(cc);
  160                   endpointContext.setContextID(endpointName);
  161               }
  162   
  163               // if this is a cluster environment, then set configuration context to load balance
  164               //  algorithm context
  165               if (algorithmContext.getConfigurationContext() == null) {
  166   
  167                   if (log.isDebugEnabled()) {
  168                       log.debug("Setting the ConfigurationContext to " +
  169                               "the AlgorithmContext with the name " + endpointName +
  170                               " for replicating data on the cluster");
  171                   }
  172                   algorithmContext.setConfigurationContext(cc);
  173                   algorithmContext.setContextID(endpointName);
  174               }
  175   
  176               // if this is a cluster environment, then set configuration context to session based
  177               // endpoint dispatcher
  178               if (dispatcherContext.getConfigurationContext() == null) {
  179   
  180                   if (log.isDebugEnabled()) {
  181                       log.debug("Setting the ConfigurationContext to " +
  182                               "the DispatcherContext with the name " + endpointName +
  183                               " for replicating data on the cluster");
  184                   }
  185                   dispatcherContext.setConfigurationContext(cc);
  186                   dispatcherContext.setContextID(endpointName);
  187   
  188                   if (log.isDebugEnabled()) {
  189                       log.debug("Setting the endpoints to the DispatcherContext : " + endpoints);
  190                   }
  191                   dispatcherContext.setEndpoints(endpoints);
  192               }
  193           }
  194   
  195           // first check if this session is associated with a session. if so, get the endpoint
  196           // associated for that session.
  197           Endpoint endpoint = dispatcher.getEndpoint(synMessageContext, dispatcherContext);
  198           if (endpoint == null) {
  199   
  200               // there is no endpoint associated with this session. get a new endpoint using the
  201               // load balance policy.
  202               endpoint = algorithm.getNextEndpoint(synMessageContext, algorithmContext);
  203   
  204               // this is a start of a new session. so update session map.
  205               if (dispatcher.isServerInitiatedSession()) {
  206   
  207                   if (log.isDebugEnabled()) {
  208                       log.debug("Adding a new server initiated session for the current message");
  209                   }
  210   
  211                   // add this endpoint to the endpoint sequence of operation context.
  212                   Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synMessageContext;
  213                   OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
  214   
  215                   if (isClusteringEnable) {
  216                       // If running on a cluster keep endpoint names, because it is heavy to
  217                       // replicate endpoint itself
  218   
  219                       Object o = opCtx.getPropertyNonReplicable(ENDPOINT_NAME_LIST);
  220                       List<String> epNameList;
  221                       if (o instanceof List) {
  222                           epNameList = (List<String>) o;
  223                           epNameList.add(endpointName);
  224                       } else {
  225                           // this is the first endpoint in the heirachy. so create the queue and
  226                           // insert this as the first element.
  227                           epNameList = new ArrayList<String>();
  228                           epNameList.add(endpointName);
  229                           opCtx.setNonReplicableProperty(ROOT_ENDPOINT, this);
  230                       }
  231                       
  232                       // if the next endpoint is not a session affinity one, endpoint sequence ends
  233                       // here. but we have to add the next endpoint to the list.
  234                       if (!(endpoint instanceof SALoadbalanceEndpoint)) {
  235   
  236                           String name;
  237                           if (endpoint instanceof IndirectEndpoint) {
  238                               name = ((IndirectEndpoint) endpoint).getKey();
  239                           } else {
  240                               name = endpoint.getName();
  241                           }
  242   
  243                           if (name == null) {
  244                               log.warn(WARN_MESSAGE);
  245                               name = SynapseConstants.ANONYMOUS_ENDPOINT;
  246                           }
  247                           epNameList.add(name);
  248                       }
  249   
  250                       if (log.isDebugEnabled()) {
  251                           log.debug("Operating on a cluster. Setting the endpoint name list to " +
  252                                   "the OperationContext : " + epNameList);
  253                       }
  254                       opCtx.setProperty(ENDPOINT_NAME_LIST, epNameList);
  255   
  256                   } else {
  257                       
  258                       Object o = opCtx.getProperty(ENDPOINT_LIST);
  259                       List<Endpoint> endpointList;
  260                       if (o instanceof List) {
  261                           endpointList = (List<Endpoint>) o;
  262                           endpointList.add(this);
  263                       } else {
  264                           // this is the first endpoint in the heirachy. so create the queue and
  265                           // insert this as the first element.
  266                           endpointList = new ArrayList<Endpoint>();
  267                           endpointList.add(this);
  268                           opCtx.setProperty(ENDPOINT_LIST, endpointList);
  269                       }
  270                       
  271                       // if the next endpoint is not a session affinity one, endpoint sequence ends
  272                       // here. but we have to add the next endpoint to the list.
  273                       if (!(endpoint instanceof SALoadbalanceEndpoint)) {
  274                           endpointList.add(endpoint);
  275                       }
  276                   }
  277   
  278               } else {
  279                   dispatcher.updateSession(synMessageContext, dispatcherContext, endpoint);
  280               }
  281   
  282               // this is the first request. so an endpoint has not been bound to this session and we
  283               // are free to failover if the currently selected endpoint is not working. but for
  284               // failover to work, we have to build the soap envelope.
  285               synMessageContext.getEnvelope().build();
  286   
  287               // we should also indicate that this is the first message in the session. so that
  288               // onFault(...) method can resend only the failed attempts for the first message.
  289               synMessageContext.setProperty(FIRST_MESSAGE_IN_SESSION, Boolean.TRUE);
  290           }
  291   
  292           if (endpoint != null) {
  293   
  294               // endpoints given by session dispatchers may not be active. therefore, we have check
  295               // it here.
  296               if (endpoint.isActive(synMessageContext)) {
  297                   if (log.isDebugEnabled()) {
  298                       log.debug("Using the endpoint on the session with "
  299                               + ((endpoint instanceof IndirectEndpoint) ? "key : "
  300                               + ((IndirectEndpoint) endpoint).getKey() : "name : "
  301                               + endpoint.getName()) + " for sending the message");
  302                   }
  303                   endpoint.send(synMessageContext);
  304               } else {
  305                   informFailure(synMessageContext);
  306               }
  307   
  308           } else {
  309   
  310               // all child endpoints have failed. so mark this also as failed.
  311               if (log.isDebugEnabled()) {
  312                   log.debug("Marking the Endpoint as failed, " +
  313                           "because all child endpoints has been failed");
  314               }
  315               setActive(false, synMessageContext);
  316               informFailure(synMessageContext);
  317           }
  318       }
  319   
  320       /**
  321        * This will be called for the response of the first message of each server initiated session.
  322        *
  323        * @param responseMsgCtx
  324        * @param endpointList
  325        * @param isClusteringEnable
  326        */
  327       public void updateSession(MessageContext responseMsgCtx, List endpointList,
  328                                 boolean isClusteringEnable) {
  329   
  330           Endpoint endpoint = null;
  331   
  332           if (isClusteringEnable) {
  333               // if this is a clustering env. only keep endpoint names, because, it is heavy to
  334               // replicate endpoint itself
  335               String epNameObj = (String) endpointList.remove(0);
  336               for (Endpoint ep : endpoints) {
  337                   if (ep != null) {
  338   
  339                       String name;
  340                       if (ep instanceof IndirectEndpoint) {
  341                           name = ((IndirectEndpoint) ep).getKey();
  342                       } else {
  343                           name = ep.getName();
  344                       }
  345   
  346                       if (name != null && name.equals(epNameObj)) {
  347                           endpoint = ep;
  348                           break;
  349                       }
  350                   }
  351               }
  352   
  353           } else {
  354               endpoint = (Endpoint) endpointList.remove(0);
  355           }
  356   
  357           if (endpoint != null) {
  358   
  359               dispatcher.updateSession(responseMsgCtx, dispatcherContext, endpoint);
  360               if (endpoint instanceof SALoadbalanceEndpoint) {
  361                   ((SALoadbalanceEndpoint) endpoint).updateSession(
  362                           responseMsgCtx, endpointList, isClusteringEnable);
  363               }
  364           }
  365       }
  366   
  367       public String getName() {
  368           return name;
  369       }
  370   
  371       public void setName(String name) {
  372           this.name = name.trim();
  373       }
  374   
  375       public LoadbalanceAlgorithm getAlgorithm() {
  376           return algorithm;
  377       }
  378   
  379       public void setAlgorithm(LoadbalanceAlgorithm algorithm) {
  380           this.algorithm = algorithm;
  381       }
  382   
  383       /**
  384        * This is active in below conditions:
  385        * If a session is not started AND at least one child endpoint is active.
  386        * If a session is started AND the binding endpoint is active.
  387        * <p/>
  388        * This is not active for all other conditions.
  389        *
  390        * @param synMessageContext MessageContext of the current message. This is used to determine the
  391        *                          session.
  392        * @return true is active. false otherwise.
  393        */
  394       public boolean isActive(MessageContext synMessageContext) {
  395           // todo: implement above
  396           boolean active;
  397           Endpoint endpoint = dispatcher.getEndpoint(synMessageContext, dispatcherContext);
  398           if (endpoint == null) { // If a session is not started
  399               active = endpointContext.isActive();
  400               if (!active && endpoints != null) {
  401                   for (Endpoint ep : endpoints) {
  402                       if (ep != null) {
  403                           active = ep.isActive(synMessageContext);
  404                           if (active) {    //AND at least one child endpoint is active
  405                               endpointContext.setActive(active);
  406                               // don't break the loop though we found one active endpoint. calling isActive()
  407                               // on all child endpoints will update their active state. so this is a good
  408                               // time to do that.
  409                           }
  410                       }
  411                   }
  412               }
  413           } else {
  414               //If a session is started AND the binding endpoint is active.
  415               active = endpoint.isActive(synMessageContext);
  416               if (active) {
  417                   endpointContext.setActive(active);
  418               }
  419           }
  420   
  421           if (log.isDebugEnabled()) {
  422               log.debug("SALoadbalanceEndpoint with name '" + getName() + "' is in "
  423                       + (active ? "active" : "inactive") + " state");
  424           }
  425   
  426           return active;
  427       }
  428   
  429       public void setActive(boolean active, MessageContext synMessageContext) {
  430           endpointContext.setActive(active);
  431       }
  432   
  433       public List<Endpoint> getEndpoints() {
  434           return endpoints;
  435       }
  436   
  437       public void setEndpoints(List<Endpoint> endpoints) {
  438           this.endpoints = endpoints;
  439       }
  440   
  441       public void setParentEndpoint(Endpoint parentEndpoint) {
  442           this.parentEndpoint = parentEndpoint;
  443       }
  444   
  445       public Dispatcher getDispatcher() {
  446           return dispatcher;
  447       }
  448   
  449       public void setDispatcher(Dispatcher dispatcher) {
  450           this.dispatcher = dispatcher;
  451       }
  452   
  453       /**
  454        * It is logically incorrect to failover a session affinity endpoint after the session has started.
  455        * If we redirect a message belonging to a particular session, new endpoint is not aware of the
  456        * session. So we can't handle anything more at the endpoint level. Therefore, this method just
  457        * deactivate the failed endpoint and give the fault to the next fault handler.
  458        * <p/>
  459        * But if the session has not started (i.e. first message), the message will be resend by binding
  460        * it to a different endpoint.
  461        *
  462        * @param endpoint          Failed endpoint.
  463        * @param synMessageContext MessageContext of the failed message.
  464        */
  465       public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) {
  466   
  467           Object o = synMessageContext.getProperty(FIRST_MESSAGE_IN_SESSION);
  468   
  469           if (o != null && Boolean.TRUE.equals(o)) {
  470   
  471               // this is the first message. so unbind the sesion with failed endpoint and start
  472               // new one by resending.
  473               dispatcher.unbind(synMessageContext, dispatcherContext);
  474               send(synMessageContext);
  475   
  476           } else {
  477   
  478               // session has already started. we can't failover.
  479               informFailure(synMessageContext);
  480           }
  481       }
  482   
  483       private void informFailure(MessageContext synMessageContext) {
  484   
  485           log.warn("Failed to send using the selected endpoint, becasue it is inactive");
  486           
  487           if (parentEndpoint != null) {
  488               parentEndpoint.onChildEndpointFail(this, synMessageContext);
  489           } else {
  490               Object o = synMessageContext.getFaultStack().pop();
  491               if (o != null) {
  492                   ((FaultHandler) o).handleFault(synMessageContext);
  493               }
  494           }
  495       }
  496   
  497   }

Home » synapse-1.2-src » org.apache.synapse.endpoints » [javadoc | source]