Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region.cursors;
   18   
   19   import java.io.IOException;
   20   import java.util.Iterator;
   21   import java.util.LinkedList;
   22   import java.util.concurrent.atomic.AtomicBoolean;
   23   import java.util.concurrent.atomic.AtomicLong;
   24   
   25   import org.apache.activemq.broker.Broker;
   26   import org.apache.activemq.broker.ConnectionContext;
   27   import org.apache.activemq.broker.region.Destination;
   28   import org.apache.activemq.broker.region.MessageReference;
   29   import org.apache.activemq.broker.region.QueueMessageReference;
   30   import org.apache.activemq.command.Message;
   31   import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
   32   import org.apache.activemq.kaha.CommandMarshaller;
   33   import org.apache.activemq.kaha.ListContainer;
   34   import org.apache.activemq.kaha.Store;
   35   import org.apache.activemq.openwire.OpenWireFormat;
   36   import org.apache.activemq.usage.SystemUsage;
   37   import org.apache.activemq.usage.Usage;
   38   import org.apache.activemq.usage.UsageListener;
   39   import org.apache.commons.logging.Log;
   40   import org.apache.commons.logging.LogFactory;
   41   
   42   /**
   43    * persist pending messages pending message (messages awaiting dispatch to a
   44    * consumer) cursor
   45    * 
   46    * @version $Revision: 911759 $
   47    */
   48   public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
   49       private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
   50       private static final AtomicLong NAME_COUNT = new AtomicLong();
   51       protected Broker broker;
   52       private Store store;
   53       private String name;
   54       private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
   55       private ListContainer<MessageReference> diskList;
   56       private Iterator<MessageReference> iter;
   57       private Destination regionDestination;
   58       private boolean iterating;
   59       private boolean flushRequired;
   60       private AtomicBoolean started = new AtomicBoolean();
   61       /**
   62        * @param name
   63        * @param store
   64        */
   65       public FilePendingMessageCursor(Broker broker,String name) {
   66           this.useCache=false;
   67           this.broker = broker;
   68           //the store can be null if the BrokerService has persistence 
   69           //turned off
   70           this.store= broker.getTempDataStore();
   71           this.name = NAME_COUNT.incrementAndGet() + "_" + name;
   72       }
   73   
   74       public void start() throws Exception {
   75           if (started.compareAndSet(false, true)) {
   76               super.start();
   77               if (systemUsage != null) {
   78                   systemUsage.getMemoryUsage().addUsageListener(this);
   79               }
   80           }
   81       }
   82   
   83       public void stop() throws Exception {
   84           if (started.compareAndSet(true, false)) {
   85               super.stop();
   86               if (systemUsage != null) {
   87                   systemUsage.getMemoryUsage().removeUsageListener(this);
   88               }
   89           }
   90       }
   91   
   92       /**
   93        * @return true if there are no pending messages
   94        */
   95       public synchronized boolean isEmpty() {
   96           if(memoryList.isEmpty() && isDiskListEmpty()){
   97               return true;
   98           }
   99           for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
  100               MessageReference node = iterator.next();
  101               if (node== QueueMessageReference.NULL_MESSAGE){
  102                   continue;
  103               }
  104               if (!node.isDropped()) {
  105                   return false;
  106               }
  107               // We can remove dropped references.
  108               iterator.remove();
  109           }
  110           return isDiskListEmpty();
  111       }
  112       
  113       
  114   
  115       /**
  116        * reset the cursor
  117        */
  118       public synchronized void reset() {
  119           iterating = true;
  120           last = null;
  121           iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
  122       }
  123   
  124       public synchronized void release() {
  125           iterating = false;
  126           if (flushRequired) {
  127               flushRequired = false;
  128               flushToDisk();
  129           }
  130       }
  131   
  132       public synchronized void destroy() throws Exception {
  133           stop();
  134           for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
  135               Message node = (Message)i.next();
  136               node.decrementReferenceCount();
  137           }
  138           memoryList.clear();
  139           destroyDiskList();
  140       }
  141   
  142       private void destroyDiskList() throws Exception {
  143           if (!isDiskListEmpty()) {
  144               Iterator<MessageReference> iterator = diskList.iterator();
  145               while (iterator.hasNext()) {
  146                   iterator.next();
  147                   iterator.remove();
  148               }
  149               diskList.clear();
  150           }   
  151   	    store.deleteListContainer(name, "TopicSubscription");
  152       }
  153   
  154       public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
  155           LinkedList<MessageReference> result = new LinkedList<MessageReference>();
  156           int count = 0;
  157           for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
  158               MessageReference ref = i.next();
  159               ref.incrementReferenceCount();
  160               result.add(ref);
  161               count++;
  162           }
  163           if (count < maxItems && !isDiskListEmpty()) {
  164               for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
  165                   Message message = (Message)i.next();
  166                   message.setRegionDestination(regionDestination);
  167                   message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
  168                   message.incrementReferenceCount();
  169                   result.add(message);
  170                   count++;
  171               }
  172           }
  173           return result;
  174       }
  175   
  176       /**
  177        * add message to await dispatch
  178        * 
  179        * @param node
  180        */
  181       public synchronized void addMessageLast(MessageReference node) {
  182           if (!node.isExpired()) {
  183               try {
  184                   regionDestination = node.getMessage().getRegionDestination();
  185                   if (isDiskListEmpty()) {
  186                       if (hasSpace() || this.store==null) {
  187                           memoryList.add(node);
  188                           node.incrementReferenceCount();
  189                           return;
  190                       }
  191                   }
  192                   if (!hasSpace()) {
  193                       if (isDiskListEmpty()) {
  194                           expireOldMessages();
  195                           if (hasSpace()) {
  196                               memoryList.add(node);
  197                               node.incrementReferenceCount();
  198                               return;
  199                           } else {
  200                               flushToDisk();
  201                           }
  202                       }
  203                   }
  204                   systemUsage.getTempUsage().waitForSpace();
  205                   getDiskList().add(node);
  206   
  207               } catch (Exception e) {
  208                   LOG.error("Caught an Exception adding a message: " + node
  209                           + " first to FilePendingMessageCursor ", e);
  210                   throw new RuntimeException(e);
  211               }
  212           } else {
  213               discard(node);
  214           }
  215       }
  216   
  217       /**
  218        * add message to await dispatch
  219        * 
  220        * @param node
  221        */
  222       public synchronized void addMessageFirst(MessageReference node) {
  223           if (!node.isExpired()) {
  224               try {
  225                   regionDestination = node.getMessage().getRegionDestination();
  226                   if (isDiskListEmpty()) {
  227                       if (hasSpace()) {
  228                           memoryList.addFirst(node);
  229                           node.incrementReferenceCount();
  230                           return;
  231                       }
  232                   }
  233                   if (!hasSpace()) {
  234                       if (isDiskListEmpty()) {
  235                           expireOldMessages();
  236                           if (hasSpace()) {
  237                               memoryList.addFirst(node);
  238                               node.incrementReferenceCount();
  239                               return;
  240                           } else {
  241                               flushToDisk();
  242                           }
  243                       }
  244                   }
  245                   systemUsage.getTempUsage().waitForSpace();
  246                   node.decrementReferenceCount();
  247                   getDiskList().addFirst(node);
  248   
  249               } catch (Exception e) {
  250                   LOG.error("Caught an Exception adding a message: " + node
  251                           + " first to FilePendingMessageCursor ", e);
  252                   throw new RuntimeException(e);
  253               }
  254           } else {
  255               discard(node);
  256           }
  257       }
  258   
  259       /**
  260        * @return true if there pending messages to dispatch
  261        */
  262       public synchronized boolean hasNext() {
  263           return iter.hasNext();
  264       }
  265   
  266       /**
  267        * @return the next pending message
  268        */
  269       public synchronized MessageReference next() {
  270           Message message = (Message)iter.next();
  271           last = message;
  272           if (!isDiskListEmpty()) {
  273               // got from disk
  274               message.setRegionDestination(regionDestination);
  275               message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
  276           }
  277           message.incrementReferenceCount();
  278           return message;
  279       }
  280   
  281       /**
  282        * remove the message at the cursor position
  283        */
  284       public synchronized void remove() {
  285           iter.remove();
  286           if (last != null) {
  287           	last.decrementReferenceCount();
  288           }
  289       }
  290   
  291       /**
  292        * @param node
  293        * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
  294        */
  295       public synchronized void remove(MessageReference node) {
  296           if (memoryList.remove(node)) {
  297           	node.decrementReferenceCount();
  298           }
  299           if (!isDiskListEmpty()) {
  300               getDiskList().remove(node);
  301           }
  302       }
  303   
  304       /**
  305        * @return the number of pending messages
  306        */
  307       public synchronized int size() {
  308           return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
  309       }
  310   
  311       /**
  312        * clear all pending messages
  313        */
  314       public synchronized void clear() {
  315           memoryList.clear();
  316           if (!isDiskListEmpty()) {
  317               getDiskList().clear();
  318           }
  319           last=null;
  320       }
  321   
  322   	public synchronized boolean isFull() {
  323   
  324   		return super.isFull()
  325   				|| (systemUsage != null && systemUsage.getTempUsage().isFull());
  326   
  327   	}
  328   
  329       public boolean hasMessagesBufferedToDeliver() {
  330           return !isEmpty();
  331       }
  332   
  333       public void setSystemUsage(SystemUsage usageManager) {
  334           super.setSystemUsage(usageManager);
  335       }
  336   
  337       public void onUsageChanged(Usage usage, int oldPercentUsage,
  338               int newPercentUsage) {
  339           if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
  340               synchronized (this) {
  341                   flushRequired = true;
  342                   if (!iterating) {
  343                       expireOldMessages();
  344                       if (!hasSpace()) {
  345                           flushToDisk();
  346                           flushRequired = false;
  347                       }
  348                   }
  349               }
  350           }
  351       }
  352       
  353       public boolean isTransient() {
  354           return true;
  355       }
  356   
  357       protected boolean isSpaceInMemoryList() {
  358           return hasSpace() && isDiskListEmpty();
  359       }
  360       
  361       protected synchronized void expireOldMessages() {
  362           if (!memoryList.isEmpty()) {
  363               LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
  364               this.memoryList = new LinkedList<MessageReference>();
  365               while (!tmpList.isEmpty()) {
  366                   MessageReference node = tmpList.removeFirst();
  367                   if (node.isExpired()) {
  368                       discard(node);
  369                   }else {
  370                       memoryList.add(node);
  371                   }               
  372               }
  373           }
  374   
  375       }
  376   
  377       protected synchronized void flushToDisk() {
  378          
  379           if (!memoryList.isEmpty()) {
  380               while (!memoryList.isEmpty()) {
  381                   MessageReference node = memoryList.removeFirst();
  382                   node.decrementReferenceCount();
  383                   getDiskList().addLast(node);
  384               }
  385               memoryList.clear();
  386           }
  387       }
  388   
  389       protected boolean isDiskListEmpty() {
  390           return diskList == null || diskList.isEmpty();
  391       }
  392   
  393       protected ListContainer<MessageReference> getDiskList() {
  394           if (diskList == null) {
  395               try {
  396                   diskList = store.getListContainer(name, "TopicSubscription", true);
  397                   diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
  398               } catch (IOException e) {
  399                   LOG.error("Caught an IO Exception getting the DiskList " + name, e);
  400                   throw new RuntimeException(e);
  401               }
  402           }
  403           return diskList;
  404       }
  405       
  406       protected void discard(MessageReference message) {
  407           message.decrementReferenceCount();
  408           if (LOG.isDebugEnabled()) {
  409               LOG.debug("Discarding message " + message);
  410           }
  411           broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()), message);
  412       }
  413   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]