1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.broker.ft; 18 19 import java.io.IOException; 20 import java.net.URI; 21 import java.net.URISyntaxException; 22 import java.util.List; 23 import java.util.concurrent.atomic.AtomicBoolean; 24 25 import org.apache.activemq.Service; 26 import org.apache.activemq.broker.BrokerService; 27 import org.apache.activemq.broker.BrokerServiceAware; 28 import org.apache.activemq.broker.TransportConnector; 29 import org.apache.activemq.command.BrokerInfo; 30 import org.apache.activemq.command.Command; 31 import org.apache.activemq.command.CommandTypes; 32 import org.apache.activemq.command.ConnectionId; 33 import org.apache.activemq.command.ConnectionInfo; 34 import org.apache.activemq.command.MessageDispatch; 35 import org.apache.activemq.command.ProducerInfo; 36 import org.apache.activemq.command.Response; 37 import org.apache.activemq.command.SessionInfo; 38 import org.apache.activemq.command.ShutdownInfo; 39 import org.apache.activemq.transport.DefaultTransportListener; 40 import org.apache.activemq.transport.Transport; 41 import org.apache.activemq.transport.TransportDisposedIOException; 42 import org.apache.activemq.transport.TransportFactory; 43 import org.apache.activemq.util.IdGenerator; 44 import org.apache.activemq.util.ServiceStopper; 45 import org.apache.activemq.util.ServiceSupport; 46 import org.apache.commons.logging.Log; 47 import org.apache.commons.logging.LogFactory; 48 49 /** 50 * Connects a Slave Broker to a Master when using <a 51 * href="http://activemq.apache.org/masterslave.html">Master Slave</a> for High 52 * Availability of messages. 53 * 54 * @org.apache.xbean.XBean 55 * @version $Revision$ 56 */ 57 public class MasterConnector implements Service, BrokerServiceAware { 58 59 private static final Log LOG = LogFactory.getLog(MasterConnector.class); 60 private BrokerService broker; 61 private URI remoteURI; 62 private URI localURI; 63 private Transport localBroker; 64 private Transport remoteBroker; 65 private TransportConnector connector; 66 private AtomicBoolean started = new AtomicBoolean(false); 67 private AtomicBoolean stoppedBeforeStart = new AtomicBoolean(false); 68 private final IdGenerator idGenerator = new IdGenerator(); 69 private String userName; 70 private String password; 71 private ConnectionInfo connectionInfo; 72 private SessionInfo sessionInfo; 73 private ProducerInfo producerInfo; 74 private final AtomicBoolean masterActive = new AtomicBoolean(); 75 private BrokerInfo brokerInfo; 76 private boolean firstConnection=true; 77 private boolean failedToStart; 78 79 public MasterConnector() { 80 } 81 82 public MasterConnector(String remoteUri) throws URISyntaxException { 83 remoteURI = new URI(remoteUri); 84 } 85 86 public void setBrokerService(BrokerService broker) { 87 this.broker = broker; 88 if (localURI == null) { 89 localURI = broker.getVmConnectorURI(); 90 } 91 if (connector == null) { 92 List transportConnectors = broker.getTransportConnectors(); 93 if (!transportConnectors.isEmpty()) { 94 connector = (TransportConnector)transportConnectors.get(0); 95 } 96 } 97 } 98 99 public boolean isSlave() { 100 return masterActive.get(); 101 } 102 103 protected void restartBridge() throws Exception { 104 localBroker.oneway(connectionInfo); 105 remoteBroker.oneway(connectionInfo); 106 localBroker.oneway(sessionInfo); 107 remoteBroker.oneway(sessionInfo); 108 remoteBroker.oneway(producerInfo); 109 remoteBroker.oneway(brokerInfo); 110 } 111 112 public void start() throws Exception { 113 if (!started.compareAndSet(false, true)) { 114 return; 115 } 116 if (remoteURI == null) { 117 throw new IllegalArgumentException("You must specify a remoteURI"); 118 } 119 localBroker = TransportFactory.connect(localURI); 120 remoteBroker = TransportFactory.connect(remoteURI); 121 LOG.info("Starting a slave connection between " + localBroker + " and " + remoteBroker); 122 localBroker.setTransportListener(new DefaultTransportListener() { 123 124 public void onCommand(Object command) { 125 } 126 127 public void onException(IOException error) { 128 if (started.get()) { 129 serviceLocalException(error); 130 } 131 } 132 }); 133 remoteBroker.setTransportListener(new DefaultTransportListener() { 134 135 public void onCommand(Object o) { 136 Command command = (Command)o; 137 if (started.get()) { 138 serviceRemoteCommand(command); 139 } 140 } 141 142 public void onException(IOException error) { 143 if (started.get()) { 144 serviceRemoteException(error); 145 } 146 } 147 148 public void transportResumed() { 149 try{ 150 if(!firstConnection){ 151 localBroker = TransportFactory.connect(localURI); 152 localBroker.setTransportListener(new DefaultTransportListener() { 153 154 public void onCommand(Object command) { 155 } 156 157 public void onException(IOException error) { 158 if (started.get()) { 159 serviceLocalException(error); 160 } 161 } 162 }); 163 localBroker.start(); 164 restartBridge(); 165 LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been reestablished."); 166 }else{ 167 firstConnection=false; 168 } 169 }catch(IOException e){ 170 LOG.error("MasterConnector failed to send BrokerInfo in transportResumed:", e); 171 }catch(Exception e){ 172 LOG.error("MasterConnector failed to restart localBroker in transportResumed:", e); 173 } 174 175 } 176 }); 177 try { 178 localBroker.start(); 179 remoteBroker.start(); 180 startBridge(); 181 masterActive.set(true); 182 } catch (Exception e) { 183 masterActive.set(false); 184 if(!stoppedBeforeStart.get()){ 185 LOG.error("Failed to start network bridge: " + e, e); 186 }else{ 187 LOG.info("Slave stopped before connected to the master."); 188 } 189 setFailedToStart(true); 190 } 191 } 192 193 protected void startBridge() throws Exception { 194 connectionInfo = new ConnectionInfo(); 195 connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); 196 connectionInfo.setClientId(idGenerator.generateId()); 197 connectionInfo.setUserName(userName); 198 connectionInfo.setPassword(password); 199 connectionInfo.setBrokerMasterConnector(true); 200 sessionInfo = new SessionInfo(connectionInfo, 1); 201 producerInfo = new ProducerInfo(sessionInfo, 1); 202 producerInfo.setResponseRequired(false); 203 if (connector != null) { 204 brokerInfo = connector.getBrokerInfo(); 205 } else { 206 brokerInfo = new BrokerInfo(); 207 } 208 brokerInfo.setBrokerName(broker.getBrokerName()); 209 brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos()); 210 brokerInfo.setSlaveBroker(true); 211 brokerInfo.setPassiveSlave(broker.isPassiveSlave()); 212 restartBridge(); 213 LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established."); 214 } 215 216 public void stop() throws Exception { 217 if (!started.compareAndSet(true, false)||!masterActive.get()) { 218 return; 219 } 220 masterActive.set(false); 221 try { 222 // if (connectionInfo!=null){ 223 // localBroker.request(connectionInfo.createRemoveCommand()); 224 // } 225 // localBroker.setTransportListener(null); 226 // remoteBroker.setTransportListener(null); 227 remoteBroker.oneway(new ShutdownInfo()); 228 localBroker.oneway(new ShutdownInfo()); 229 } catch (IOException e) { 230 LOG.debug("Caught exception stopping", e); 231 } finally { 232 ServiceStopper ss = new ServiceStopper(); 233 ss.stop(localBroker); 234 ss.stop(remoteBroker); 235 ss.throwFirstException(); 236 } 237 } 238 239 public void stopBeforeConnected()throws Exception{ 240 masterActive.set(false); 241 started.set(false); 242 stoppedBeforeStart.set(true); 243 ServiceStopper ss = new ServiceStopper(); 244 ss.stop(localBroker); 245 ss.stop(remoteBroker); 246 } 247 248 protected void serviceRemoteException(IOException error) { 249 LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error); 250 shutDown(); 251 } 252 253 protected void serviceRemoteCommand(Command command) { 254 try { 255 if (command.isMessageDispatch()) { 256 MessageDispatch md = (MessageDispatch)command; 257 command = md.getMessage(); 258 } 259 if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) { 260 LOG.warn("The Master has shutdown"); 261 shutDown(); 262 } else { 263 boolean responseRequired = command.isResponseRequired(); 264 int commandId = command.getCommandId(); 265 if (responseRequired) { 266 Response response = (Response)localBroker.request(command); 267 response.setCorrelationId(commandId); 268 remoteBroker.oneway(response); 269 } else { 270 localBroker.oneway(command); 271 } 272 } 273 } catch (IOException e) { 274 serviceRemoteException(e); 275 } 276 } 277 278 protected void serviceLocalException(Throwable error) { 279 if (!(error instanceof TransportDisposedIOException) || localBroker.isDisposed()){ 280 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error); 281 ServiceSupport.dispose(this); 282 }else{ 283 LOG.info(error.getMessage()); 284 } 285 } 286 287 /** 288 * @return Returns the localURI. 289 */ 290 public URI getLocalURI() { 291 return localURI; 292 } 293 294 /** 295 * @param localURI The localURI to set. 296 */ 297 public void setLocalURI(URI localURI) { 298 this.localURI = localURI; 299 } 300 301 /** 302 * @return Returns the remoteURI. 303 */ 304 public URI getRemoteURI() { 305 return remoteURI; 306 } 307 308 /** 309 * @param remoteURI The remoteURI to set. 310 */ 311 public void setRemoteURI(URI remoteURI) { 312 this.remoteURI = remoteURI; 313 } 314 315 /** 316 * @return Returns the password. 317 */ 318 public String getPassword() { 319 return password; 320 } 321 322 /** 323 * @param password The password to set. 324 */ 325 public void setPassword(String password) { 326 this.password = password; 327 } 328 329 /** 330 * @return Returns the userName. 331 */ 332 public String getUserName() { 333 return userName; 334 } 335 336 /** 337 * @param userName The userName to set. 338 */ 339 public void setUserName(String userName) { 340 this.userName = userName; 341 } 342 343 private void shutDown() { 344 masterActive.set(false); 345 broker.masterFailed(); 346 ServiceSupport.dispose(this); 347 } 348 349 public boolean isStoppedBeforeStart() { 350 return stoppedBeforeStart.get(); 351 } 352 353 /** 354 * Get the failedToStart 355 * @return the failedToStart 356 */ 357 public boolean isFailedToStart() { 358 return this.failedToStart; 359 } 360 361 /** 362 * Set the failedToStart 363 * @param failedToStart the failedToStart to set 364 */ 365 public void setFailedToStart(boolean failedToStart) { 366 this.failedToStart = failedToStart; 367 } 368 369 }