Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region.cursors;
   18   
   19   import org.apache.activemq.broker.Broker;
   20   import org.apache.activemq.broker.region.MessageReference;
   21   import org.apache.activemq.broker.region.Queue;
   22   import org.apache.activemq.command.Message;
   23   import org.apache.activemq.usage.SystemUsage;
   24   import org.apache.commons.logging.Log;
   25   import org.apache.commons.logging.LogFactory;
   26   
   27   /**
   28    * Store based Cursor for Queues
   29    * 
   30    * @version $Revision: 474985 $
   31    */
   32   public class StoreQueueCursor extends AbstractPendingMessageCursor {
   33   
   34       private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class);
   35       private Broker broker;
   36       private int pendingCount;
   37       private Queue queue;
   38       private PendingMessageCursor nonPersistent;
   39       private QueueStorePrefetch persistent;
   40       private boolean started;
   41       private PendingMessageCursor currentCursor;
   42   
   43       /**
   44        * Construct
   45        * 
   46        * @param queue
   47        * @param tmpStore
   48        */
   49       public StoreQueueCursor(Broker broker,Queue queue) {
   50           this.broker=broker;
   51           this.queue = queue;
   52           this.persistent = new QueueStorePrefetch(queue);
   53           currentCursor = persistent;
   54       }
   55   
   56       public synchronized void start() throws Exception {
   57           started = true;
   58           super.start();
   59           if (nonPersistent == null) {
   60               if (broker.getBrokerService().isPersistent()) {
   61                   nonPersistent = new FilePendingMessageCursor(broker,queue.getName());
   62               }else {
   63                   nonPersistent = new VMPendingMessageCursor();
   64               }
   65               nonPersistent.setMaxBatchSize(getMaxBatchSize());
   66               nonPersistent.setSystemUsage(systemUsage);
   67               nonPersistent.setEnableAudit(isEnableAudit());
   68               nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
   69               nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
   70           }
   71           nonPersistent.setMessageAudit(getMessageAudit());
   72           nonPersistent.start();
   73           persistent.setMessageAudit(getMessageAudit());
   74           persistent.start();
   75           pendingCount = persistent.size() + nonPersistent.size();
   76       }
   77   
   78       public synchronized void stop() throws Exception {
   79           started = false;
   80           if (nonPersistent != null) {
   81               nonPersistent.stop();
   82               nonPersistent.gc();
   83           }
   84           persistent.stop();
   85           persistent.gc();
   86           super.stop();
   87           pendingCount = 0;
   88       }
   89   
   90       public synchronized void addMessageLast(MessageReference node) throws Exception {
   91           if (node != null) {
   92               Message msg = node.getMessage();
   93               if (started) {
   94                   pendingCount++;
   95                   if (!msg.isPersistent()) {
   96                       nonPersistent.addMessageLast(node);
   97                   }
   98               }
   99               if (msg.isPersistent()) {
  100                   persistent.addMessageLast(node);
  101               }
  102           }
  103       }
  104   
  105       public synchronized void addMessageFirst(MessageReference node) throws Exception {
  106           if (node != null) {
  107               Message msg = node.getMessage();
  108               if (started) {
  109                   pendingCount++;
  110                   if (!msg.isPersistent()) {
  111                       nonPersistent.addMessageFirst(node);
  112                   }
  113               }
  114               if (msg.isPersistent()) {
  115                   persistent.addMessageFirst(node);
  116               }
  117           }
  118       }
  119   
  120       public synchronized void clear() {
  121           pendingCount = 0;
  122       }
  123   
  124       public synchronized boolean hasNext() {
  125           try {
  126               getNextCursor();
  127           } catch (Exception e) {
  128               LOG.error("Failed to get current cursor ", e);
  129               throw new RuntimeException(e);
  130          }
  131          return currentCursor != null ? currentCursor.hasNext() : false;
  132       }
  133   
  134       public synchronized MessageReference next() {
  135           MessageReference result = currentCursor != null ? currentCursor.next() : null;
  136           return result;
  137       }
  138   
  139       public synchronized void remove() {
  140           if (currentCursor != null) {
  141               currentCursor.remove();
  142           }
  143           pendingCount--;
  144       }
  145   
  146       public synchronized void remove(MessageReference node) {
  147           if (!node.isPersistent()) {
  148               nonPersistent.remove(node);
  149           } else {
  150               persistent.remove(node);
  151           }
  152           pendingCount--;
  153       }
  154   
  155       public synchronized void reset() {
  156           nonPersistent.reset();
  157           persistent.reset();
  158           pendingCount = persistent.size() + nonPersistent.size();        
  159       }
  160       
  161       public void release() {
  162           nonPersistent.release();
  163           persistent.release();
  164       }
  165   
  166   
  167       public synchronized int size() {
  168           if (pendingCount < 0) {
  169               pendingCount = persistent.size() + nonPersistent.size();
  170           }
  171           return pendingCount;
  172       }
  173   
  174       public synchronized boolean isEmpty() {
  175           // if negative, more messages arrived in store since last reset so non empty
  176           return pendingCount == 0;
  177       }
  178   
  179       /**
  180        * Informs the Broker if the subscription needs to intervention to recover
  181        * it's state e.g. DurableTopicSubscriber may do
  182        * 
  183        * @see org.apache.activemq.region.cursors.PendingMessageCursor
  184        * @return true if recovery required
  185        */
  186       public boolean isRecoveryRequired() {
  187           return false;
  188       }
  189   
  190       /**
  191        * @return the nonPersistent Cursor
  192        */
  193       public PendingMessageCursor getNonPersistent() {
  194           return this.nonPersistent;
  195       }
  196   
  197       /**
  198        * @param nonPersistent cursor to set
  199        */
  200       public void setNonPersistent(PendingMessageCursor nonPersistent) {
  201           this.nonPersistent = nonPersistent;
  202       }
  203   
  204       public void setMaxBatchSize(int maxBatchSize) {
  205           persistent.setMaxBatchSize(maxBatchSize);
  206           if (nonPersistent != null) {
  207               nonPersistent.setMaxBatchSize(maxBatchSize);
  208           }
  209           super.setMaxBatchSize(maxBatchSize);
  210       }
  211       
  212       
  213       public void setMaxProducersToAudit(int maxProducersToAudit) {
  214           super.setMaxProducersToAudit(maxProducersToAudit);
  215           if (persistent != null) {
  216               persistent.setMaxProducersToAudit(maxProducersToAudit);
  217           }
  218           if (nonPersistent != null) {
  219               nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
  220           }
  221       }
  222   
  223       public void setMaxAuditDepth(int maxAuditDepth) {
  224           super.setMaxAuditDepth(maxAuditDepth);
  225           if (persistent != null) {
  226               persistent.setMaxAuditDepth(maxAuditDepth);
  227           }
  228           if (nonPersistent != null) {
  229               nonPersistent.setMaxAuditDepth(maxAuditDepth);
  230           }
  231       }
  232       
  233       public void setEnableAudit(boolean enableAudit) {
  234           super.setEnableAudit(enableAudit);
  235           if (persistent != null) {
  236               persistent.setEnableAudit(enableAudit);
  237           }
  238           if (nonPersistent != null) {
  239               nonPersistent.setEnableAudit(enableAudit);
  240           }
  241       }
  242       
  243       public void setUseCache(boolean useCache) {
  244           super.setUseCache(useCache);
  245           if (persistent != null) {
  246               persistent.setUseCache(useCache);
  247           }
  248           if (nonPersistent != null) {
  249               nonPersistent.setUseCache(useCache);
  250           }
  251       }
  252       
  253       public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
  254           super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
  255           if (persistent != null) {
  256               persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
  257           }
  258           if (nonPersistent != null) {
  259               nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
  260           }
  261       }
  262   
  263   
  264   
  265       public synchronized void gc() {
  266           if (persistent != null) {
  267               persistent.gc();
  268           }
  269           if (nonPersistent != null) {
  270               nonPersistent.gc();
  271           }
  272           pendingCount = persistent.size() + nonPersistent.size();
  273       }
  274   
  275       public void setSystemUsage(SystemUsage usageManager) {
  276           super.setSystemUsage(usageManager);
  277           if (persistent != null) {
  278               persistent.setSystemUsage(usageManager);
  279           }
  280           if (nonPersistent != null) {
  281               nonPersistent.setSystemUsage(usageManager);
  282           }
  283       }
  284   
  285       protected synchronized PendingMessageCursor getNextCursor() throws Exception {
  286           if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
  287               currentCursor = currentCursor == persistent ? nonPersistent : persistent;
  288               // sanity check
  289               if (currentCursor.isEmpty()) {
  290                   currentCursor = currentCursor == persistent ? nonPersistent : persistent;
  291               }
  292           }
  293           return currentCursor;
  294       }
  295   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]