1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 package org.apache.synapse.endpoints; 21 22 import org.apache.axis2.clustering.ClusterManager; 23 import org.apache.axis2.context.ConfigurationContext; 24 import org.apache.axis2.context.OperationContext; 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 import org.apache.synapse.FaultHandler; 28 import org.apache.synapse.MessageContext; 29 import org.apache.synapse.SynapseConstants; 30 import org.apache.synapse.core.axis2.Axis2MessageContext; 31 import org.apache.synapse.endpoints.algorithms.AlgorithmContext; 32 import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm; 33 import org.apache.synapse.endpoints.dispatch.Dispatcher; 34 import org.apache.synapse.endpoints.dispatch.DispatcherContext; 35 36 import java.util.ArrayList; 37 import java.util.List; 38 39 /** 40 * SALoadbalanceEndpoint supports session affinity based load balancing. Each of this endpoint 41 * maintains a list of dispatchers. These dispatchers will be updated for both request (for client 42 * initiated sessions) and response (for server initiated sessions). Once updated, each dispatcher 43 * will check if has already encountered that session. If not, it will update the 44 * session -> endpoint map. To update sessions for response messages, all SALoadbalanceEndpoint 45 * objects are kept in a global property. When a message passes through SALoadbalanceEndpoints, each 46 * endpoint appends its "Synapse unique ID" to the operation context. Once the response for that 47 * message arrives, response sender checks first endpoint of the endpoint sequence from the 48 * operation context and get that endpoint from the above mentioned global property. Then it will 49 * invoke updateSession(...) method of that endpoint. After that, each endpoint will call 50 * updateSession(...) method of their appropriate child endpoint, so that all the sending endpoints 51 * for the session will be updated. 52 * <p/> 53 * This endpoint gets the target endpoint first from the dispatch manager, which will ask all listed 54 * dispatchers for a matching session. If a matching session is found it will just invoke the 55 * send(...) method of that endpoint. If not it will find an endpoint using the load balancing 56 * policy and send to that endpoint. 57 */ 58 public class SALoadbalanceEndpoint implements Endpoint { 59 60 private static final Log log = LogFactory.getLog(SALoadbalanceEndpoint.class); 61 62 private static final String FIRST_MESSAGE_IN_SESSION = "first_message_in_session"; 63 public static final String ENDPOINT_LIST = "endpointList"; 64 public static final String ROOT_ENDPOINT = "rootendpoint"; 65 public static final String ENDPOINT_NAME_LIST = "endpointNameList"; 66 public static final String WARN_MESSAGE = "In a clustering environment, the endpoint " + 67 "name should be specified even for anonymous endpoints. Otherwise the clustering " + 68 "would not function properly, if there are more than one anonymous endpoints."; 69 70 /** 71 * Name of the endpoint. Used for named endpoints which can be referred using the key attribute 72 * of indirect endpoints. 73 */ 74 private String name = null; 75 76 /** 77 * List of endpoints among which the load is distributed. Any object implementing the Endpoint 78 * interface could be used. 79 */ 80 private List<Endpoint> endpoints = null; 81 82 /** 83 * Algorithm used for selecting the next endpoint to direct the first request of sessions. 84 * Default is RoundRobin. 85 */ 86 private LoadbalanceAlgorithm algorithm = null; 87 88 /** 89 * Parent endpoint of this endpoint if this used inside another endpoint. Although any endpoint 90 * can be the parent, only SALoadbalanceEndpoint should be used here. Use of any other endpoint 91 * would invalidate the session. 92 */ 93 private Endpoint parentEndpoint = null; 94 95 /** 96 * Dispatcher used for session affinity. 97 */ 98 private Dispatcher dispatcher = null; 99 100 /** 101 * The dispatcher context, place holder for keeping any runtime states that are used when 102 * finding endpoint for the session 103 */ 104 private final DispatcherContext dispatcherContext = new DispatcherContext(); 105 106 /** 107 * The endpoint context, place holder for keeping any runtime states related to the endpoint 108 */ 109 private final EndpointContext endpointContext = new EndpointContext(); 110 111 /** 112 * The algorithm context, place holder for keeping any runtime states related to the load 113 * balance algorithm 114 */ 115 private final AlgorithmContext algorithmContext = new AlgorithmContext(); 116 117 118 public void send(MessageContext synMessageContext) { 119 120 if (log.isDebugEnabled()) { 121 log.debug("Start : Session Affinity Load-balance Endpoint " + name); 122 } 123 124 boolean isClusteringEnable = false; 125 // get Axis2 MessageContext and ConfigurationContext 126 org.apache.axis2.context.MessageContext axisMC = 127 ((Axis2MessageContext) synMessageContext).getAxis2MessageContext(); 128 ConfigurationContext cc = axisMC.getConfigurationContext(); 129 130 //The check for clustering environment 131 ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager(); 132 if (clusterManager != null && 133 clusterManager.getContextManager() != null) { 134 isClusteringEnable = true; 135 } 136 137 String endpointName = this.getName(); 138 if (endpointName == null) { 139 if (isClusteringEnable) { 140 log.warn(WARN_MESSAGE); 141 } 142 if (log.isDebugEnabled()) { 143 log.debug("Using the name for the anonymous endpoint as : '" 144 + SynapseConstants.ANONYMOUS_ENDPOINT + "'"); 145 } 146 endpointName = SynapseConstants.ANONYMOUS_ENDPOINT; 147 } 148 149 if (isClusteringEnable) { 150 151 // if this is a cluster environment, then set configuration context to endpoint context 152 if (endpointContext.getConfigurationContext() == null) { 153 154 if (log.isDebugEnabled()) { 155 log.debug("Setting the ConfigurationContext to " + 156 "the EndpointContext with the name " + endpointName + 157 " for replicating data on the cluster"); 158 } 159 endpointContext.setConfigurationContext(cc); 160 endpointContext.setContextID(endpointName); 161 } 162 163 // if this is a cluster environment, then set configuration context to load balance 164 // algorithm context 165 if (algorithmContext.getConfigurationContext() == null) { 166 167 if (log.isDebugEnabled()) { 168 log.debug("Setting the ConfigurationContext to " + 169 "the AlgorithmContext with the name " + endpointName + 170 " for replicating data on the cluster"); 171 } 172 algorithmContext.setConfigurationContext(cc); 173 algorithmContext.setContextID(endpointName); 174 } 175 176 // if this is a cluster environment, then set configuration context to session based 177 // endpoint dispatcher 178 if (dispatcherContext.getConfigurationContext() == null) { 179 180 if (log.isDebugEnabled()) { 181 log.debug("Setting the ConfigurationContext to " + 182 "the DispatcherContext with the name " + endpointName + 183 " for replicating data on the cluster"); 184 } 185 dispatcherContext.setConfigurationContext(cc); 186 dispatcherContext.setContextID(endpointName); 187 188 if (log.isDebugEnabled()) { 189 log.debug("Setting the endpoints to the DispatcherContext : " + endpoints); 190 } 191 dispatcherContext.setEndpoints(endpoints); 192 } 193 } 194 195 // first check if this session is associated with a session. if so, get the endpoint 196 // associated for that session. 197 Endpoint endpoint = dispatcher.getEndpoint(synMessageContext, dispatcherContext); 198 if (endpoint == null) { 199 200 // there is no endpoint associated with this session. get a new endpoint using the 201 // load balance policy. 202 endpoint = algorithm.getNextEndpoint(synMessageContext, algorithmContext); 203 204 // this is a start of a new session. so update session map. 205 if (dispatcher.isServerInitiatedSession()) { 206 207 if (log.isDebugEnabled()) { 208 log.debug("Adding a new server initiated session for the current message"); 209 } 210 211 // add this endpoint to the endpoint sequence of operation context. 212 Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synMessageContext; 213 OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext(); 214 215 if (isClusteringEnable) { 216 // If running on a cluster keep endpoint names, because it is heavy to 217 // replicate endpoint itself 218 219 Object o = opCtx.getPropertyNonReplicable(ENDPOINT_NAME_LIST); 220 List<String> epNameList; 221 if (o instanceof List) { 222 epNameList = (List<String>) o; 223 epNameList.add(endpointName); 224 } else { 225 // this is the first endpoint in the heirachy. so create the queue and 226 // insert this as the first element. 227 epNameList = new ArrayList<String>(); 228 epNameList.add(endpointName); 229 opCtx.setNonReplicableProperty(ROOT_ENDPOINT, this); 230 } 231 232 // if the next endpoint is not a session affinity one, endpoint sequence ends 233 // here. but we have to add the next endpoint to the list. 234 if (!(endpoint instanceof SALoadbalanceEndpoint)) { 235 236 String name; 237 if (endpoint instanceof IndirectEndpoint) { 238 name = ((IndirectEndpoint) endpoint).getKey(); 239 } else { 240 name = endpoint.getName(); 241 } 242 243 if (name == null) { 244 log.warn(WARN_MESSAGE); 245 name = SynapseConstants.ANONYMOUS_ENDPOINT; 246 } 247 epNameList.add(name); 248 } 249 250 if (log.isDebugEnabled()) { 251 log.debug("Operating on a cluster. Setting the endpoint name list to " + 252 "the OperationContext : " + epNameList); 253 } 254 opCtx.setProperty(ENDPOINT_NAME_LIST, epNameList); 255 256 } else { 257 258 Object o = opCtx.getProperty(ENDPOINT_LIST); 259 List<Endpoint> endpointList; 260 if (o instanceof List) { 261 endpointList = (List<Endpoint>) o; 262 endpointList.add(this); 263 } else { 264 // this is the first endpoint in the heirachy. so create the queue and 265 // insert this as the first element. 266 endpointList = new ArrayList<Endpoint>(); 267 endpointList.add(this); 268 opCtx.setProperty(ENDPOINT_LIST, endpointList); 269 } 270 271 // if the next endpoint is not a session affinity one, endpoint sequence ends 272 // here. but we have to add the next endpoint to the list. 273 if (!(endpoint instanceof SALoadbalanceEndpoint)) { 274 endpointList.add(endpoint); 275 } 276 } 277 278 } else { 279 dispatcher.updateSession(synMessageContext, dispatcherContext, endpoint); 280 } 281 282 // this is the first request. so an endpoint has not been bound to this session and we 283 // are free to failover if the currently selected endpoint is not working. but for 284 // failover to work, we have to build the soap envelope. 285 synMessageContext.getEnvelope().build(); 286 287 // we should also indicate that this is the first message in the session. so that 288 // onFault(...) method can resend only the failed attempts for the first message. 289 synMessageContext.setProperty(FIRST_MESSAGE_IN_SESSION, Boolean.TRUE); 290 } 291 292 if (endpoint != null) { 293 294 // endpoints given by session dispatchers may not be active. therefore, we have check 295 // it here. 296 if (endpoint.isActive(synMessageContext)) { 297 if (log.isDebugEnabled()) { 298 log.debug("Using the endpoint on the session with " 299 + ((endpoint instanceof IndirectEndpoint) ? "key : " 300 + ((IndirectEndpoint) endpoint).getKey() : "name : " 301 + endpoint.getName()) + " for sending the message"); 302 } 303 endpoint.send(synMessageContext); 304 } else { 305 informFailure(synMessageContext); 306 } 307 308 } else { 309 310 // all child endpoints have failed. so mark this also as failed. 311 if (log.isDebugEnabled()) { 312 log.debug("Marking the Endpoint as failed, " + 313 "because all child endpoints has been failed"); 314 } 315 setActive(false, synMessageContext); 316 informFailure(synMessageContext); 317 } 318 } 319 320 /** 321 * This will be called for the response of the first message of each server initiated session. 322 * 323 * @param responseMsgCtx 324 * @param endpointList 325 * @param isClusteringEnable 326 */ 327 public void updateSession(MessageContext responseMsgCtx, List endpointList, 328 boolean isClusteringEnable) { 329 330 Endpoint endpoint = null; 331 332 if (isClusteringEnable) { 333 // if this is a clustering env. only keep endpoint names, because, it is heavy to 334 // replicate endpoint itself 335 String epNameObj = (String) endpointList.remove(0); 336 for (Endpoint ep : endpoints) { 337 if (ep != null) { 338 339 String name; 340 if (ep instanceof IndirectEndpoint) { 341 name = ((IndirectEndpoint) ep).getKey(); 342 } else { 343 name = ep.getName(); 344 } 345 346 if (name != null && name.equals(epNameObj)) { 347 endpoint = ep; 348 break; 349 } 350 } 351 } 352 353 } else { 354 endpoint = (Endpoint) endpointList.remove(0); 355 } 356 357 if (endpoint != null) { 358 359 dispatcher.updateSession(responseMsgCtx, dispatcherContext, endpoint); 360 if (endpoint instanceof SALoadbalanceEndpoint) { 361 ((SALoadbalanceEndpoint) endpoint).updateSession( 362 responseMsgCtx, endpointList, isClusteringEnable); 363 } 364 } 365 } 366 367 public String getName() { 368 return name; 369 } 370 371 public void setName(String name) { 372 this.name = name.trim(); 373 } 374 375 public LoadbalanceAlgorithm getAlgorithm() { 376 return algorithm; 377 } 378 379 public void setAlgorithm(LoadbalanceAlgorithm algorithm) { 380 this.algorithm = algorithm; 381 } 382 383 /** 384 * This is active in below conditions: 385 * If a session is not started AND at least one child endpoint is active. 386 * If a session is started AND the binding endpoint is active. 387 * <p/> 388 * This is not active for all other conditions. 389 * 390 * @param synMessageContext MessageContext of the current message. This is used to determine the 391 * session. 392 * @return true is active. false otherwise. 393 */ 394 public boolean isActive(MessageContext synMessageContext) { 395 // todo: implement above 396 boolean active; 397 Endpoint endpoint = dispatcher.getEndpoint(synMessageContext, dispatcherContext); 398 if (endpoint == null) { // If a session is not started 399 active = endpointContext.isActive(); 400 if (!active && endpoints != null) { 401 for (Endpoint ep : endpoints) { 402 if (ep != null) { 403 active = ep.isActive(synMessageContext); 404 if (active) { //AND at least one child endpoint is active 405 endpointContext.setActive(active); 406 // don't break the loop though we found one active endpoint. calling isActive() 407 // on all child endpoints will update their active state. so this is a good 408 // time to do that. 409 } 410 } 411 } 412 } 413 } else { 414 //If a session is started AND the binding endpoint is active. 415 active = endpoint.isActive(synMessageContext); 416 if (active) { 417 endpointContext.setActive(active); 418 } 419 } 420 421 if (log.isDebugEnabled()) { 422 log.debug("SALoadbalanceEndpoint with name '" + getName() + "' is in " 423 + (active ? "active" : "inactive") + " state"); 424 } 425 426 return active; 427 } 428 429 public void setActive(boolean active, MessageContext synMessageContext) { 430 endpointContext.setActive(active); 431 } 432 433 public List<Endpoint> getEndpoints() { 434 return endpoints; 435 } 436 437 public void setEndpoints(List<Endpoint> endpoints) { 438 this.endpoints = endpoints; 439 } 440 441 public void setParentEndpoint(Endpoint parentEndpoint) { 442 this.parentEndpoint = parentEndpoint; 443 } 444 445 public Dispatcher getDispatcher() { 446 return dispatcher; 447 } 448 449 public void setDispatcher(Dispatcher dispatcher) { 450 this.dispatcher = dispatcher; 451 } 452 453 /** 454 * It is logically incorrect to failover a session affinity endpoint after the session has started. 455 * If we redirect a message belonging to a particular session, new endpoint is not aware of the 456 * session. So we can't handle anything more at the endpoint level. Therefore, this method just 457 * deactivate the failed endpoint and give the fault to the next fault handler. 458 * <p/> 459 * But if the session has not started (i.e. first message), the message will be resend by binding 460 * it to a different endpoint. 461 * 462 * @param endpoint Failed endpoint. 463 * @param synMessageContext MessageContext of the failed message. 464 */ 465 public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) { 466 467 Object o = synMessageContext.getProperty(FIRST_MESSAGE_IN_SESSION); 468 469 if (o != null && Boolean.TRUE.equals(o)) { 470 471 // this is the first message. so unbind the sesion with failed endpoint and start 472 // new one by resending. 473 dispatcher.unbind(synMessageContext, dispatcherContext); 474 send(synMessageContext); 475 476 } else { 477 478 // session has already started. we can't failover. 479 informFailure(synMessageContext); 480 } 481 } 482 483 private void informFailure(MessageContext synMessageContext) { 484 485 log.warn("Failed to send using the selected endpoint, becasue it is inactive"); 486 487 if (parentEndpoint != null) { 488 parentEndpoint.onChildEndpointFail(this, synMessageContext); 489 } else { 490 Object o = synMessageContext.getFaultStack().pop(); 491 if (o != null) { 492 ((FaultHandler) o).handleFault(synMessageContext); 493 } 494 } 495 } 496 497 }