Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » journal » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.journal;
   18   
   19   import java.io.IOException;
   20   import java.util.ArrayList;
   21   import java.util.Collections;
   22   import java.util.HashSet;
   23   import java.util.Iterator;
   24   import java.util.LinkedHashMap;
   25   import java.util.List;
   26   import java.util.Map;
   27   import java.util.Set;
   28   
   29   import org.apache.activeio.journal.RecordLocation;
   30   import org.apache.activemq.broker.ConnectionContext;
   31   import org.apache.activemq.command.ActiveMQDestination;
   32   import org.apache.activemq.command.JournalQueueAck;
   33   import org.apache.activemq.command.Message;
   34   import org.apache.activemq.command.MessageAck;
   35   import org.apache.activemq.command.MessageId;
   36   import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
   37   import org.apache.activemq.store.MessageRecoveryListener;
   38   import org.apache.activemq.store.MessageStore;
   39   import org.apache.activemq.store.PersistenceAdapter;
   40   import org.apache.activemq.store.AbstractMessageStore;
   41   import org.apache.activemq.transaction.Synchronization;
   42   import org.apache.activemq.usage.MemoryUsage;
   43   import org.apache.activemq.usage.SystemUsage;
   44   import org.apache.activemq.util.Callback;
   45   import org.apache.activemq.util.TransactionTemplate;
   46   import org.apache.commons.logging.Log;
   47   import org.apache.commons.logging.LogFactory;
   48   
   49   /**
   50    * A MessageStore that uses a Journal to store it's messages.
   51    * 
   52    * @version $Revision: 1.14 $
   53    */
   54   public class JournalMessageStore extends AbstractMessageStore {
   55   
   56       private static final Log LOG = LogFactory.getLog(JournalMessageStore.class);
   57   
   58       protected final JournalPersistenceAdapter peristenceAdapter;
   59       protected final JournalTransactionStore transactionStore;
   60       protected final MessageStore longTermStore;
   61       protected final TransactionTemplate transactionTemplate;
   62       protected RecordLocation lastLocation;
   63       protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();
   64   
   65       private Map<MessageId, Message> messages = new LinkedHashMap<MessageId, Message>();
   66       private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
   67   
   68       /** A MessageStore that we can use to retrieve messages quickly. */
   69       private Map<MessageId, Message> cpAddedMessageIds;
   70   
   71   
   72       private MemoryUsage memoryUsage;
   73   
   74       public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
   75           super(destination);
   76           this.peristenceAdapter = adapter;
   77           this.transactionStore = adapter.getTransactionStore();
   78           this.longTermStore = checkpointStore;
   79           this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
   80       }
   81   
   82       
   83       public void setMemoryUsage(MemoryUsage memoryUsage) {
   84           this.memoryUsage=memoryUsage;
   85           longTermStore.setMemoryUsage(memoryUsage);
   86       }
   87   
   88       /**
   89        * Not synchronized since the Journal has better throughput if you increase
   90        * the number of concurrent writes that it is doing.
   91        */
   92       public void addMessage(ConnectionContext context, final Message message) throws IOException {
   93   
   94           final MessageId id = message.getMessageId();
   95   
   96           final boolean debug = LOG.isDebugEnabled();
   97           message.incrementReferenceCount();
   98   
   99           final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
  100           if (!context.isInTransaction()) {
  101               if (debug) {
  102                   LOG.debug("Journalled message add for: " + id + ", at: " + location);
  103               }
  104               addMessage(message, location);
  105           } else {
  106               if (debug) {
  107                   LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
  108               }
  109               synchronized (this) {
  110                   inFlightTxLocations.add(location);
  111               }
  112               transactionStore.addMessage(this, message, location);
  113               context.getTransaction().addSynchronization(new Synchronization() {
  114                   public void afterCommit() throws Exception {
  115                       if (debug) {
  116                           LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
  117                       }
  118                       synchronized (JournalMessageStore.this) {
  119                           inFlightTxLocations.remove(location);
  120                           addMessage(message, location);
  121                       }
  122                   }
  123   
  124                   public void afterRollback() throws Exception {
  125                       if (debug) {
  126                           LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
  127                       }
  128                       synchronized (JournalMessageStore.this) {
  129                           inFlightTxLocations.remove(location);
  130                       }
  131                       message.decrementReferenceCount();
  132                   }
  133               });
  134           }
  135       }
  136   
  137       void addMessage(final Message message, final RecordLocation location) {
  138           synchronized (this) {
  139               lastLocation = location;
  140               MessageId id = message.getMessageId();
  141               messages.put(id, message);
  142           }
  143       }
  144   
  145       public void replayAddMessage(ConnectionContext context, Message message) {
  146           try {
  147               // Only add the message if it has not already been added.
  148               Message t = longTermStore.getMessage(message.getMessageId());
  149               if (t == null) {
  150                   longTermStore.addMessage(context, message);
  151               }
  152           } catch (Throwable e) {
  153               LOG.warn("Could not replay add for message '" + message.getMessageId() + "'.  Message may have already been added. reason: " + e);
  154           }
  155       }
  156   
  157       /**
  158        */
  159       public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
  160           final boolean debug = LOG.isDebugEnabled();
  161           JournalQueueAck remove = new JournalQueueAck();
  162           remove.setDestination(destination);
  163           remove.setMessageAck(ack);
  164   
  165           final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
  166           if (!context.isInTransaction()) {
  167               if (debug) {
  168                   LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
  169               }
  170               removeMessage(ack, location);
  171           } else {
  172               if (debug) {
  173                   LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
  174               }
  175               synchronized (this) {
  176                   inFlightTxLocations.add(location);
  177               }
  178               transactionStore.removeMessage(this, ack, location);
  179               context.getTransaction().addSynchronization(new Synchronization() {
  180                   public void afterCommit() throws Exception {
  181                       if (debug) {
  182                           LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
  183                       }
  184                       synchronized (JournalMessageStore.this) {
  185                           inFlightTxLocations.remove(location);
  186                           removeMessage(ack, location);
  187                       }
  188                   }
  189   
  190                   public void afterRollback() throws Exception {
  191                       if (debug) {
  192                           LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
  193                       }
  194                       synchronized (JournalMessageStore.this) {
  195                           inFlightTxLocations.remove(location);
  196                       }
  197                   }
  198               });
  199   
  200           }
  201       }
  202   
  203       final void removeMessage(final MessageAck ack, final RecordLocation location) {
  204           synchronized (this) {
  205               lastLocation = location;
  206               MessageId id = ack.getLastMessageId();
  207               Message message = messages.remove(id);
  208               if (message == null) {
  209                   messageAcks.add(ack);
  210               } else {
  211                   message.decrementReferenceCount();
  212               }
  213           }
  214       }
  215   
  216       public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
  217           try {
  218               // Only remove the message if it has not already been removed.
  219               Message t = longTermStore.getMessage(messageAck.getLastMessageId());
  220               if (t != null) {
  221                   longTermStore.removeMessage(context, messageAck);
  222               }
  223           } catch (Throwable e) {
  224               LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
  225           }
  226       }
  227   
  228       /**
  229        * @return
  230        * @throws IOException
  231        */
  232       public RecordLocation checkpoint() throws IOException {
  233           return checkpoint(null);
  234       }
  235   
  236       /**
  237        * @return
  238        * @throws IOException
  239        */
  240       @SuppressWarnings("unchecked")
  241       public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {
  242   
  243           final List<MessageAck> cpRemovedMessageLocations;
  244           final List<RecordLocation> cpActiveJournalLocations;
  245           final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
  246   
  247           // swap out the message hash maps..
  248           synchronized (this) {
  249               cpAddedMessageIds = this.messages;
  250               cpRemovedMessageLocations = this.messageAcks;
  251   
  252               cpActiveJournalLocations = new ArrayList<RecordLocation>(inFlightTxLocations);
  253   
  254               this.messages = new LinkedHashMap<MessageId, Message>();
  255               this.messageAcks = new ArrayList<MessageAck>();
  256           }
  257   
  258           transactionTemplate.run(new Callback() {
  259               public void execute() throws Exception {
  260   
  261                   int size = 0;
  262   
  263                   PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
  264                   ConnectionContext context = transactionTemplate.getContext();
  265   
  266                   // Checkpoint the added messages.
  267                   synchronized (JournalMessageStore.this) {
  268                       Iterator<Message> iterator = cpAddedMessageIds.values().iterator();
  269                       while (iterator.hasNext()) {
  270                           Message message = iterator.next();
  271                           try {
  272                               longTermStore.addMessage(context, message);
  273                           } catch (Throwable e) {
  274                               LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
  275                           }
  276                           size += message.getSize();
  277                           message.decrementReferenceCount();
  278                           // Commit the batch if it's getting too big
  279                           if (size >= maxCheckpointMessageAddSize) {
  280                               persitanceAdapter.commitTransaction(context);
  281                               persitanceAdapter.beginTransaction(context);
  282                               size = 0;
  283                           }
  284                       }
  285                   }
  286   
  287                   persitanceAdapter.commitTransaction(context);
  288                   persitanceAdapter.beginTransaction(context);
  289   
  290                   // Checkpoint the removed messages.
  291                   Iterator<MessageAck> iterator = cpRemovedMessageLocations.iterator();
  292                   while (iterator.hasNext()) {
  293                       try {
  294                           MessageAck ack = iterator.next();
  295                           longTermStore.removeMessage(transactionTemplate.getContext(), ack);
  296                       } catch (Throwable e) {
  297                           LOG.debug("Message could not be removed from long term store: " + e.getMessage(), e);
  298                       }
  299                   }
  300   
  301                   if (postCheckpointTest != null) {
  302                       postCheckpointTest.execute();
  303                   }
  304               }
  305   
  306           });
  307   
  308           synchronized (this) {
  309               cpAddedMessageIds = null;
  310           }
  311   
  312           if (cpActiveJournalLocations.size() > 0) {
  313               Collections.sort(cpActiveJournalLocations);
  314               return cpActiveJournalLocations.get(0);
  315           }
  316           synchronized (this) {
  317               return lastLocation;
  318           }
  319       }
  320   
  321       /**
  322        * 
  323        */
  324       public Message getMessage(MessageId identity) throws IOException {
  325           Message answer = null;
  326   
  327           synchronized (this) {
  328               // Do we have a still have it in the journal?
  329               answer = messages.get(identity);
  330               if (answer == null && cpAddedMessageIds != null) {
  331                   answer = cpAddedMessageIds.get(identity);
  332               }
  333           }
  334   
  335           if (answer != null) {
  336               return answer;
  337           }
  338   
  339           // If all else fails try the long term message store.
  340           return longTermStore.getMessage(identity);
  341       }
  342   
  343       /**
  344        * Replays the checkpointStore first as those messages are the oldest ones,
  345        * then messages are replayed from the transaction log and then the cache is
  346        * updated.
  347        * 
  348        * @param listener
  349        * @throws Exception
  350        */
  351       public void recover(final MessageRecoveryListener listener) throws Exception {
  352           peristenceAdapter.checkpoint(true, true);
  353           longTermStore.recover(listener);
  354       }
  355   
  356       public void start() throws Exception {
  357           if (this.memoryUsage != null) {
  358               this.memoryUsage.addUsageListener(peristenceAdapter);
  359           }
  360           longTermStore.start();
  361       }
  362   
  363       public void stop() throws Exception {
  364           longTermStore.stop();
  365           if (this.memoryUsage != null) {
  366               this.memoryUsage.removeUsageListener(peristenceAdapter);
  367           }
  368       }
  369   
  370       /**
  371        * @return Returns the longTermStore.
  372        */
  373       public MessageStore getLongTermMessageStore() {
  374           return longTermStore;
  375       }
  376   
  377       /**
  378        * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
  379        */
  380       public void removeAllMessages(ConnectionContext context) throws IOException {
  381           peristenceAdapter.checkpoint(true, true);
  382           longTermStore.removeAllMessages(context);
  383       }
  384   
  385       public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
  386           throw new IOException("The journal does not support message references.");
  387       }
  388   
  389       public String getMessageReference(MessageId identity) throws IOException {
  390           throw new IOException("The journal does not support message references.");
  391       }
  392   
  393       /**
  394        * @return
  395        * @throws IOException
  396        * @see org.apache.activemq.store.MessageStore#getMessageCount()
  397        */
  398       public int getMessageCount() throws IOException {
  399           peristenceAdapter.checkpoint(true, true);
  400           return longTermStore.getMessageCount();
  401       }
  402   
  403       public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
  404           peristenceAdapter.checkpoint(true, true);
  405           longTermStore.recoverNextMessages(maxReturned, listener);
  406   
  407       }
  408   
  409       public void resetBatching() {
  410           longTermStore.resetBatching();
  411   
  412       }
  413   
  414       @Override
  415       public void setBatch(MessageId messageId) throws Exception {
  416           peristenceAdapter.checkpoint(true, true);
  417           longTermStore.setBatch(messageId);
  418       }
  419   
  420   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » journal » [javadoc | source]