Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region.cursors;
   18   
   19   import java.io.IOException;
   20   import org.apache.activemq.broker.region.Queue;
   21   import org.apache.activemq.command.Message;
   22   import org.apache.activemq.command.MessageId;
   23   import org.apache.activemq.store.MessageStore;
   24   import org.apache.commons.logging.Log;
   25   import org.apache.commons.logging.LogFactory;
   26   
   27   /**
   28    * persist pending messages pending message (messages awaiting dispatch to a
   29    * consumer) cursor
   30    * 
   31    * @version $Revision: 474985 $
   32    */
   33   class QueueStorePrefetch extends AbstractStoreCursor {
   34       private static final Log LOG = LogFactory.getLog(QueueStorePrefetch.class);
   35       private final MessageStore store;
   36      
   37       /**
   38        * Construct it
   39        * @param queue
   40        */
   41       public QueueStorePrefetch(Queue queue) {
   42           super(queue);
   43           this.store = queue.getMessageStore();
   44   
   45       }
   46   
   47       public boolean recoverMessageReference(MessageId messageReference) throws Exception {
   48           Message msg = this.store.getMessage(messageReference);
   49           if (msg != null) {
   50               return recoverMessage(msg);
   51           } else {
   52               String err = "Failed to retrieve message for id: " + messageReference;
   53               LOG.error(err);
   54               throw new IOException(err);
   55           }
   56       }
   57   
   58      
   59           
   60       @Override
   61       protected synchronized int getStoreSize() {
   62           try {
   63               int result = this.store.getMessageCount();
   64               return result;
   65               
   66           } catch (IOException e) {
   67               LOG.error("Failed to get message count", e);
   68               throw new RuntimeException(e);
   69           }
   70       }
   71       
   72       @Override
   73       protected synchronized boolean isStoreEmpty() {
   74           try {
   75               return this.store.isEmpty();
   76               
   77           } catch (Exception e) {
   78               LOG.error("Failed to get message count", e);
   79               throw new RuntimeException(e);
   80           }
   81       }
   82       
   83       @Override
   84       protected void resetBatch() {
   85           this.store.resetBatching();
   86       }
   87       
   88       @Override
   89       protected void setBatch(MessageId messageId) throws Exception {
   90           store.setBatch(messageId);
   91           batchResetNeeded = false;
   92       }
   93   
   94       
   95       @Override
   96       protected void doFillBatch() throws Exception {
   97           this.store.recoverNextMessages(this.maxBatchSize, this);
   98       }
   99   
  100       @Override
  101       public String toString() {
  102           return "QueueStorePrefetch" + System.identityHashCode(this);
  103       }
  104   
  105   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » cursors » [javadoc | source]