Home » activemq-parent-5.3.1-source-release » org.apache » activemq » memory » list » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.memory.list;
   18   
   19   import java.util.ArrayList;
   20   import java.util.HashMap;
   21   import java.util.Iterator;
   22   import java.util.List;
   23   import java.util.Map;
   24   import java.util.Set;
   25   import org.apache.activemq.broker.region.MessageReference;
   26   import org.apache.activemq.broker.region.Subscription;
   27   import org.apache.activemq.command.ActiveMQDestination;
   28   import org.apache.activemq.command.ActiveMQMessage;
   29   import org.apache.activemq.command.Message;
   30   import org.apache.activemq.filter.DestinationMap;
   31   import org.apache.activemq.memory.buffer.MessageBuffer;
   32   import org.apache.activemq.memory.buffer.MessageQueue;
   33   import org.apache.activemq.memory.buffer.OrderBasedMessageBuffer;
   34   
   35   /**
   36    * An implementation of {@link MessageList} which maintains a separate message
   37    * list for each destination to reduce contention on the list and to speed up
   38    * recovery times by only recovering the interested topics.
   39    * 
   40    * @version $Revision: 1.1 $
   41    */
   42   public class DestinationBasedMessageList implements MessageList {
   43   
   44       private MessageBuffer messageBuffer;
   45       private Map<ActiveMQDestination, MessageQueue> queueIndex = new HashMap<ActiveMQDestination, MessageQueue>();
   46       private DestinationMap subscriptionIndex = new DestinationMap();
   47       private Object lock = new Object();
   48   
   49       public DestinationBasedMessageList(int maximumSize) {
   50           this(new OrderBasedMessageBuffer(maximumSize));
   51       }
   52       
   53       public DestinationBasedMessageList(MessageBuffer buffer) {
   54           messageBuffer = buffer;
   55       }
   56   
   57       public void add(MessageReference node) {
   58           ActiveMQMessage message = (ActiveMQMessage) node.getMessageHardRef();
   59           ActiveMQDestination destination = message.getDestination();
   60           MessageQueue queue = null;
   61           synchronized (lock) {
   62               queue = queueIndex.get(destination);
   63               if (queue == null) {
   64                   queue = messageBuffer.createMessageQueue();
   65                   queueIndex.put(destination, queue);
   66                   subscriptionIndex.put(destination, queue);
   67               }
   68           }
   69           queue.add(node);
   70       }
   71   
   72       public List<MessageReference> getMessages(Subscription sub) {
   73           return getMessages(sub.getConsumerInfo().getDestination());
   74       }
   75       
   76       public  List<MessageReference> getMessages(ActiveMQDestination destination) {
   77           Set set = null;
   78           synchronized (lock) {
   79               set = subscriptionIndex.get(destination);
   80           }
   81           List<MessageReference> answer = new ArrayList<MessageReference>();
   82           for (Iterator iter = set.iterator(); iter.hasNext();) {
   83               MessageQueue queue = (MessageQueue) iter.next();
   84               queue.appendMessages(answer);
   85           }
   86           return answer;
   87       }
   88       
   89       public Message[] browse(ActiveMQDestination destination) {
   90           List<MessageReference> result = getMessages(destination);
   91           return result.toArray(new Message[result.size()]);
   92       }
   93   
   94   
   95       public void clear() {
   96           messageBuffer.clear();
   97       }
   98   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » memory » list » [javadoc | source]