1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 package org.apache.synapse.endpoints; 21 22 import org.apache.axis2.clustering.ClusterManager; 23 import org.apache.axis2.context.ConfigurationContext; 24 import org.apache.synapse.MessageContext; 25 import org.apache.synapse.SynapseConstants; 26 import org.apache.synapse.core.axis2.Axis2MessageContext; 27 import org.apache.synapse.endpoints.utils.EndpointDefinition; 28 import org.apache.synapse.statistics.impl.EndPointStatisticsStack; 29 30 /** 31 * This class represents an actual endpoint to send the message. It is responsible for sending the 32 * message, performing retries if a failure occurred and informing the parent endpoint if a failure 33 * couldn't be recovered. 34 */ 35 public class AddressEndpoint extends DefaultEndpoint { 36 37 /** 38 * The endpoint context , place holder for keep any runtime states related to the endpoint 39 */ 40 private final EndpointContext endpointContext = new EndpointContext(); 41 42 /** 43 * Checks if the endpoint is active (failed or not). If endpoint is in failed state and 44 * suspendOnFailDuration has elapsed, it will be set to active. 45 * 46 * @param synMessageContext MessageContext of the current message. This is not used here. 47 * @return true if endpoint is active. false otherwise. 48 */ 49 public boolean isActive(MessageContext synMessageContext) { 50 51 boolean active = endpointContext.isActive(); 52 if (!active) { 53 54 long recoverOn = endpointContext.getRecoverOn(); 55 if (System.currentTimeMillis() > recoverOn) { 56 active = true; 57 endpointContext.setActive(true); 58 endpointContext.setRecoverOn(0); 59 60 } 61 } 62 63 if (log.isDebugEnabled()) { 64 log.debug("AddressEndpoint with name '" + getName() + "' is in " 65 + (active ? "active" : "inactive") + " state"); 66 } 67 68 return active; 69 } 70 71 /** 72 * Sets if endpoint active or not. if endpoint is set as failed (active = false), the recover on 73 * time is calculated so that it will be activated after the recover on time. 74 * 75 * @param active true if active. false otherwise. 76 * @param synMessageContext MessageContext of the current message. This is not used here. 77 */ 78 public synchronized void setActive(boolean active, MessageContext synMessageContext) { 79 80 // this is synchronized as recoverOn can be set to unpredictable values if two threads call 81 // this method simultaneously. 82 83 if (!active) { 84 EndpointDefinition endpoint = getEndpoint(); 85 if (endpoint.getSuspendOnFailDuration() != -1) { 86 // Calculating a new value by adding suspendOnFailDuration to current time. 87 // as the endpoint is set as failed 88 endpointContext.setRecoverOn( 89 System.currentTimeMillis() + endpoint.getSuspendOnFailDuration()); 90 } else { 91 endpointContext.setRecoverOn(Long.MAX_VALUE); 92 } 93 } 94 95 this.endpointContext.setActive(active); 96 } 97 98 /** 99 * Sends the message through this endpoint. This method just handles statistics related 100 * functions and gives the message to the Synapse environment to send. It does not add any 101 * endpoint specific details to the message context. These details are added only to the cloned 102 * message context by the Axis2FlexibleMepClient. So that we can reuse the original message 103 * context for resending through different endpoints. 104 * 105 * @param synCtx MessageContext sent by client to Synapse 106 */ 107 public void send(MessageContext synCtx) { 108 109 boolean traceOn = isTraceOn(synCtx); 110 boolean traceOrDebugOn = isTraceOrDebugOn(traceOn); 111 112 if (traceOrDebugOn) { 113 traceOrDebug(traceOn, "Start : Address Endpoint"); 114 115 if (traceOn && trace.isTraceEnabled()) { 116 trace.trace("Message : " + synCtx.getEnvelope()); 117 } 118 } 119 120 boolean isClusteringEnable = false; 121 // get Axis2 MessageContext and ConfigurationContext 122 org.apache.axis2.context.MessageContext axisMC = 123 ((Axis2MessageContext) synCtx).getAxis2MessageContext(); 124 ConfigurationContext cc = axisMC.getConfigurationContext(); 125 126 //The check for clustering environment 127 128 ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager(); 129 if (clusterManager != null && 130 clusterManager.getContextManager() != null) { 131 isClusteringEnable = true; 132 } 133 134 String endPointName = this.getName(); 135 if (endPointName == null) { 136 137 if (traceOrDebugOn && isClusteringEnable) { 138 log.warn(SALoadbalanceEndpoint.WARN_MESSAGE); 139 } 140 endPointName = SynapseConstants.ANONYMOUS_ENDPOINT; 141 } 142 143 if (isClusteringEnable) { 144 145 // if this is a cluster environment , then set configuration context to endpoint context 146 if (endpointContext.getConfigurationContext() == null) { 147 endpointContext.setConfigurationContext(cc); 148 endpointContext.setContextID(endPointName); // The context ID 149 } 150 } 151 152 EndpointDefinition endpoint = getEndpoint(); 153 // Setting Required property to collect the End Point statistics 154 boolean statisticsEnable 155 = (SynapseConstants.STATISTICS_ON == endpoint.getStatisticsState()); 156 if (statisticsEnable) { 157 EndPointStatisticsStack endPointStatisticsStack = null; 158 Object statisticsStackObj = 159 synCtx.getProperty(org.apache.synapse.SynapseConstants.ENDPOINT_STATS); 160 if (statisticsStackObj == null) { 161 endPointStatisticsStack = new EndPointStatisticsStack(); 162 synCtx.setProperty(org.apache.synapse.SynapseConstants.ENDPOINT_STATS, 163 endPointStatisticsStack); 164 } else if (statisticsStackObj instanceof EndPointStatisticsStack) { 165 endPointStatisticsStack = (EndPointStatisticsStack) statisticsStackObj; 166 } 167 if (endPointStatisticsStack != null) { 168 boolean isFault = synCtx.getEnvelope().getBody().hasFault(); 169 endPointStatisticsStack.put(endPointName, System.currentTimeMillis(), 170 !synCtx.isResponse(), statisticsEnable, isFault); 171 } 172 } 173 174 if (endpoint.getAddress() != null) { 175 if (traceOrDebugOn) { 176 traceOrDebug(traceOn, "Sending message to endpoint : " + 177 endPointName + " resolves to address = " + endpoint.getAddress()); 178 traceOrDebug(traceOn, "SOAPAction: " + (synCtx.getSoapAction() != null ? 179 synCtx.getSoapAction() : "null")); 180 traceOrDebug(traceOn, "WSA-Action: " + (synCtx.getWSAAction() != null ? 181 synCtx.getWSAAction() : "null")); 182 183 if (traceOn && trace.isTraceEnabled()) { 184 trace.trace("Envelope : \n" + synCtx.getEnvelope()); 185 } 186 } 187 } 188 189 // register this as the immediate fault handler for this message. 190 synCtx.pushFaultHandler(this); 191 192 // add this as the last endpoint to process this message. it is used by statistics code. 193 synCtx.setProperty(SynapseConstants.PROCESSED_ENDPOINT, this); 194 195 synCtx.getEnvironment().send(endpoint, synCtx); 196 } 197 198 public void onFault(MessageContext synCtx) { 199 // perform retries here 200 201 // if this endpoint has actually failed, inform the parent. 202 setActive(false, synCtx); 203 super.onFault(synCtx); 204 } 205 }