Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region.cursors;
   18   
   19   import java.util.Iterator;
   20   import java.util.LinkedHashMap;
   21   import java.util.Map.Entry;
   22   import org.apache.activemq.broker.region.Destination;
   23   import org.apache.activemq.broker.region.MessageReference;
   24   import org.apache.activemq.command.Message;
   25   import org.apache.activemq.command.MessageId;
   26   import org.apache.activemq.store.MessageRecoveryListener;
   27   import org.apache.commons.logging.Log;
   28   import org.apache.commons.logging.LogFactory;
   29   
   30   /**
   31    *  Store based cursor
   32    *
   33    */
   34   public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
   35       private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
   36       protected final Destination regionDestination;
   37       private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
   38       private Iterator<Entry<MessageId, Message>> iterator = null;
   39       private boolean cacheEnabled=false;
   40       protected boolean batchResetNeeded = true;
   41       protected boolean storeHasMessages = false;
   42       protected int size;
   43       private MessageId lastCachedId;
   44       
   45       protected AbstractStoreCursor(Destination destination) {
   46           this.regionDestination=destination;
   47       }
   48       
   49       @Override
   50       public final synchronized void start() throws Exception{
   51           if (!isStarted()) {
   52               super.start();
   53               clear();
   54               resetBatch();
   55               this.size = getStoreSize();
   56               this.storeHasMessages=this.size > 0;
   57               if (!this.storeHasMessages&&useCache) {
   58                   cacheEnabled=true;
   59               }
   60           } 
   61       }
   62       
   63       @Override
   64       public final synchronized void stop() throws Exception {
   65           resetBatch();
   66           super.stop();
   67           gc();
   68       }
   69   
   70       
   71       public final boolean recoverMessage(Message message) throws Exception {
   72           return recoverMessage(message,false);
   73       }
   74       
   75       public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
   76           boolean recovered = false;
   77           if (recordUniqueId(message.getMessageId())) {
   78               if (!cached) {
   79                   message.setRegionDestination(regionDestination);
   80                   if( message.getMemoryUsage()==null ) {
   81                       message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
   82                   }
   83               }
   84               message.incrementReferenceCount();
   85               batchList.put(message.getMessageId(), message);
   86               clearIterator(true);
   87               recovered = true;
   88           } else {
   89               /*
   90                * we should expect to get these - as the message is recorded as it before it goes into
   91                * the cache. If subsequently, we pull out that message from the store (before its deleted)
   92                * it will be a duplicate - but should be ignored
   93                */
   94               //LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
   95               storeHasMessages = true;
   96           }
   97           return recovered;
   98       }
   99       
  100       @Override
  101       public final void reset() {
  102           if (batchList.isEmpty()) {
  103               try {
  104                   fillBatch();
  105               } catch (Exception e) {
  106                   LOG.error("Failed to fill batch", e);
  107                   throw new RuntimeException(e);
  108               }
  109           }
  110           clearIterator(true);
  111           size();
  112       }
  113       
  114       @Override
  115       public synchronized void release() {
  116           clearIterator(false);
  117       }
  118       
  119       private synchronized void clearIterator(boolean ensureIterator) {
  120           boolean haveIterator = this.iterator != null;
  121           this.iterator=null;
  122           last = null;
  123           if(haveIterator&&ensureIterator) {
  124               ensureIterator();
  125           }
  126       }
  127       
  128       private synchronized void ensureIterator() {
  129           if(this.iterator==null) {
  130               this.iterator=this.batchList.entrySet().iterator();
  131           }
  132       }
  133   
  134   
  135       public final void finished() {
  136       }
  137           
  138       @Override
  139       public final synchronized boolean hasNext() {
  140           if (batchList.isEmpty()) {
  141               try {
  142                   fillBatch();
  143               } catch (Exception e) {
  144                   LOG.error("Failed to fill batch", e);
  145                   throw new RuntimeException(e);
  146               }
  147           }
  148           ensureIterator();
  149           return this.iterator.hasNext();
  150       }
  151       
  152       @Override
  153       public final synchronized MessageReference next() {
  154           MessageReference result = null;
  155           if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
  156               result = this.iterator.next().getValue();
  157           }
  158           last = result;
  159           if (result != null) {
  160               result.incrementReferenceCount();
  161           }
  162           return result;
  163       }
  164       
  165       @Override
  166       public final synchronized void addMessageLast(MessageReference node) throws Exception {
  167           if (cacheEnabled && hasSpace()) {
  168               recoverMessage(node.getMessage(),true);
  169               lastCachedId = node.getMessageId();
  170           } else {
  171               if (cacheEnabled) {
  172                   cacheEnabled=false;
  173                   if (LOG.isDebugEnabled()) {
  174                       LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size
  175                               + ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId())
  176                               + " current node seqId: " + node.getMessageId().getBrokerSequenceId());
  177                   }
  178                   // sync with store on disabling the cache
  179                   if (lastCachedId != null) {
  180                       setBatch(lastCachedId);
  181                   }
  182               }
  183           }
  184           size++;
  185       }
  186   
  187       protected void setBatch(MessageId messageId) throws Exception {
  188       }
  189   
  190       @Override
  191       public final synchronized void addMessageFirst(MessageReference node) throws Exception {
  192           cacheEnabled=false;
  193           size++;
  194       }
  195   
  196       @Override
  197       public final synchronized void remove() {
  198           size--;
  199           if (iterator!=null) {
  200               iterator.remove();
  201           }
  202           if (last != null) {
  203               last.decrementReferenceCount();
  204           }
  205           if (size==0 && isStarted() && useCache && hasSpace() && isStoreEmpty()) {
  206               if (LOG.isDebugEnabled()) {
  207                   LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove");
  208               }
  209               cacheEnabled=true;
  210           }
  211       }
  212   
  213       @Override
  214       public final synchronized void remove(MessageReference node) {
  215           size--;
  216           cacheEnabled=false;
  217           batchList.remove(node.getMessageId());
  218       }
  219       
  220       @Override
  221       public final synchronized void clear() {
  222           gc();
  223       }
  224       
  225       @Override
  226       public final synchronized void gc() {
  227           for (Message msg : batchList.values()) {
  228               rollback(msg.getMessageId());
  229               msg.decrementReferenceCount();
  230           }
  231           batchList.clear();
  232           clearIterator(false);
  233           batchResetNeeded = true;
  234           this.cacheEnabled=false;
  235           if (isStarted()) { 
  236               size = getStoreSize();
  237           } else {
  238               size = 0;
  239           }
  240       }
  241       
  242       @Override
  243       protected final synchronized void fillBatch() {
  244           if (batchResetNeeded) {
  245               resetBatch();
  246               this.batchResetNeeded = false;
  247           }
  248           if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
  249               this.storeHasMessages = false;
  250               try {
  251                   doFillBatch();
  252               } catch (Exception e) {
  253                   LOG.error("Failed to fill batch", e);
  254                   throw new RuntimeException(e);
  255               }
  256               if (!this.batchList.isEmpty()) {
  257                   this.storeHasMessages=true;
  258               }
  259           }
  260       }
  261       
  262       @Override
  263       public final synchronized boolean isEmpty() {
  264           // negative means more messages added to store through queue.send since last reset
  265           return size == 0;
  266       }
  267   
  268       @Override
  269       public final synchronized boolean hasMessagesBufferedToDeliver() {
  270           return !batchList.isEmpty();
  271       }
  272   
  273       @Override
  274       public final synchronized int size() {
  275           if (size < 0) {
  276               this.size = getStoreSize();
  277           }
  278           return size;
  279       }
  280       
  281       
  282       protected abstract void doFillBatch() throws Exception;
  283       
  284       protected abstract void resetBatch();
  285       
  286       protected abstract int getStoreSize();
  287       
  288       protected abstract boolean isStoreEmpty();
  289   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]