1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.broker.cluster; 18 19 import java.util.ArrayList; 20 import java.util.List; 21 22 import org.apache.activemq.broker.Broker; 23 import org.apache.activemq.broker.BrokerFilter; 24 import org.apache.activemq.broker.ConnectionContext; 25 import org.apache.activemq.broker.region.Subscription; 26 import org.apache.activemq.command.ActiveMQDestination; 27 import org.apache.activemq.command.ConsumerId; 28 import org.apache.activemq.command.ConsumerInfo; 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 32 /** 33 * Monitors for client connections that may fail to another broker - but this 34 * broker isn't aware they've gone. Can occur with network glitches or client 35 * error 36 * 37 * @version $Revision$ 38 */ 39 public class ConnectionSplitBroker extends BrokerFilter{ 40 private static final Log LOG = LogFactory.getLog(ConnectionSplitBroker.class); 41 private List<ConsumerInfo>networkConsumerList = new ArrayList<ConsumerInfo>(); 42 public ConnectionSplitBroker(Broker next) { 43 super(next); 44 } 45 46 47 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) 48 throws Exception { 49 ActiveMQDestination dest = info.getDestination(); 50 51 synchronized (networkConsumerList) { 52 if (info.isNetworkSubscription()) { 53 networkConsumerList.add(info); 54 } else { 55 if (!networkConsumerList.isEmpty()) { 56 List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>(); 57 for (ConsumerInfo nc : networkConsumerList) { 58 if (!nc.isNetworkConsumersEmpty()) { 59 60 for (ConsumerId id : nc.getNetworkConsumerIds()) { 61 62 if (id.equals(info.getConsumerId())) { 63 nc.removeNetworkConsumerId(id); 64 if (nc.isNetworkConsumersEmpty()) { 65 gcList.add(nc); 66 } 67 } 68 } 69 } 70 } 71 for (ConsumerInfo nc : gcList) { 72 networkConsumerList.remove(nc); 73 super.removeConsumer(context, nc); 74 LOG.warn("Removed stale network consumer " + nc); 75 } 76 } 77 } 78 } 79 80 return super.addConsumer(context, info); 81 } 82 83 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 84 if (info.isNetworkSubscription()) { 85 86 synchronized (networkConsumerList) { 87 networkConsumerList.remove(info); 88 } 89 } 90 super.removeConsumer(context, info); 91 } 92 }