Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » cluster » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.cluster;
   18   
   19   import java.util.ArrayList;
   20   import java.util.List;
   21   
   22   import org.apache.activemq.broker.Broker;
   23   import org.apache.activemq.broker.BrokerFilter;
   24   import org.apache.activemq.broker.ConnectionContext;
   25   import org.apache.activemq.broker.region.Subscription;
   26   import org.apache.activemq.command.ActiveMQDestination;
   27   import org.apache.activemq.command.ConsumerId;
   28   import org.apache.activemq.command.ConsumerInfo;
   29   import org.apache.commons.logging.Log;
   30   import org.apache.commons.logging.LogFactory;
   31   
   32   /**
   33    * Monitors for client connections that may fail to another broker - but this
   34    * broker isn't aware they've gone. Can occur with network glitches or client
   35    * error
   36    * 
   37    * @version $Revision$
   38    */
   39   public class ConnectionSplitBroker extends BrokerFilter{
   40       private static final Log LOG = LogFactory.getLog(ConnectionSplitBroker.class);
   41       private List<ConsumerInfo>networkConsumerList = new ArrayList<ConsumerInfo>();
   42       public ConnectionSplitBroker(Broker next) {
   43           super(next);
   44       }
   45   
   46           
   47       public Subscription addConsumer(ConnectionContext context, ConsumerInfo info)
   48               throws Exception {
   49           ActiveMQDestination dest = info.getDestination();
   50   
   51           synchronized (networkConsumerList) {
   52               if (info.isNetworkSubscription()) {
   53                   networkConsumerList.add(info);
   54               } else {
   55                   if (!networkConsumerList.isEmpty()) {
   56                       List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>();
   57                       for (ConsumerInfo nc : networkConsumerList) {
   58                           if (!nc.isNetworkConsumersEmpty()) {
   59                               
   60                               for (ConsumerId id : nc.getNetworkConsumerIds()) {
   61                                   
   62                                   if (id.equals(info.getConsumerId())) {
   63                                       nc.removeNetworkConsumerId(id);
   64                                       if (nc.isNetworkConsumersEmpty()) {
   65                                           gcList.add(nc);
   66                                       }
   67                                   }
   68                               }
   69                           }
   70                       }
   71                       for (ConsumerInfo nc : gcList) {
   72                           networkConsumerList.remove(nc);
   73                           super.removeConsumer(context, nc);
   74                           LOG.warn("Removed stale network consumer " + nc);
   75                       }
   76                   }
   77               }
   78           }
   79   
   80           return super.addConsumer(context, info);
   81       }
   82   
   83       public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
   84           if (info.isNetworkSubscription()) {
   85   
   86               synchronized (networkConsumerList) {
   87                   networkConsumerList.remove(info);
   88               }
   89           }
   90           super.removeConsumer(context, info);
   91       }
   92   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » cluster » [javadoc | source]