1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.memory.list; 18 19 import java.util.ArrayList; 20 import java.util.HashMap; 21 import java.util.Iterator; 22 import java.util.List; 23 import java.util.Map; 24 import java.util.Set; 25 import org.apache.activemq.broker.region.MessageReference; 26 import org.apache.activemq.broker.region.Subscription; 27 import org.apache.activemq.command.ActiveMQDestination; 28 import org.apache.activemq.command.ActiveMQMessage; 29 import org.apache.activemq.command.Message; 30 import org.apache.activemq.filter.DestinationMap; 31 import org.apache.activemq.memory.buffer.MessageBuffer; 32 import org.apache.activemq.memory.buffer.MessageQueue; 33 import org.apache.activemq.memory.buffer.OrderBasedMessageBuffer; 34 35 /** 36 * An implementation of {@link MessageList} which maintains a separate message 37 * list for each destination to reduce contention on the list and to speed up 38 * recovery times by only recovering the interested topics. 39 * 40 * @version $Revision: 1.1 $ 41 */ 42 public class DestinationBasedMessageList implements MessageList { 43 44 private MessageBuffer messageBuffer; 45 private Map<ActiveMQDestination, MessageQueue> queueIndex = new HashMap<ActiveMQDestination, MessageQueue>(); 46 private DestinationMap subscriptionIndex = new DestinationMap(); 47 private Object lock = new Object(); 48 49 public DestinationBasedMessageList(int maximumSize) { 50 this(new OrderBasedMessageBuffer(maximumSize)); 51 } 52 53 public DestinationBasedMessageList(MessageBuffer buffer) { 54 messageBuffer = buffer; 55 } 56 57 public void add(MessageReference node) { 58 ActiveMQMessage message = (ActiveMQMessage) node.getMessageHardRef(); 59 ActiveMQDestination destination = message.getDestination(); 60 MessageQueue queue = null; 61 synchronized (lock) { 62 queue = queueIndex.get(destination); 63 if (queue == null) { 64 queue = messageBuffer.createMessageQueue(); 65 queueIndex.put(destination, queue); 66 subscriptionIndex.put(destination, queue); 67 } 68 } 69 queue.add(node); 70 } 71 72 public List<MessageReference> getMessages(Subscription sub) { 73 return getMessages(sub.getConsumerInfo().getDestination()); 74 } 75 76 public List<MessageReference> getMessages(ActiveMQDestination destination) { 77 Set set = null; 78 synchronized (lock) { 79 set = subscriptionIndex.get(destination); 80 } 81 List<MessageReference> answer = new ArrayList<MessageReference>(); 82 for (Iterator iter = set.iterator(); iter.hasNext();) { 83 MessageQueue queue = (MessageQueue) iter.next(); 84 queue.appendMessages(answer); 85 } 86 return answer; 87 } 88 89 public Message[] browse(ActiveMQDestination destination) { 90 List<MessageReference> result = getMessages(destination); 91 return result.toArray(new Message[result.size()]); 92 } 93 94 95 public void clear() { 96 messageBuffer.clear(); 97 } 98 }