1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.broker.ft; 18 19 import java.util.Map; 20 import java.util.concurrent.ConcurrentHashMap; 21 import java.util.concurrent.atomic.AtomicBoolean; 22 23 import org.apache.activemq.broker.Connection; 24 import org.apache.activemq.broker.ConnectionContext; 25 import org.apache.activemq.broker.ConsumerBrokerExchange; 26 import org.apache.activemq.broker.InsertableMutableBrokerFilter; 27 import org.apache.activemq.broker.MutableBrokerFilter; 28 import org.apache.activemq.broker.ProducerBrokerExchange; 29 import org.apache.activemq.broker.region.Subscription; 30 import org.apache.activemq.command.Command; 31 import org.apache.activemq.command.ConnectionControl; 32 import org.apache.activemq.command.ConnectionInfo; 33 import org.apache.activemq.command.ConsumerId; 34 import org.apache.activemq.command.ConsumerInfo; 35 import org.apache.activemq.command.DestinationInfo; 36 import org.apache.activemq.command.ExceptionResponse; 37 import org.apache.activemq.command.Message; 38 import org.apache.activemq.command.MessageAck; 39 import org.apache.activemq.command.MessageDispatch; 40 import org.apache.activemq.command.MessageDispatchNotification; 41 import org.apache.activemq.command.ProducerInfo; 42 import org.apache.activemq.command.RemoveInfo; 43 import org.apache.activemq.command.RemoveSubscriptionInfo; 44 import org.apache.activemq.command.Response; 45 import org.apache.activemq.command.SessionInfo; 46 import org.apache.activemq.command.TransactionId; 47 import org.apache.activemq.command.TransactionInfo; 48 import org.apache.activemq.transport.MutexTransport; 49 import org.apache.activemq.transport.ResponseCorrelator; 50 import org.apache.activemq.transport.Transport; 51 import org.apache.commons.logging.Log; 52 import org.apache.commons.logging.LogFactory; 53 54 /** 55 * The Message Broker which passes messages to a slave 56 * 57 * @version $Revision: 1.8 $ 58 */ 59 public class MasterBroker extends InsertableMutableBrokerFilter { 60 61 private static final Log LOG = LogFactory.getLog(MasterBroker.class); 62 private Transport slave; 63 private AtomicBoolean started = new AtomicBoolean(false); 64 65 private Map<ConsumerId, ConsumerId> consumers = new ConcurrentHashMap<ConsumerId, ConsumerId>(); 66 67 /** 68 * Constructor 69 * 70 * @param parent 71 * @param transport 72 */ 73 public MasterBroker(MutableBrokerFilter parent, Transport transport) { 74 super(parent); 75 this.slave = transport; 76 this.slave = new MutexTransport(slave); 77 this.slave = new ResponseCorrelator(slave); 78 this.slave.setTransportListener(transport.getTransportListener()); 79 } 80 81 /** 82 * start processing this broker 83 */ 84 public void startProcessing() { 85 started.set(true); 86 try { 87 Connection[] connections = getClients(); 88 ConnectionControl command = new ConnectionControl(); 89 command.setFaultTolerant(true); 90 if (connections != null) { 91 for (int i = 0; i < connections.length; i++) { 92 if (connections[i].isActive() && connections[i].isManageable()) { 93 connections[i].dispatchAsync(command); 94 } 95 } 96 } 97 } catch (Exception e) { 98 LOG.error("Failed to get Connections", e); 99 } 100 } 101 102 /** 103 * stop the broker 104 * 105 * @throws Exception 106 */ 107 public void stop() throws Exception { 108 stopProcessing(); 109 } 110 111 /** 112 * stop processing this broker 113 */ 114 public void stopProcessing() { 115 if (started.compareAndSet(true, false)) { 116 remove(); 117 } 118 } 119 120 /** 121 * A client is establishing a connection with the broker. 122 * 123 * @param context 124 * @param info 125 * @throws Exception 126 */ 127 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 128 super.addConnection(context, info); 129 sendAsyncToSlave(info); 130 } 131 132 /** 133 * A client is disconnecting from the broker. 134 * 135 * @param context the environment the operation is being executed under. 136 * @param info 137 * @param error null if the client requested the disconnect or the error 138 * that caused the client to disconnect. 139 * @throws Exception 140 */ 141 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 142 super.removeConnection(context, info, error); 143 sendAsyncToSlave(new RemoveInfo(info.getConnectionId())); 144 } 145 146 /** 147 * Adds a session. 148 * 149 * @param context 150 * @param info 151 * @throws Exception 152 */ 153 public void addSession(ConnectionContext context, SessionInfo info) throws Exception { 154 super.addSession(context, info); 155 sendAsyncToSlave(info); 156 } 157 158 /** 159 * Removes a session. 160 * 161 * @param context 162 * @param info 163 * @throws Exception 164 */ 165 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { 166 super.removeSession(context, info); 167 sendAsyncToSlave(new RemoveInfo(info.getSessionId())); 168 } 169 170 /** 171 * Adds a producer. 172 * 173 * @param context the enviorment the operation is being executed under. 174 * @param info 175 * @throws Exception 176 */ 177 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 178 super.addProducer(context, info); 179 sendAsyncToSlave(info); 180 } 181 182 /** 183 * Removes a producer. 184 * 185 * @param context the environment the operation is being executed under. 186 * @param info 187 * @throws Exception 188 */ 189 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 190 super.removeProducer(context, info); 191 sendAsyncToSlave(new RemoveInfo(info.getProducerId())); 192 } 193 194 /** 195 * add a consumer 196 * 197 * @param context 198 * @param info 199 * @return the associated subscription 200 * @throws Exception 201 */ 202 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 203 sendSyncToSlave(info); 204 consumers.put(info.getConsumerId(), info.getConsumerId()); 205 return super.addConsumer(context, info); 206 } 207 208 @Override 209 public void removeConsumer(ConnectionContext context, ConsumerInfo info) 210 throws Exception { 211 super.removeConsumer(context, info); 212 consumers.remove(info.getConsumerId()); 213 sendSyncToSlave(new RemoveInfo(info.getConsumerId())); 214 } 215 216 /** 217 * remove a subscription 218 * 219 * @param context 220 * @param info 221 * @throws Exception 222 */ 223 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 224 super.removeSubscription(context, info); 225 sendAsyncToSlave(info); 226 } 227 228 @Override 229 public void addDestinationInfo(ConnectionContext context, 230 DestinationInfo info) throws Exception { 231 super.addDestinationInfo(context, info); 232 if (info.getDestination().isTemporary()) { 233 sendAsyncToSlave(info); 234 } 235 } 236 237 @Override 238 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 239 super.removeDestinationInfo(context, info); 240 if (info.getDestination().isTemporary()) { 241 sendAsyncToSlave(info); 242 } 243 } 244 245 /** 246 * begin a transaction 247 * 248 * @param context 249 * @param xid 250 * @throws Exception 251 */ 252 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 253 TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN); 254 sendAsyncToSlave(info); 255 super.beginTransaction(context, xid); 256 } 257 258 /** 259 * Prepares a transaction. Only valid for xa transactions. 260 * 261 * @param context 262 * @param xid 263 * @return the state 264 * @throws Exception 265 */ 266 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 267 TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE); 268 sendSyncToSlave(info); 269 int result = super.prepareTransaction(context, xid); 270 return result; 271 } 272 273 /** 274 * Rollsback a transaction. 275 * 276 * @param context 277 * @param xid 278 * @throws Exception 279 */ 280 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 281 TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK); 282 sendAsyncToSlave(info); 283 super.rollbackTransaction(context, xid); 284 } 285 286 /** 287 * Commits a transaction. 288 * 289 * @param context 290 * @param xid 291 * @param onePhase 292 * @throws Exception 293 */ 294 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 295 TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.COMMIT_ONE_PHASE); 296 sendSyncToSlave(info); 297 super.commitTransaction(context, xid, onePhase); 298 } 299 300 /** 301 * Forgets a transaction. 302 * 303 * @param context 304 * @param xid 305 * @throws Exception 306 */ 307 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception { 308 TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET); 309 sendAsyncToSlave(info); 310 super.forgetTransaction(context, xid); 311 } 312 313 /** 314 * Notifiy the Broker that a dispatch will happen 315 * Do in 'pre' so that slave will avoid getting ack before dispatch 316 * similar logic to send() below. 317 * @param messageDispatch 318 */ 319 public void preProcessDispatch(MessageDispatch messageDispatch) { 320 super.preProcessDispatch(messageDispatch); 321 MessageDispatchNotification mdn = new MessageDispatchNotification(); 322 mdn.setConsumerId(messageDispatch.getConsumerId()); 323 mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId()); 324 mdn.setDestination(messageDispatch.getDestination()); 325 if (messageDispatch.getMessage() != null) { 326 Message msg = messageDispatch.getMessage(); 327 mdn.setMessageId(msg.getMessageId()); 328 if (consumers.containsKey(messageDispatch.getConsumerId())) { 329 sendSyncToSlave(mdn); 330 } 331 } 332 } 333 334 /** 335 * @param context 336 * @param message 337 * @throws Exception 338 */ 339 public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { 340 /** 341 * A message can be dispatched before the super.send() method returns so - 342 * here the order is switched to avoid problems on the slave with 343 * receiving acks for messages not received yet 344 */ 345 sendSyncToSlave(message); 346 super.send(producerExchange, message); 347 } 348 349 /** 350 * @param context 351 * @param ack 352 * @throws Exception 353 */ 354 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 355 sendToSlave(ack); 356 super.acknowledge(consumerExchange, ack); 357 } 358 359 public boolean isFaultTolerantConfiguration() { 360 return true; 361 } 362 363 protected void sendToSlave(Message message) { 364 if (message.isResponseRequired()) { 365 sendSyncToSlave(message); 366 } else { 367 sendAsyncToSlave(message); 368 } 369 } 370 371 protected void sendToSlave(MessageAck ack) { 372 if (ack.isResponseRequired()) { 373 sendAsyncToSlave(ack); 374 } else { 375 sendSyncToSlave(ack); 376 } 377 } 378 379 protected void sendAsyncToSlave(Command command) { 380 try { 381 slave.oneway(command); 382 } catch (Throwable e) { 383 LOG.error("Slave Failed", e); 384 stopProcessing(); 385 } 386 } 387 388 protected void sendSyncToSlave(Command command) { 389 try { 390 Response response = (Response)slave.request(command); 391 if (response.isException()) { 392 ExceptionResponse er = (ExceptionResponse)response; 393 LOG.error("Slave Failed", er.getException()); 394 } 395 } catch (Throwable e) { 396 LOG.error("Slave Failed", e); 397 } 398 } 399 }