Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region.cursors;
   18   
   19   import java.util.ArrayList;
   20   import java.util.Iterator;
   21   import java.util.LinkedList;
   22   import java.util.List;
   23   import org.apache.activemq.broker.ConnectionContext;
   24   import org.apache.activemq.broker.region.Destination;
   25   import org.apache.activemq.broker.region.MessageReference;
   26   import org.apache.activemq.broker.region.QueueMessageReference;
   27   
   28   /**
   29    * hold pending messages in a linked list (messages awaiting disptach to a
   30    * consumer) cursor
   31    * 
   32    * @version $Revision: 915914 $
   33    */
   34   public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
   35       private final LinkedList<MessageReference> list = new LinkedList<MessageReference>();
   36       private Iterator<MessageReference> iter;
   37       public VMPendingMessageCursor() {
   38           this.useCache = false;
   39       }
   40   
   41       @Override
   42       public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
   43               throws Exception {
   44           List<MessageReference> rc = new ArrayList<MessageReference>();
   45           for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
   46               MessageReference r = iterator.next();
   47               if (r.getRegionDestination() == destination) {
   48                   r.decrementReferenceCount();
   49                   rc.add(r);
   50                   iterator.remove();
   51               }
   52           }
   53           return rc;
   54       }
   55   
   56       /**
   57        * @return true if there are no pending messages
   58        */
   59       @Override
   60       public synchronized boolean isEmpty() {
   61           if (list.isEmpty()) {
   62               return true;
   63           } else {
   64               for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
   65                   MessageReference node = iterator.next();
   66                   if (node == QueueMessageReference.NULL_MESSAGE) {
   67                       continue;
   68                   }
   69                   if (!node.isDropped()) {
   70                       return false;
   71                   }
   72                   // We can remove dropped references.
   73                   iterator.remove();
   74               }
   75               return true;
   76           }
   77       }
   78   
   79       /**
   80        * reset the cursor
   81        */
   82       @Override
   83       public synchronized void reset() {
   84           iter = list.listIterator();
   85           last = null;
   86       }
   87   
   88       /**
   89        * add message to await dispatch
   90        * 
   91        * @param node
   92        */
   93       @Override
   94       public synchronized void addMessageLast(MessageReference node) {
   95           node.incrementReferenceCount();
   96           list.addLast(node);
   97       }
   98   
   99       /**
  100        * add message to await dispatch
  101        * 
  102        * @param position
  103        * @param node
  104        */
  105       @Override
  106       public synchronized void addMessageFirst(MessageReference node) {
  107           node.incrementReferenceCount();
  108           list.addFirst(node);
  109       }
  110   
  111       /**
  112        * @return true if there pending messages to dispatch
  113        */
  114       @Override
  115       public synchronized boolean hasNext() {
  116           return iter.hasNext();
  117       }
  118   
  119       /**
  120        * @return the next pending message
  121        */
  122       @Override
  123       public synchronized MessageReference next() {
  124           last = iter.next();
  125           if (last != null) {
  126               last.incrementReferenceCount();
  127           }
  128           return last;
  129       }
  130   
  131       /**
  132        * remove the message at the cursor position
  133        */
  134       @Override
  135       public synchronized void remove() {
  136           if (last != null) {
  137               last.decrementReferenceCount();
  138           }
  139           iter.remove();
  140       }
  141   
  142       /**
  143        * @return the number of pending messages
  144        */
  145       @Override
  146       public synchronized int size() {
  147           return list.size();
  148       }
  149   
  150       /**
  151        * clear all pending messages
  152        */
  153       @Override
  154       public synchronized void clear() {
  155           for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
  156               MessageReference ref = i.next();
  157               ref.decrementReferenceCount();
  158           }
  159           list.clear();
  160       }
  161   
  162       @Override
  163       public synchronized void remove(MessageReference node) {
  164           for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
  165               MessageReference ref = i.next();
  166               if (node.getMessageId().equals(ref.getMessageId())) {
  167                   ref.decrementReferenceCount();
  168                   i.remove();
  169                   break;
  170               }
  171           }
  172       }
  173   
  174       /**
  175        * Page in a restricted number of messages
  176        * 
  177        * @param maxItems
  178        * @return a list of paged in messages
  179        */
  180       @Override
  181       public LinkedList<MessageReference> pageInList(int maxItems) {
  182           LinkedList<MessageReference> result = new LinkedList<MessageReference>();
  183           for (MessageReference ref: list) {
  184               ref.incrementReferenceCount();
  185               result.add(ref);
  186               if (result.size() >= maxItems) {
  187                   break;
  188               }
  189           }
  190           return result;
  191       }
  192   
  193       @Override
  194       public boolean isTransient() {
  195           return true;
  196       }
  197   
  198       @Override
  199       public void destroy() throws Exception {
  200           super.destroy();
  201           clear();
  202       }
  203   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]