Home » synapse-1.2-src » org.apache.synapse.endpoints » [javadoc | source]

    1   /*
    2    *  Licensed to the Apache Software Foundation (ASF) under one
    3    *  or more contributor license agreements.  See the NOTICE file
    4    *  distributed with this work for additional information
    5    *  regarding copyright ownership.  The ASF licenses this file
    6    *  to you under the Apache License, Version 2.0 (the
    7    *  "License"); you may not use this file except in compliance
    8    *  with the License.  You may obtain a copy of the License at
    9    *
   10    *   http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    *  Unless required by applicable law or agreed to in writing,
   13    *  software distributed under the License is distributed on an
   14    *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    *  KIND, either express or implied.  See the License for the
   16    *  specific language governing permissions and limitations
   17    *  under the License.
   18    */
   19   
   20   package org.apache.synapse.endpoints;
   21   
   22   import org.apache.axis2.clustering.ClusterManager;
   23   import org.apache.axis2.context.ConfigurationContext;
   24   import org.apache.synapse.MessageContext;
   25   import org.apache.synapse.SynapseConstants;
   26   import org.apache.synapse.core.axis2.Axis2MessageContext;
   27   import org.apache.synapse.endpoints.utils.EndpointDefinition;
   28   import org.apache.synapse.statistics.impl.EndPointStatisticsStack;
   29   
   30   /**
   31    * This class represents an actual endpoint to send the message. It is responsible for sending the
   32    * message, performing retries if a failure occurred and informing the parent endpoint if a failure
   33    * couldn't be recovered.
   34    */
   35   public class AddressEndpoint extends DefaultEndpoint {
   36   
   37       /**
   38        * The endpoint context , place holder for keep any runtime states related to the endpoint
   39        */
   40       private final EndpointContext endpointContext = new EndpointContext();
   41   
   42       /**
   43        * Checks if the endpoint is active (failed or not). If endpoint is in failed state and
   44        * suspendOnFailDuration has elapsed, it will be set to active.
   45        *
   46        * @param synMessageContext MessageContext of the current message. This is not used here.
   47        * @return true if endpoint is active. false otherwise.
   48        */
   49       public boolean isActive(MessageContext synMessageContext) {
   50   
   51           boolean active = endpointContext.isActive();
   52           if (!active) {
   53   
   54               long recoverOn = endpointContext.getRecoverOn();
   55               if (System.currentTimeMillis() > recoverOn) {
   56                   active = true;
   57                   endpointContext.setActive(true);
   58                   endpointContext.setRecoverOn(0);                       
   59   
   60               }
   61           }
   62   
   63           if (log.isDebugEnabled()) {
   64               log.debug("AddressEndpoint with name '" + getName() + "' is in "
   65                       + (active ? "active" : "inactive") + " state");
   66           }
   67   
   68           return active;
   69       }
   70   
   71       /**
   72        * Sets if endpoint active or not. if endpoint is set as failed (active = false), the recover on
   73        * time is calculated so that it will be activated after the recover on time.
   74        *
   75        * @param active            true if active. false otherwise.
   76        * @param synMessageContext MessageContext of the current message. This is not used here.
   77        */
   78       public synchronized void setActive(boolean active, MessageContext synMessageContext) {
   79   
   80           // this is synchronized as recoverOn can be set to unpredictable values if two threads call
   81           // this method simultaneously.
   82   
   83           if (!active) {
   84               EndpointDefinition endpoint = getEndpoint();
   85               if (endpoint.getSuspendOnFailDuration() != -1) {
   86                   // Calculating a new value by adding suspendOnFailDuration to current time.
   87                   // as the endpoint is set as failed
   88                   endpointContext.setRecoverOn(
   89                           System.currentTimeMillis() + endpoint.getSuspendOnFailDuration());
   90               } else {
   91                   endpointContext.setRecoverOn(Long.MAX_VALUE);
   92               }
   93           }
   94   
   95           this.endpointContext.setActive(active);
   96       }
   97   
   98       /**
   99        * Sends the message through this endpoint. This method just handles statistics related
  100        * functions and gives the message to the Synapse environment to send. It does not add any
  101        * endpoint specific details to the message context. These details are added only to the cloned
  102        * message context by the Axis2FlexibleMepClient. So that we can reuse the original message
  103        * context for resending through different endpoints.
  104        *
  105        * @param synCtx MessageContext sent by client to Synapse
  106        */
  107       public void send(MessageContext synCtx) {
  108   
  109           boolean traceOn = isTraceOn(synCtx);
  110           boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
  111   
  112           if (traceOrDebugOn) {
  113               traceOrDebug(traceOn, "Start : Address Endpoint");
  114   
  115               if (traceOn && trace.isTraceEnabled()) {
  116                   trace.trace("Message : " + synCtx.getEnvelope());
  117               }
  118           }
  119   
  120           boolean isClusteringEnable = false;
  121           // get Axis2 MessageContext and ConfigurationContext
  122           org.apache.axis2.context.MessageContext axisMC =
  123                   ((Axis2MessageContext) synCtx).getAxis2MessageContext();
  124           ConfigurationContext cc = axisMC.getConfigurationContext();
  125   
  126           //The check for clustering environment
  127   
  128           ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
  129           if (clusterManager != null &&
  130                   clusterManager.getContextManager() != null) {
  131               isClusteringEnable = true;
  132           }
  133   
  134           String endPointName = this.getName();
  135           if (endPointName == null) {
  136   
  137               if (traceOrDebugOn && isClusteringEnable) {
  138                   log.warn(SALoadbalanceEndpoint.WARN_MESSAGE);
  139               }
  140               endPointName = SynapseConstants.ANONYMOUS_ENDPOINT;
  141           }
  142   
  143           if (isClusteringEnable) {
  144   
  145               // if this is a cluster environment , then set configuration context to endpoint context
  146               if (endpointContext.getConfigurationContext() == null) {
  147                   endpointContext.setConfigurationContext(cc);
  148                   endpointContext.setContextID(endPointName); // The context ID
  149               }
  150           }
  151   
  152           EndpointDefinition endpoint = getEndpoint();
  153           // Setting Required property to collect the End Point statistics
  154           boolean statisticsEnable
  155                   = (SynapseConstants.STATISTICS_ON == endpoint.getStatisticsState());
  156           if (statisticsEnable) {
  157               EndPointStatisticsStack endPointStatisticsStack = null;
  158               Object statisticsStackObj =
  159                       synCtx.getProperty(org.apache.synapse.SynapseConstants.ENDPOINT_STATS);
  160               if (statisticsStackObj == null) {
  161                   endPointStatisticsStack = new EndPointStatisticsStack();
  162                   synCtx.setProperty(org.apache.synapse.SynapseConstants.ENDPOINT_STATS,
  163                           endPointStatisticsStack);
  164               } else if (statisticsStackObj instanceof EndPointStatisticsStack) {
  165                   endPointStatisticsStack = (EndPointStatisticsStack) statisticsStackObj;
  166               }
  167               if (endPointStatisticsStack != null) {
  168                   boolean isFault = synCtx.getEnvelope().getBody().hasFault();
  169                   endPointStatisticsStack.put(endPointName, System.currentTimeMillis(),
  170                           !synCtx.isResponse(), statisticsEnable, isFault);
  171               }
  172           }
  173   
  174           if (endpoint.getAddress() != null) {
  175               if (traceOrDebugOn) {
  176                   traceOrDebug(traceOn, "Sending message to endpoint : " +
  177                           endPointName + " resolves to address = " + endpoint.getAddress());
  178                   traceOrDebug(traceOn, "SOAPAction: " + (synCtx.getSoapAction() != null ?
  179                           synCtx.getSoapAction() : "null"));
  180                   traceOrDebug(traceOn, "WSA-Action: " + (synCtx.getWSAAction() != null ?
  181                           synCtx.getWSAAction() : "null"));
  182   
  183                   if (traceOn && trace.isTraceEnabled()) {
  184                       trace.trace("Envelope : \n" + synCtx.getEnvelope());
  185                   }
  186               }
  187           }
  188   
  189           // register this as the immediate fault handler for this message.
  190           synCtx.pushFaultHandler(this);
  191   
  192           // add this as the last endpoint to process this message. it is used by statistics code.
  193           synCtx.setProperty(SynapseConstants.PROCESSED_ENDPOINT, this);
  194   
  195           synCtx.getEnvironment().send(endpoint, synCtx);
  196       }
  197   
  198       public void onFault(MessageContext synCtx) {
  199           // perform retries here
  200   
  201           // if this endpoint has actually failed, inform the parent.
  202           setActive(false, synCtx);
  203           super.onFault(synCtx);
  204       }
  205   }

Home » synapse-1.2-src » org.apache.synapse.endpoints » [javadoc | source]