Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region.cursors;
   18   
   19   import java.util.Collections;
   20   import java.util.HashMap;
   21   import java.util.List;
   22   import java.util.Map;
   23   import java.util.concurrent.CopyOnWriteArrayList;
   24   
   25   import org.apache.activemq.advisory.AdvisorySupport;
   26   import org.apache.activemq.broker.Broker;
   27   import org.apache.activemq.broker.ConnectionContext;
   28   import org.apache.activemq.broker.region.Destination;
   29   import org.apache.activemq.broker.region.MessageReference;
   30   import org.apache.activemq.broker.region.Subscription;
   31   import org.apache.activemq.broker.region.Topic;
   32   import org.apache.activemq.command.Message;
   33   import org.apache.activemq.usage.SystemUsage;
   34   import org.apache.commons.logging.Log;
   35   import org.apache.commons.logging.LogFactory;
   36   
   37   /**
   38    * persist pending messages pending message (messages awaiting dispatch to a
   39    * consumer) cursor
   40    * 
   41    * @version $Revision: 813962 $
   42    */
   43   public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
   44   
   45       private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class);
   46       private final String clientId;
   47       private final String subscriberName;
   48       private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
   49       private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
   50       private final PendingMessageCursor nonPersistent;
   51       private PendingMessageCursor currentCursor;
   52       private final Subscription subscription;
   53       /**
   54        * @param broker Broker for this cursor
   55        * @param clientId clientId for this cursor
   56        * @param subscriberName subscriber name for this cursor
   57        * @param maxBatchSize currently ignored
   58        * @param subscription  subscription for this cursor
   59        */
   60       public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) {
   61           this.subscription=subscription;
   62           this.clientId = clientId;
   63           this.subscriberName = subscriberName;
   64           if (broker.getBrokerService().isPersistent()) {
   65               this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName);
   66           }else {
   67               this.nonPersistent = new VMPendingMessageCursor();
   68           }
   69           
   70           this.nonPersistent.setMaxBatchSize(maxBatchSize);
   71           this.nonPersistent.setSystemUsage(systemUsage);
   72           this.storePrefetches.add(this.nonPersistent);
   73       }
   74   
   75       public synchronized void start() throws Exception {
   76           if (!isStarted()) {
   77               super.start();
   78               for (PendingMessageCursor tsp : storePrefetches) {
   79               	tsp.setMessageAudit(getMessageAudit());
   80                   tsp.start();
   81               }
   82           }
   83       }
   84   
   85       public synchronized void stop() throws Exception {
   86           if (isStarted()) {
   87               super.stop();
   88               for (PendingMessageCursor tsp : storePrefetches) {
   89                   tsp.stop();
   90               }
   91           }
   92       }
   93   
   94       /**
   95        * Add a destination
   96        * 
   97        * @param context
   98        * @param destination
   99        * @throws Exception
  100        */
  101       public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
  102           if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
  103               TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
  104               tsp.setMaxBatchSize(getMaxBatchSize());
  105               tsp.setSystemUsage(systemUsage);
  106               tsp.setEnableAudit(isEnableAudit());
  107               tsp.setMaxAuditDepth(getMaxAuditDepth());
  108               tsp.setMaxProducersToAudit(getMaxProducersToAudit());
  109               tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark());
  110               topics.put(destination, tsp);
  111               storePrefetches.add(tsp);
  112               if (isStarted()) {
  113                   tsp.start();
  114               }
  115           }
  116       }
  117   
  118       /**
  119        * remove a destination
  120        * 
  121        * @param context
  122        * @param destination
  123        * @throws Exception
  124        */
  125       public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
  126           PendingMessageCursor tsp = topics.remove(destination);
  127           if (tsp != null) {
  128               storePrefetches.remove(tsp);
  129           }
  130           return Collections.EMPTY_LIST;
  131       }
  132   
  133       /**
  134        * @return true if there are no pending messages
  135        */
  136       public synchronized boolean isEmpty() {
  137           for (PendingMessageCursor tsp : storePrefetches) {
  138               if( !tsp.isEmpty() )
  139                   return false;
  140           }
  141           return true;
  142       }
  143   
  144       public synchronized boolean isEmpty(Destination destination) {
  145           boolean result = true;
  146           TopicStorePrefetch tsp = topics.get(destination);
  147           if (tsp != null) {
  148               result = tsp.isEmpty();
  149           }
  150           return result;
  151       }
  152   
  153       /**
  154        * Informs the Broker if the subscription needs to intervention to recover
  155        * it's state e.g. DurableTopicSubscriber may do
  156        * 
  157        * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
  158        * @return true if recovery required
  159        */
  160       public boolean isRecoveryRequired() {
  161           return false;
  162       }
  163   
  164       public synchronized void addMessageLast(MessageReference node) throws Exception {
  165           if (node != null) {
  166               Message msg = node.getMessage();
  167               if (isStarted()) {
  168                   if (!msg.isPersistent()) {
  169                       nonPersistent.addMessageLast(node);
  170                   }
  171               }
  172               if (msg.isPersistent()) {
  173                   Destination dest = msg.getRegionDestination();
  174                   TopicStorePrefetch tsp = topics.get(dest);
  175                   if (tsp != null) {
  176                       tsp.addMessageLast(node);
  177                   }
  178               }
  179           }
  180       }
  181   
  182       public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
  183           nonPersistent.addMessageLast(node);
  184       }
  185   
  186       public synchronized void clear() {
  187           for (PendingMessageCursor tsp : storePrefetches) {
  188               tsp.clear();
  189           }
  190       }
  191   
  192       public synchronized boolean hasNext() {
  193           boolean result = true;
  194           if (result) {
  195               try {
  196                   currentCursor = getNextCursor();
  197               } catch (Exception e) {
  198                   LOG.error("Failed to get current cursor ", e);
  199                   throw new RuntimeException(e);
  200               }
  201               result = currentCursor != null ? currentCursor.hasNext() : false;
  202           }
  203           return result;
  204       }
  205   
  206       public synchronized MessageReference next() {
  207           MessageReference result = currentCursor != null ? currentCursor.next() : null;
  208           return result;
  209       }
  210   
  211       public synchronized void remove() {
  212           if (currentCursor != null) {
  213               currentCursor.remove();
  214           }
  215       }
  216   
  217       public synchronized void remove(MessageReference node) {
  218           if (currentCursor != null) {
  219               currentCursor.remove(node);
  220           }
  221       }
  222   
  223       public synchronized void reset() {
  224           for (PendingMessageCursor storePrefetch : storePrefetches) {
  225               storePrefetch.reset();
  226           }
  227       }
  228   
  229       public synchronized void release() {
  230           for (PendingMessageCursor storePrefetch : storePrefetches) {
  231               storePrefetch.release();
  232           }
  233       }
  234   
  235       public synchronized int size() {
  236           int pendingCount=0;
  237           for (PendingMessageCursor tsp : storePrefetches) {
  238               pendingCount += tsp.size();
  239           }
  240           return pendingCount;
  241       }
  242   
  243       public void setMaxBatchSize(int maxBatchSize) {
  244           for (PendingMessageCursor storePrefetch : storePrefetches) {
  245               storePrefetch.setMaxBatchSize(maxBatchSize);
  246           }
  247           super.setMaxBatchSize(maxBatchSize);
  248       }
  249   
  250       public synchronized void gc() {
  251           for (PendingMessageCursor tsp : storePrefetches) {
  252               tsp.gc();
  253           }
  254       }
  255   
  256       public void setSystemUsage(SystemUsage usageManager) {
  257           super.setSystemUsage(usageManager);
  258           for (PendingMessageCursor tsp : storePrefetches) {
  259               tsp.setSystemUsage(usageManager);
  260           }
  261       }
  262       
  263       public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
  264           super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
  265           for (PendingMessageCursor cursor : storePrefetches) {
  266               cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
  267           }
  268       }
  269       
  270       public void setMaxProducersToAudit(int maxProducersToAudit) {
  271           super.setMaxProducersToAudit(maxProducersToAudit);
  272           for (PendingMessageCursor cursor : storePrefetches) {
  273               cursor.setMaxAuditDepth(maxAuditDepth);
  274           }
  275       }
  276   
  277       public void setMaxAuditDepth(int maxAuditDepth) {
  278           super.setMaxAuditDepth(maxAuditDepth);
  279           for (PendingMessageCursor cursor : storePrefetches) {
  280               cursor.setMaxAuditDepth(maxAuditDepth);
  281           }
  282       }
  283       
  284       public void setEnableAudit(boolean enableAudit) {
  285           super.setEnableAudit(enableAudit);
  286           for (PendingMessageCursor cursor : storePrefetches) {
  287               cursor.setEnableAudit(enableAudit);
  288           }
  289       }
  290       
  291       public  void setUseCache(boolean useCache) {
  292           super.setUseCache(useCache);
  293           for (PendingMessageCursor cursor : storePrefetches) {
  294               cursor.setUseCache(useCache);
  295           }
  296       }
  297       
  298       protected synchronized PendingMessageCursor getNextCursor() throws Exception {
  299           if (currentCursor == null || currentCursor.isEmpty()) {
  300               currentCursor = null;
  301               for (PendingMessageCursor tsp : storePrefetches) {
  302                   if (tsp.hasNext()) {
  303                       currentCursor = tsp;
  304                       break;
  305                   }
  306               }
  307               // round-robin
  308               if (storePrefetches.size()>1) {
  309                   PendingMessageCursor first = storePrefetches.remove(0);
  310                   storePrefetches.add(first);
  311               }
  312           }
  313           return currentCursor;
  314       }
  315       
  316       public String toString() {
  317           return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
  318       }
  319   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]