Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » ft » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.ft;
   18   
   19   import java.util.Map;
   20   import java.util.concurrent.ConcurrentHashMap;
   21   import java.util.concurrent.atomic.AtomicBoolean;
   22   
   23   import org.apache.activemq.broker.Connection;
   24   import org.apache.activemq.broker.ConnectionContext;
   25   import org.apache.activemq.broker.ConsumerBrokerExchange;
   26   import org.apache.activemq.broker.InsertableMutableBrokerFilter;
   27   import org.apache.activemq.broker.MutableBrokerFilter;
   28   import org.apache.activemq.broker.ProducerBrokerExchange;
   29   import org.apache.activemq.broker.region.Subscription;
   30   import org.apache.activemq.command.Command;
   31   import org.apache.activemq.command.ConnectionControl;
   32   import org.apache.activemq.command.ConnectionInfo;
   33   import org.apache.activemq.command.ConsumerId;
   34   import org.apache.activemq.command.ConsumerInfo;
   35   import org.apache.activemq.command.DestinationInfo;
   36   import org.apache.activemq.command.ExceptionResponse;
   37   import org.apache.activemq.command.Message;
   38   import org.apache.activemq.command.MessageAck;
   39   import org.apache.activemq.command.MessageDispatch;
   40   import org.apache.activemq.command.MessageDispatchNotification;
   41   import org.apache.activemq.command.ProducerInfo;
   42   import org.apache.activemq.command.RemoveInfo;
   43   import org.apache.activemq.command.RemoveSubscriptionInfo;
   44   import org.apache.activemq.command.Response;
   45   import org.apache.activemq.command.SessionInfo;
   46   import org.apache.activemq.command.TransactionId;
   47   import org.apache.activemq.command.TransactionInfo;
   48   import org.apache.activemq.transport.MutexTransport;
   49   import org.apache.activemq.transport.ResponseCorrelator;
   50   import org.apache.activemq.transport.Transport;
   51   import org.apache.commons.logging.Log;
   52   import org.apache.commons.logging.LogFactory;
   53   
   54   /**
   55    * The Message Broker which passes messages to a slave
   56    * 
   57    * @version $Revision: 1.8 $
   58    */
   59   public class MasterBroker extends InsertableMutableBrokerFilter {
   60   
   61       private static final Log LOG = LogFactory.getLog(MasterBroker.class);
   62       private Transport slave;
   63       private AtomicBoolean started = new AtomicBoolean(false);
   64   
   65       private Map<ConsumerId, ConsumerId> consumers = new ConcurrentHashMap<ConsumerId, ConsumerId>();
   66       
   67       /**
   68        * Constructor
   69        * 
   70        * @param parent
   71        * @param transport
   72        */
   73       public MasterBroker(MutableBrokerFilter parent, Transport transport) {
   74           super(parent);
   75           this.slave = transport;
   76           this.slave = new MutexTransport(slave);
   77           this.slave = new ResponseCorrelator(slave);
   78           this.slave.setTransportListener(transport.getTransportListener());
   79       }
   80   
   81       /**
   82        * start processing this broker
   83        */
   84       public void startProcessing() {
   85           started.set(true);
   86           try {
   87               Connection[] connections = getClients();
   88               ConnectionControl command = new ConnectionControl();
   89               command.setFaultTolerant(true);
   90               if (connections != null) {
   91                   for (int i = 0; i < connections.length; i++) {
   92                       if (connections[i].isActive() && connections[i].isManageable()) {
   93                           connections[i].dispatchAsync(command);
   94                       }
   95                   }
   96               }
   97           } catch (Exception e) {
   98               LOG.error("Failed to get Connections", e);
   99           }
  100       }
  101   
  102       /**
  103        * stop the broker
  104        * 
  105        * @throws Exception
  106        */
  107       public void stop() throws Exception {
  108           stopProcessing();
  109       }
  110   
  111       /**
  112        * stop processing this broker
  113        */
  114       public void stopProcessing() {
  115           if (started.compareAndSet(true, false)) {
  116               remove();
  117           }
  118       }
  119   
  120       /**
  121        * A client is establishing a connection with the broker.
  122        * 
  123        * @param context
  124        * @param info
  125        * @throws Exception
  126        */
  127       public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
  128           super.addConnection(context, info);
  129           sendAsyncToSlave(info);
  130       }
  131   
  132       /**
  133        * A client is disconnecting from the broker.
  134        * 
  135        * @param context the environment the operation is being executed under.
  136        * @param info
  137        * @param error null if the client requested the disconnect or the error
  138        *                that caused the client to disconnect.
  139        * @throws Exception
  140        */
  141       public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
  142           super.removeConnection(context, info, error);
  143           sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
  144       }
  145   
  146       /**
  147        * Adds a session.
  148        * 
  149        * @param context
  150        * @param info
  151        * @throws Exception
  152        */
  153       public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
  154           super.addSession(context, info);
  155           sendAsyncToSlave(info);
  156       }
  157   
  158       /**
  159        * Removes a session.
  160        * 
  161        * @param context
  162        * @param info
  163        * @throws Exception
  164        */
  165       public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
  166           super.removeSession(context, info);
  167           sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
  168       }
  169   
  170       /**
  171        * Adds a producer.
  172        * 
  173        * @param context the enviorment the operation is being executed under.
  174        * @param info
  175        * @throws Exception
  176        */
  177       public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
  178           super.addProducer(context, info);
  179           sendAsyncToSlave(info);
  180       }
  181   
  182       /**
  183        * Removes a producer.
  184        * 
  185        * @param context the environment the operation is being executed under.
  186        * @param info
  187        * @throws Exception
  188        */
  189       public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
  190           super.removeProducer(context, info);
  191           sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
  192       }
  193   
  194       /**
  195        * add a consumer
  196        * 
  197        * @param context
  198        * @param info
  199        * @return the associated subscription
  200        * @throws Exception
  201        */
  202       public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
  203           sendSyncToSlave(info);
  204           consumers.put(info.getConsumerId(), info.getConsumerId());
  205           return super.addConsumer(context, info);
  206       }
  207   
  208       @Override
  209       public void removeConsumer(ConnectionContext context, ConsumerInfo info)
  210               throws Exception {
  211           super.removeConsumer(context, info);
  212           consumers.remove(info.getConsumerId());
  213           sendSyncToSlave(new RemoveInfo(info.getConsumerId()));
  214      }
  215   
  216       /**
  217        * remove a subscription
  218        * 
  219        * @param context
  220        * @param info
  221        * @throws Exception
  222        */
  223       public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
  224           super.removeSubscription(context, info);
  225           sendAsyncToSlave(info);
  226       }
  227       
  228       @Override
  229       public void addDestinationInfo(ConnectionContext context,
  230               DestinationInfo info) throws Exception {
  231           super.addDestinationInfo(context, info);
  232           if (info.getDestination().isTemporary()) {
  233               sendAsyncToSlave(info);
  234           }
  235       }
  236   
  237       @Override
  238       public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
  239           super.removeDestinationInfo(context, info);
  240           if (info.getDestination().isTemporary()) {
  241               sendAsyncToSlave(info);
  242           }
  243       }
  244       
  245       /**
  246        * begin a transaction
  247        * 
  248        * @param context
  249        * @param xid
  250        * @throws Exception
  251        */
  252       public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
  253           TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN);
  254           sendAsyncToSlave(info);
  255           super.beginTransaction(context, xid);
  256       }
  257   
  258       /**
  259        * Prepares a transaction. Only valid for xa transactions.
  260        * 
  261        * @param context
  262        * @param xid
  263        * @return the state
  264        * @throws Exception
  265        */
  266       public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
  267           TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE);
  268           sendSyncToSlave(info);
  269           int result = super.prepareTransaction(context, xid);
  270           return result;
  271       }
  272   
  273       /**
  274        * Rollsback a transaction.
  275        * 
  276        * @param context
  277        * @param xid
  278        * @throws Exception
  279        */
  280       public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
  281           TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK);
  282           sendAsyncToSlave(info);
  283           super.rollbackTransaction(context, xid);
  284       }
  285   
  286       /**
  287        * Commits a transaction.
  288        * 
  289        * @param context
  290        * @param xid
  291        * @param onePhase
  292        * @throws Exception
  293        */
  294       public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
  295           TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.COMMIT_ONE_PHASE);
  296           sendSyncToSlave(info);
  297           super.commitTransaction(context, xid, onePhase);
  298       }
  299   
  300       /**
  301        * Forgets a transaction.
  302        * 
  303        * @param context
  304        * @param xid
  305        * @throws Exception
  306        */
  307       public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
  308           TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET);
  309           sendAsyncToSlave(info);
  310           super.forgetTransaction(context, xid);
  311       }
  312   
  313       /**
  314        * Notifiy the Broker that a dispatch will happen
  315        * Do in 'pre' so that slave will avoid getting ack before dispatch
  316        * similar logic to send() below.
  317        * @param messageDispatch
  318        */
  319       public void preProcessDispatch(MessageDispatch messageDispatch) {
  320           super.preProcessDispatch(messageDispatch);
  321           MessageDispatchNotification mdn = new MessageDispatchNotification();
  322           mdn.setConsumerId(messageDispatch.getConsumerId());
  323           mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
  324           mdn.setDestination(messageDispatch.getDestination());
  325           if (messageDispatch.getMessage() != null) {
  326               Message msg = messageDispatch.getMessage();
  327               mdn.setMessageId(msg.getMessageId());
  328               if (consumers.containsKey(messageDispatch.getConsumerId())) {
  329                   sendSyncToSlave(mdn);
  330               }
  331           }
  332       }
  333   
  334       /**
  335        * @param context
  336        * @param message
  337        * @throws Exception
  338        */
  339       public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
  340           /**
  341            * A message can be dispatched before the super.send() method returns so -
  342            * here the order is switched to avoid problems on the slave with
  343            * receiving acks for messages not received yet
  344            */
  345           sendSyncToSlave(message);
  346           super.send(producerExchange, message);
  347       }
  348   
  349       /**
  350        * @param context
  351        * @param ack
  352        * @throws Exception
  353        */
  354       public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
  355           sendToSlave(ack);
  356           super.acknowledge(consumerExchange, ack);
  357       }
  358   
  359       public boolean isFaultTolerantConfiguration() {
  360           return true;
  361       }
  362   
  363       protected void sendToSlave(Message message) {
  364           if (message.isResponseRequired()) {
  365               sendSyncToSlave(message);
  366           } else {
  367               sendAsyncToSlave(message);
  368           }
  369       }
  370   
  371       protected void sendToSlave(MessageAck ack) {
  372           if (ack.isResponseRequired()) {
  373               sendAsyncToSlave(ack);
  374           } else {
  375               sendSyncToSlave(ack);
  376           }
  377       }
  378   
  379       protected void sendAsyncToSlave(Command command) {
  380           try {
  381               slave.oneway(command);
  382           } catch (Throwable e) {
  383               LOG.error("Slave Failed", e);
  384               stopProcessing();
  385           }
  386       }
  387   
  388       protected void sendSyncToSlave(Command command) {
  389           try {
  390               Response response = (Response)slave.request(command);
  391               if (response.isException()) {
  392                   ExceptionResponse er = (ExceptionResponse)response;
  393                   LOG.error("Slave Failed", er.getException());
  394               }
  395           } catch (Throwable e) {
  396               LOG.error("Slave Failed", e);
  397           }
  398       }
  399   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » ft » [javadoc | source]