Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » ft » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.ft;
   18   
   19   import java.io.IOException;
   20   import java.net.URI;
   21   import java.net.URISyntaxException;
   22   import java.util.List;
   23   import java.util.concurrent.atomic.AtomicBoolean;
   24   
   25   import org.apache.activemq.Service;
   26   import org.apache.activemq.broker.BrokerService;
   27   import org.apache.activemq.broker.BrokerServiceAware;
   28   import org.apache.activemq.broker.TransportConnector;
   29   import org.apache.activemq.command.BrokerInfo;
   30   import org.apache.activemq.command.Command;
   31   import org.apache.activemq.command.CommandTypes;
   32   import org.apache.activemq.command.ConnectionId;
   33   import org.apache.activemq.command.ConnectionInfo;
   34   import org.apache.activemq.command.MessageDispatch;
   35   import org.apache.activemq.command.ProducerInfo;
   36   import org.apache.activemq.command.Response;
   37   import org.apache.activemq.command.SessionInfo;
   38   import org.apache.activemq.command.ShutdownInfo;
   39   import org.apache.activemq.transport.DefaultTransportListener;
   40   import org.apache.activemq.transport.Transport;
   41   import org.apache.activemq.transport.TransportDisposedIOException;
   42   import org.apache.activemq.transport.TransportFactory;
   43   import org.apache.activemq.util.IdGenerator;
   44   import org.apache.activemq.util.ServiceStopper;
   45   import org.apache.activemq.util.ServiceSupport;
   46   import org.apache.commons.logging.Log;
   47   import org.apache.commons.logging.LogFactory;
   48   
   49   /**
   50    * Connects a Slave Broker to a Master when using <a
   51    * href="http://activemq.apache.org/masterslave.html">Master Slave</a> for High
   52    * Availability of messages.
   53    * 
   54    * @org.apache.xbean.XBean
   55    * @version $Revision$
   56    */
   57   public class MasterConnector implements Service, BrokerServiceAware {
   58   
   59       private static final Log LOG = LogFactory.getLog(MasterConnector.class);
   60       private BrokerService broker;
   61       private URI remoteURI;
   62       private URI localURI;
   63       private Transport localBroker;
   64       private Transport remoteBroker;
   65       private TransportConnector connector;
   66       private AtomicBoolean started = new AtomicBoolean(false);
   67       private AtomicBoolean stoppedBeforeStart = new AtomicBoolean(false);
   68       private final IdGenerator idGenerator = new IdGenerator();
   69       private String userName;
   70       private String password;
   71       private ConnectionInfo connectionInfo;
   72       private SessionInfo sessionInfo;
   73       private ProducerInfo producerInfo;
   74       private final AtomicBoolean masterActive = new AtomicBoolean();
   75       private BrokerInfo brokerInfo;
   76       private boolean firstConnection=true;
   77       private boolean failedToStart;
   78   
   79       public MasterConnector() {
   80       }
   81   
   82       public MasterConnector(String remoteUri) throws URISyntaxException {
   83           remoteURI = new URI(remoteUri);
   84       }
   85   
   86       public void setBrokerService(BrokerService broker) {
   87           this.broker = broker;
   88           if (localURI == null) {
   89               localURI = broker.getVmConnectorURI();
   90           }
   91           if (connector == null) {
   92               List transportConnectors = broker.getTransportConnectors();
   93               if (!transportConnectors.isEmpty()) {
   94                   connector = (TransportConnector)transportConnectors.get(0);
   95               }
   96           }
   97       }
   98   
   99       public boolean isSlave() {
  100           return masterActive.get();
  101       }
  102   
  103       protected void restartBridge() throws Exception {
  104           localBroker.oneway(connectionInfo);
  105           remoteBroker.oneway(connectionInfo);
  106           localBroker.oneway(sessionInfo);
  107           remoteBroker.oneway(sessionInfo);
  108           remoteBroker.oneway(producerInfo);
  109           remoteBroker.oneway(brokerInfo);
  110       }
  111       
  112       public void start() throws Exception {
  113           if (!started.compareAndSet(false, true)) {
  114               return;
  115           }
  116           if (remoteURI == null) {
  117               throw new IllegalArgumentException("You must specify a remoteURI");
  118           }
  119           localBroker = TransportFactory.connect(localURI);
  120           remoteBroker = TransportFactory.connect(remoteURI);
  121           LOG.info("Starting a slave connection between " + localBroker + " and " + remoteBroker);
  122           localBroker.setTransportListener(new DefaultTransportListener() {
  123   
  124               public void onCommand(Object command) {
  125               }
  126   
  127               public void onException(IOException error) {
  128                   if (started.get()) {
  129                       serviceLocalException(error);
  130                   }
  131               }
  132           });
  133           remoteBroker.setTransportListener(new DefaultTransportListener() {
  134   
  135               public void onCommand(Object o) {
  136                   Command command = (Command)o;
  137                   if (started.get()) {
  138                       serviceRemoteCommand(command);
  139                   }
  140               }
  141   
  142               public void onException(IOException error) {
  143                   if (started.get()) {
  144                       serviceRemoteException(error);
  145                   }
  146               }
  147               
  148               public void transportResumed() {
  149               	try{
  150               		if(!firstConnection){
  151   	            		localBroker = TransportFactory.connect(localURI);
  152   	            		localBroker.setTransportListener(new DefaultTransportListener() {
  153   	
  154   	                        public void onCommand(Object command) {
  155   	                        }
  156   	
  157   	                        public void onException(IOException error) {
  158   	                            if (started.get()) {
  159   	                                serviceLocalException(error);
  160   	                            }
  161   	                        }
  162   	                    });
  163   	            		localBroker.start();
  164   	            		restartBridge();
  165   	            		LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been reestablished.");
  166               		}else{
  167               			firstConnection=false;
  168               		}
  169               	}catch(IOException e){
  170               		LOG.error("MasterConnector failed to send BrokerInfo in transportResumed:", e);
  171               	}catch(Exception e){
  172               		LOG.error("MasterConnector failed to restart localBroker in transportResumed:", e);
  173               	}
  174               	
  175               }
  176           });
  177           try {
  178               localBroker.start();
  179               remoteBroker.start();
  180               startBridge();
  181               masterActive.set(true);
  182           } catch (Exception e) {
  183               masterActive.set(false);
  184               if(!stoppedBeforeStart.get()){
  185               	LOG.error("Failed to start network bridge: " + e, e);
  186               }else{
  187               	LOG.info("Slave stopped before connected to the master.");
  188               }
  189               setFailedToStart(true);
  190           }    
  191       }
  192   
  193       protected void startBridge() throws Exception {
  194           connectionInfo = new ConnectionInfo();
  195           connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
  196           connectionInfo.setClientId(idGenerator.generateId());
  197           connectionInfo.setUserName(userName);
  198           connectionInfo.setPassword(password);
  199           connectionInfo.setBrokerMasterConnector(true);
  200           sessionInfo = new SessionInfo(connectionInfo, 1);
  201           producerInfo = new ProducerInfo(sessionInfo, 1);
  202           producerInfo.setResponseRequired(false);
  203           if (connector != null) {
  204               brokerInfo = connector.getBrokerInfo();
  205           } else {
  206               brokerInfo = new BrokerInfo();
  207           }
  208           brokerInfo.setBrokerName(broker.getBrokerName());
  209           brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
  210           brokerInfo.setSlaveBroker(true);
  211           brokerInfo.setPassiveSlave(broker.isPassiveSlave());
  212           restartBridge();
  213           LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
  214       }
  215   
  216       public void stop() throws Exception {
  217           if (!started.compareAndSet(true, false)||!masterActive.get()) {
  218               return;
  219           }
  220           masterActive.set(false);
  221           try {
  222               // if (connectionInfo!=null){
  223               // localBroker.request(connectionInfo.createRemoveCommand());
  224               // }
  225               // localBroker.setTransportListener(null);
  226               // remoteBroker.setTransportListener(null);
  227               remoteBroker.oneway(new ShutdownInfo());
  228               localBroker.oneway(new ShutdownInfo());
  229           } catch (IOException e) {
  230               LOG.debug("Caught exception stopping", e);
  231           } finally {
  232               ServiceStopper ss = new ServiceStopper();
  233               ss.stop(localBroker);
  234               ss.stop(remoteBroker);
  235               ss.throwFirstException();
  236           }
  237       }
  238       
  239       public void stopBeforeConnected()throws Exception{
  240           masterActive.set(false);
  241           started.set(false);
  242           stoppedBeforeStart.set(true);
  243           ServiceStopper ss = new ServiceStopper();
  244           ss.stop(localBroker);
  245           ss.stop(remoteBroker);
  246       }
  247   
  248       protected void serviceRemoteException(IOException error) {
  249           LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
  250           shutDown();
  251       }
  252   
  253       protected void serviceRemoteCommand(Command command) {
  254           try {
  255               if (command.isMessageDispatch()) {
  256                   MessageDispatch md = (MessageDispatch)command;
  257                   command = md.getMessage();
  258               }
  259               if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) {
  260                   LOG.warn("The Master has shutdown");
  261                   shutDown();
  262               } else {
  263                   boolean responseRequired = command.isResponseRequired();
  264                   int commandId = command.getCommandId();
  265                   if (responseRequired) {
  266                       Response response = (Response)localBroker.request(command);
  267                       response.setCorrelationId(commandId);
  268                       remoteBroker.oneway(response);
  269                   } else {
  270                       localBroker.oneway(command);
  271                   }
  272               }
  273           } catch (IOException e) {
  274               serviceRemoteException(e);
  275           }
  276       }
  277   
  278       protected void serviceLocalException(Throwable error) {
  279       	if (!(error instanceof TransportDisposedIOException) || localBroker.isDisposed()){
  280   	        LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
  281   	        ServiceSupport.dispose(this);
  282       	}else{
  283       		LOG.info(error.getMessage());
  284       	}
  285       }
  286   
  287       /**
  288        * @return Returns the localURI.
  289        */
  290       public URI getLocalURI() {
  291           return localURI;
  292       }
  293   
  294       /**
  295        * @param localURI The localURI to set.
  296        */
  297       public void setLocalURI(URI localURI) {
  298           this.localURI = localURI;
  299       }
  300   
  301       /**
  302        * @return Returns the remoteURI.
  303        */
  304       public URI getRemoteURI() {
  305           return remoteURI;
  306       }
  307   
  308       /**
  309        * @param remoteURI The remoteURI to set.
  310        */
  311       public void setRemoteURI(URI remoteURI) {
  312           this.remoteURI = remoteURI;
  313       }
  314   
  315       /**
  316        * @return Returns the password.
  317        */
  318       public String getPassword() {
  319           return password;
  320       }
  321   
  322       /**
  323        * @param password The password to set.
  324        */
  325       public void setPassword(String password) {
  326           this.password = password;
  327       }
  328   
  329       /**
  330        * @return Returns the userName.
  331        */
  332       public String getUserName() {
  333           return userName;
  334       }
  335   
  336       /**
  337        * @param userName The userName to set.
  338        */
  339       public void setUserName(String userName) {
  340           this.userName = userName;
  341       }
  342   
  343       private void shutDown() {
  344           masterActive.set(false);
  345           broker.masterFailed();
  346           ServiceSupport.dispose(this);
  347       }
  348   
  349   	public boolean isStoppedBeforeStart() {
  350   		return stoppedBeforeStart.get();
  351   	}
  352   
  353       /**
  354        * Get the failedToStart
  355        * @return the failedToStart
  356        */
  357       public boolean isFailedToStart() {
  358           return this.failedToStart;
  359       }
  360   
  361       /**
  362        * Set the failedToStart
  363        * @param failedToStart the failedToStart to set
  364        */
  365       public void setFailedToStart(boolean failedToStart) {
  366           this.failedToStart = failedToStart;
  367       }
  368   
  369   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » ft » [javadoc | source]