Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region.cursors;
   18   
   19   import java.util.Collections;
   20   import java.util.LinkedList;
   21   import java.util.List;
   22   import org.apache.activemq.ActiveMQMessageAudit;
   23   import org.apache.activemq.broker.ConnectionContext;
   24   import org.apache.activemq.broker.region.BaseDestination;
   25   import org.apache.activemq.broker.region.Destination;
   26   import org.apache.activemq.broker.region.MessageReference;
   27   import org.apache.activemq.command.MessageId;
   28   import org.apache.activemq.usage.SystemUsage;
   29   
   30   /**
   31    * Abstract method holder for pending message (messages awaiting dispatch to a
   32    * consumer) cursor
   33    * 
   34    * @version $Revision: 882100 $
   35    */
   36   public class AbstractPendingMessageCursor implements PendingMessageCursor {
   37       protected int memoryUsageHighWaterMark = 70;
   38       protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
   39       protected SystemUsage systemUsage;
   40       protected int maxProducersToAudit=1024;
   41       protected int maxAuditDepth=1000;
   42       protected boolean enableAudit=true;
   43       protected ActiveMQMessageAudit audit;
   44       protected boolean useCache=true;
   45       private boolean started=false;
   46       protected MessageReference last = null;
   47     
   48   
   49       public synchronized void start() throws Exception  {
   50           if (!started && enableAudit && audit==null) {
   51               audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
   52           }
   53           started=true;
   54       }
   55   
   56       public synchronized void stop() throws Exception  {
   57           started=false;
   58           audit=null;
   59           gc();
   60       }
   61   
   62       public void add(ConnectionContext context, Destination destination) throws Exception {
   63       }
   64   
   65       @SuppressWarnings("unchecked")
   66       public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
   67           return Collections.EMPTY_LIST;
   68       }
   69   
   70       public boolean isRecoveryRequired() {
   71           return true;
   72       }
   73   
   74       public void addMessageFirst(MessageReference node) throws Exception {
   75       }
   76   
   77       public void addMessageLast(MessageReference node) throws Exception {
   78       }
   79   
   80       public void addRecoveredMessage(MessageReference node) throws Exception {
   81           addMessageLast(node);
   82       }
   83   
   84       public void clear() {
   85       }
   86   
   87       public boolean hasNext() {
   88           return false;
   89       }
   90   
   91       public boolean isEmpty() {
   92           return false;
   93       }
   94   
   95       public boolean isEmpty(Destination destination) {
   96           return isEmpty();
   97       }
   98   
   99       public MessageReference next() {
  100           return null;
  101       }
  102   
  103       public void remove() {
  104       }
  105   
  106       public void reset() {
  107       }
  108   
  109       public int size() {
  110           return 0;
  111       }
  112   
  113       public int getMaxBatchSize() {
  114           return maxBatchSize;
  115       }
  116   
  117       public void setMaxBatchSize(int maxBatchSize) {
  118           this.maxBatchSize = maxBatchSize;
  119       }
  120   
  121       protected void fillBatch() throws Exception {
  122       }
  123   
  124       public void resetForGC() {
  125           reset();
  126       }
  127   
  128       public void remove(MessageReference node) {
  129       }
  130   
  131       public void gc() {
  132       }
  133   
  134       public void setSystemUsage(SystemUsage usageManager) {
  135           this.systemUsage = usageManager;
  136       }
  137   
  138       public boolean hasSpace() {
  139           return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
  140       }
  141   
  142       public boolean isFull() {
  143           return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
  144       }
  145   
  146       public void release() {
  147       }
  148   
  149       public boolean hasMessagesBufferedToDeliver() {
  150           return false;
  151       }
  152   
  153       /**
  154        * @return the memoryUsageHighWaterMark
  155        */
  156       public int getMemoryUsageHighWaterMark() {
  157           return memoryUsageHighWaterMark;
  158       }
  159   
  160       /**
  161        * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
  162        */
  163       public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
  164           this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
  165       }
  166   
  167       /**
  168        * @return the usageManager
  169        */
  170       public SystemUsage getSystemUsage() {
  171           return this.systemUsage;
  172       }
  173   
  174       /**
  175        * destroy the cursor
  176        * 
  177        * @throws Exception
  178        */
  179       public void destroy() throws Exception {
  180           stop();
  181       }
  182   
  183       /**
  184        * Page in a restricted number of messages
  185        * 
  186        * @param maxItems maximum number of messages to return
  187        * @return a list of paged in messages
  188        */
  189       public LinkedList<MessageReference> pageInList(int maxItems) {
  190           throw new RuntimeException("Not supported");
  191       }
  192   
  193       /**
  194        * @return the maxProducersToAudit
  195        */
  196       public int getMaxProducersToAudit() {
  197           return maxProducersToAudit;
  198       }
  199   
  200       /**
  201        * @param maxProducersToAudit the maxProducersToAudit to set
  202        */
  203       public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
  204           this.maxProducersToAudit = maxProducersToAudit;
  205           if (audit != null) {
  206               audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
  207           }
  208       }
  209   
  210       /**
  211        * @return the maxAuditDepth
  212        */
  213       public int getMaxAuditDepth() {
  214           return maxAuditDepth;
  215       }
  216       
  217   
  218       /**
  219        * @param maxAuditDepth the maxAuditDepth to set
  220        */
  221       public synchronized void setMaxAuditDepth(int maxAuditDepth) {
  222           this.maxAuditDepth = maxAuditDepth;
  223           if (audit != null) {
  224               audit.setAuditDepth(maxAuditDepth);
  225           }
  226       }
  227       
  228       
  229       /**
  230        * @return the enableAudit
  231        */
  232       public boolean isEnableAudit() {
  233           return enableAudit;
  234       }
  235   
  236       /**
  237        * @param enableAudit the enableAudit to set
  238        */
  239       public synchronized void setEnableAudit(boolean enableAudit) {
  240           this.enableAudit = enableAudit;
  241           if (enableAudit && started && audit==null) {
  242               audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
  243           }
  244       }
  245       
  246       public boolean isTransient() {
  247           return false;
  248       }
  249       
  250          
  251       /**
  252        * set the audit
  253        * @param audit new audit component
  254        */
  255       public void setMessageAudit(ActiveMQMessageAudit audit) {
  256       	this.audit=audit;
  257       }
  258       
  259       
  260       /**
  261        * @return the audit
  262        */
  263       public ActiveMQMessageAudit getMessageAudit() {
  264       	return audit;
  265       }
  266       
  267       public boolean isUseCache() {
  268           return useCache;
  269       }
  270   
  271       public void setUseCache(boolean useCache) {
  272           this.useCache = useCache;
  273       }
  274   
  275       public synchronized boolean isDuplicate(MessageId messageId) {
  276           boolean unique = recordUniqueId(messageId);
  277           rollback(messageId);
  278           return !unique;
  279       }
  280       
  281       /**
  282        * records a message id and checks if it is a duplicate
  283        * @param messageId
  284        * @return true if id is unique, false otherwise.
  285        */
  286       public synchronized boolean recordUniqueId(MessageId messageId) {
  287           if (!enableAudit || audit==null) {
  288               return true;
  289           }
  290           return !audit.isDuplicate(messageId);
  291       }
  292       
  293       public synchronized void rollback(MessageId id) {
  294           if (audit != null) {
  295               audit.rollback(id);
  296           }
  297       }
  298       
  299       protected synchronized boolean isStarted() {
  300           return started;
  301       }
  302   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]