Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » journal » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.journal;
   18   
   19   import java.io.File;
   20   import java.io.IOException;
   21   import java.util.ArrayList;
   22   import java.util.HashSet;
   23   import java.util.Iterator;
   24   import java.util.Set;
   25   import java.util.concurrent.Callable;
   26   import java.util.concurrent.ConcurrentHashMap;
   27   import java.util.concurrent.CountDownLatch;
   28   import java.util.concurrent.FutureTask;
   29   import java.util.concurrent.LinkedBlockingQueue;
   30   import java.util.concurrent.ThreadFactory;
   31   import java.util.concurrent.ThreadPoolExecutor;
   32   import java.util.concurrent.TimeUnit;
   33   import java.util.concurrent.atomic.AtomicBoolean;
   34   
   35   import org.apache.activeio.journal.InvalidRecordLocationException;
   36   import org.apache.activeio.journal.Journal;
   37   import org.apache.activeio.journal.JournalEventListener;
   38   import org.apache.activeio.journal.RecordLocation;
   39   import org.apache.activeio.packet.ByteArrayPacket;
   40   import org.apache.activeio.packet.Packet;
   41   import org.apache.activemq.broker.BrokerService;
   42   import org.apache.activemq.broker.BrokerServiceAware;
   43   import org.apache.activemq.broker.ConnectionContext;
   44   import org.apache.activemq.command.ActiveMQDestination;
   45   import org.apache.activemq.command.ActiveMQQueue;
   46   import org.apache.activemq.command.ActiveMQTopic;
   47   import org.apache.activemq.command.DataStructure;
   48   import org.apache.activemq.command.JournalQueueAck;
   49   import org.apache.activemq.command.JournalTopicAck;
   50   import org.apache.activemq.command.JournalTrace;
   51   import org.apache.activemq.command.JournalTransaction;
   52   import org.apache.activemq.command.Message;
   53   import org.apache.activemq.command.MessageAck;
   54   import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
   55   import org.apache.activemq.openwire.OpenWireFormat;
   56   import org.apache.activemq.store.MessageStore;
   57   import org.apache.activemq.store.PersistenceAdapter;
   58   import org.apache.activemq.store.TopicMessageStore;
   59   import org.apache.activemq.store.TransactionStore;
   60   import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
   61   import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
   62   import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
   63   import org.apache.activemq.thread.Scheduler;
   64   import org.apache.activemq.thread.Task;
   65   import org.apache.activemq.thread.TaskRunner;
   66   import org.apache.activemq.thread.TaskRunnerFactory;
   67   import org.apache.activemq.usage.Usage;
   68   import org.apache.activemq.usage.UsageListener;
   69   import org.apache.activemq.usage.SystemUsage;
   70   import org.apache.activemq.util.ByteSequence;
   71   import org.apache.activemq.util.IOExceptionSupport;
   72   import org.apache.activemq.wireformat.WireFormat;
   73   import org.apache.commons.logging.Log;
   74   import org.apache.commons.logging.LogFactory;
   75   
   76   /**
   77    * An implementation of {@link PersistenceAdapter} designed for use with a
   78    * {@link Journal} and then check pointing asynchronously on a timeout with some
   79    * other long term persistent storage.
   80    * 
   81    * @org.apache.xbean.XBean
   82    * @version $Revision: 1.17 $
   83    */
   84   public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
   85   
   86       private BrokerService brokerService;
   87   	
   88       protected static final Scheduler scheduler = Scheduler.getInstance();
   89       private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class);
   90   
   91       private Journal journal;
   92       private PersistenceAdapter longTermPersistence;
   93   
   94       private final WireFormat wireFormat = new OpenWireFormat();
   95   
   96       private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
   97       private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
   98   
   99       private SystemUsage usageManager;
  100       private long checkpointInterval = 1000 * 60 * 5;
  101       private long lastCheckpointRequest = System.currentTimeMillis();
  102       private long lastCleanup = System.currentTimeMillis();
  103       private int maxCheckpointWorkers = 10;
  104       private int maxCheckpointMessageAddSize = 1024 * 1024;
  105   
  106       private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
  107       private ThreadPoolExecutor checkpointExecutor;
  108   
  109       private TaskRunner checkpointTask;
  110       private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
  111       private boolean fullCheckPoint;
  112   
  113       private AtomicBoolean started = new AtomicBoolean(false);
  114   
  115       private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
  116   
  117       private TaskRunnerFactory taskRunnerFactory;
  118   
  119       public JournalPersistenceAdapter() {        
  120       }
  121       
  122       public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
  123           setJournal(journal);
  124           setTaskRunnerFactory(taskRunnerFactory);
  125           setPersistenceAdapter(longTermPersistence);
  126       }
  127   
  128       public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
  129           this.taskRunnerFactory = taskRunnerFactory;
  130       }
  131   
  132       public void setJournal(Journal journal) {
  133           this.journal = journal;
  134           journal.setJournalEventListener(this);
  135       }
  136       
  137       public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
  138           this.longTermPersistence = longTermPersistence;
  139       }
  140       
  141       final Runnable createPeriodicCheckpointTask() {
  142           return new Runnable() {
  143               public void run() {
  144                   long lastTime = 0;
  145                   synchronized (this) {
  146                       lastTime = lastCheckpointRequest;
  147                   }
  148                   if (System.currentTimeMillis() > lastTime + checkpointInterval) {
  149                       checkpoint(false, true);
  150                   }
  151               }
  152           };
  153       }
  154   
  155       /**
  156        * @param usageManager The UsageManager that is controlling the
  157        *                destination's memory usage.
  158        */
  159       public void setUsageManager(SystemUsage usageManager) {
  160           this.usageManager = usageManager;
  161           longTermPersistence.setUsageManager(usageManager);
  162       }
  163   
  164       public Set<ActiveMQDestination> getDestinations() {
  165           Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
  166           destinations.addAll(queues.keySet());
  167           destinations.addAll(topics.keySet());
  168           return destinations;
  169       }
  170   
  171       private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
  172           if (destination.isQueue()) {
  173               return createQueueMessageStore((ActiveMQQueue)destination);
  174           } else {
  175               return createTopicMessageStore((ActiveMQTopic)destination);
  176           }
  177       }
  178   
  179       public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
  180           JournalMessageStore store = queues.get(destination);
  181           if (store == null) {
  182               MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
  183               store = new JournalMessageStore(this, checkpointStore, destination);
  184               queues.put(destination, store);
  185           }
  186           return store;
  187       }
  188   
  189       public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
  190           JournalTopicMessageStore store = topics.get(destinationName);
  191           if (store == null) {
  192               TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
  193               store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
  194               topics.put(destinationName, store);
  195           }
  196           return store;
  197       }
  198   
  199       /**
  200        * Cleanup method to remove any state associated with the given destination
  201        *
  202        * @param destination Destination to forget
  203        */
  204       public void removeQueueMessageStore(ActiveMQQueue destination) {
  205           queues.remove(destination);
  206       }
  207   
  208       /**
  209        * Cleanup method to remove any state associated with the given destination
  210        *
  211        * @param destination Destination to forget
  212        */
  213       public void removeTopicMessageStore(ActiveMQTopic destination) {
  214           topics.remove(destination);
  215       }
  216   
  217       public TransactionStore createTransactionStore() throws IOException {
  218           return transactionStore;
  219       }
  220   
  221       public long getLastMessageBrokerSequenceId() throws IOException {
  222           return longTermPersistence.getLastMessageBrokerSequenceId();
  223       }
  224   
  225       public void beginTransaction(ConnectionContext context) throws IOException {
  226           longTermPersistence.beginTransaction(context);
  227       }
  228   
  229       public void commitTransaction(ConnectionContext context) throws IOException {
  230           longTermPersistence.commitTransaction(context);
  231       }
  232   
  233       public void rollbackTransaction(ConnectionContext context) throws IOException {
  234           longTermPersistence.rollbackTransaction(context);
  235       }
  236   
  237       public synchronized void start() throws Exception {
  238           if (!started.compareAndSet(false, true)) {
  239               return;
  240           }
  241   
  242           checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
  243               public boolean iterate() {
  244                   return doCheckpoint();
  245               }
  246           }, "ActiveMQ Journal Checkpoint Worker");
  247   
  248           checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
  249               public Thread newThread(Runnable runable) {
  250                   Thread t = new Thread(runable, "Journal checkpoint worker");
  251                   t.setPriority(7);
  252                   return t;
  253               }
  254           });
  255           // checkpointExecutor.allowCoreThreadTimeOut(true);
  256   
  257           this.usageManager.getMemoryUsage().addUsageListener(this);
  258   
  259           if (longTermPersistence instanceof JDBCPersistenceAdapter) {
  260               // Disabled periodic clean up as it deadlocks with the checkpoint
  261               // operations.
  262               ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
  263           }
  264   
  265           longTermPersistence.start();
  266           createTransactionStore();
  267           recover();
  268   
  269           // Do a checkpoint periodically.
  270           scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
  271   
  272       }
  273   
  274       public void stop() throws Exception {
  275   
  276           this.usageManager.getMemoryUsage().removeUsageListener(this);
  277           if (!started.compareAndSet(true, false)) {
  278               return;
  279           }
  280   
  281           scheduler.cancel(periodicCheckpointTask);
  282   
  283           // Take one final checkpoint and stop checkpoint processing.
  284           checkpoint(true, true);
  285           checkpointTask.shutdown();
  286           checkpointExecutor.shutdown();
  287   
  288           queues.clear();
  289           topics.clear();
  290   
  291           IOException firstException = null;
  292           try {
  293               journal.close();
  294           } catch (Exception e) {
  295               firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
  296           }
  297           longTermPersistence.stop();
  298   
  299           if (firstException != null) {
  300               throw firstException;
  301           }
  302       }
  303   
  304       // Properties
  305       // -------------------------------------------------------------------------
  306       public PersistenceAdapter getLongTermPersistence() {
  307           return longTermPersistence;
  308       }
  309   
  310       /**
  311        * @return Returns the wireFormat.
  312        */
  313       public WireFormat getWireFormat() {
  314           return wireFormat;
  315       }
  316   
  317       // Implementation methods
  318       // -------------------------------------------------------------------------
  319   
  320       /**
  321        * The Journal give us a call back so that we can move old data out of the
  322        * journal. Taking a checkpoint does this for us.
  323        * 
  324        * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
  325        */
  326       public void overflowNotification(RecordLocation safeLocation) {
  327           checkpoint(false, true);
  328       }
  329   
  330       /**
  331        * When we checkpoint we move all the journalled data to long term storage.
  332        * 
  333        * @param stopping
  334        * @param b
  335        */
  336       public void checkpoint(boolean sync, boolean fullCheckpoint) {
  337           try {
  338               if (journal == null) {
  339                   throw new IllegalStateException("Journal is closed.");
  340               }
  341   
  342               long now = System.currentTimeMillis();
  343               CountDownLatch latch = null;
  344               synchronized (this) {
  345                   latch = nextCheckpointCountDownLatch;
  346                   lastCheckpointRequest = now;
  347                   if (fullCheckpoint) {
  348                       this.fullCheckPoint = true;
  349                   }
  350               }
  351   
  352               checkpointTask.wakeup();
  353   
  354               if (sync) {
  355                   LOG.debug("Waking for checkpoint to complete.");
  356                   latch.await();
  357               }
  358           } catch (InterruptedException e) {
  359               Thread.currentThread().interrupt();
  360               LOG.warn("Request to start checkpoint failed: " + e, e);
  361           }
  362       }
  363   
  364       public void checkpoint(boolean sync) {
  365           checkpoint(sync, sync);
  366       }
  367   
  368       /**
  369        * This does the actual checkpoint.
  370        * 
  371        * @return
  372        */
  373       public boolean doCheckpoint() {
  374           CountDownLatch latch = null;
  375           boolean fullCheckpoint;
  376           synchronized (this) {
  377               latch = nextCheckpointCountDownLatch;
  378               nextCheckpointCountDownLatch = new CountDownLatch(1);
  379               fullCheckpoint = this.fullCheckPoint;
  380               this.fullCheckPoint = false;
  381           }
  382           try {
  383   
  384               LOG.debug("Checkpoint started.");
  385               RecordLocation newMark = null;
  386   
  387               ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
  388   
  389               //
  390               // We do many partial checkpoints (fullCheckpoint==false) to move
  391               // topic messages
  392               // to long term store as soon as possible.
  393               // 
  394               // We want to avoid doing that for queue messages since removes the
  395               // come in the same
  396               // checkpoint cycle will nullify the previous message add.
  397               // Therefore, we only
  398               // checkpoint queues on the fullCheckpoint cycles.
  399               //
  400               if (fullCheckpoint) {
  401                   Iterator<JournalMessageStore> iterator = queues.values().iterator();
  402                   while (iterator.hasNext()) {
  403                       try {
  404                           final JournalMessageStore ms = iterator.next();
  405                           FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
  406                               public RecordLocation call() throws Exception {
  407                                   return ms.checkpoint();
  408                               }
  409                           });
  410                           futureTasks.add(task);
  411                           checkpointExecutor.execute(task);
  412                       } catch (Exception e) {
  413                           LOG.error("Failed to checkpoint a message store: " + e, e);
  414                       }
  415                   }
  416               }
  417   
  418               Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
  419               while (iterator.hasNext()) {
  420                   try {
  421                       final JournalTopicMessageStore ms = iterator.next();
  422                       FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
  423                           public RecordLocation call() throws Exception {
  424                               return ms.checkpoint();
  425                           }
  426                       });
  427                       futureTasks.add(task);
  428                       checkpointExecutor.execute(task);
  429                   } catch (Exception e) {
  430                       LOG.error("Failed to checkpoint a message store: " + e, e);
  431                   }
  432               }
  433   
  434               try {
  435                   for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
  436                       FutureTask<RecordLocation> ft = iter.next();
  437                       RecordLocation mark = ft.get();
  438                       // We only set a newMark on full checkpoints.
  439                       if (fullCheckpoint) {
  440                           if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
  441                               newMark = mark;
  442                           }
  443                       }
  444                   }
  445               } catch (Throwable e) {
  446                   LOG.error("Failed to checkpoint a message store: " + e, e);
  447               }
  448   
  449               if (fullCheckpoint) {
  450                   try {
  451                       if (newMark != null) {
  452                           LOG.debug("Marking journal at: " + newMark);
  453                           journal.setMark(newMark, true);
  454                       }
  455                   } catch (Exception e) {
  456                       LOG.error("Failed to mark the Journal: " + e, e);
  457                   }
  458   
  459                   if (longTermPersistence instanceof JDBCPersistenceAdapter) {
  460                       // We may be check pointing more often than the
  461                       // checkpointInterval if under high use
  462                       // But we don't want to clean up the db that often.
  463                       long now = System.currentTimeMillis();
  464                       if (now > lastCleanup + checkpointInterval) {
  465                           lastCleanup = now;
  466                           ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
  467                       }
  468                   }
  469               }
  470   
  471               LOG.debug("Checkpoint done.");
  472           } finally {
  473               latch.countDown();
  474           }
  475           synchronized (this) {
  476               return this.fullCheckPoint;
  477           }
  478   
  479       }
  480   
  481       /**
  482        * @param location
  483        * @return
  484        * @throws IOException
  485        */
  486       public DataStructure readCommand(RecordLocation location) throws IOException {
  487           try {
  488               Packet packet = journal.read(location);
  489               return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
  490           } catch (InvalidRecordLocationException e) {
  491               throw createReadException(location, e);
  492           } catch (IOException e) {
  493               throw createReadException(location, e);
  494           }
  495       }
  496   
  497       /**
  498        * Move all the messages that were in the journal into long term storage. We
  499        * just replay and do a checkpoint.
  500        * 
  501        * @throws IOException
  502        * @throws IOException
  503        * @throws InvalidRecordLocationException
  504        * @throws IllegalStateException
  505        */
  506       private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
  507   
  508           RecordLocation pos = null;
  509           int transactionCounter = 0;
  510   
  511           LOG.info("Journal Recovery Started from: " + journal);
  512           ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
  513   
  514           // While we have records in the journal.
  515           while ((pos = journal.getNextRecordLocation(pos)) != null) {
  516               Packet data = journal.read(pos);
  517               DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
  518   
  519               if (c instanceof Message) {
  520                   Message message = (Message)c;
  521                   JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
  522                   if (message.isInTransaction()) {
  523                       transactionStore.addMessage(store, message, pos);
  524                   } else {
  525                       store.replayAddMessage(context, message);
  526                       transactionCounter++;
  527                   }
  528               } else {
  529                   switch (c.getDataStructureType()) {
  530                   case JournalQueueAck.DATA_STRUCTURE_TYPE: {
  531                       JournalQueueAck command = (JournalQueueAck)c;
  532                       JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
  533                       if (command.getMessageAck().isInTransaction()) {
  534                           transactionStore.removeMessage(store, command.getMessageAck(), pos);
  535                       } else {
  536                           store.replayRemoveMessage(context, command.getMessageAck());
  537                           transactionCounter++;
  538                       }
  539                   }
  540                       break;
  541                   case JournalTopicAck.DATA_STRUCTURE_TYPE: {
  542                       JournalTopicAck command = (JournalTopicAck)c;
  543                       JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
  544                       if (command.getTransactionId() != null) {
  545                           transactionStore.acknowledge(store, command, pos);
  546                       } else {
  547                           store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
  548                           transactionCounter++;
  549                       }
  550                   }
  551                       break;
  552                   case JournalTransaction.DATA_STRUCTURE_TYPE: {
  553                       JournalTransaction command = (JournalTransaction)c;
  554                       try {
  555                           // Try to replay the packet.
  556                           switch (command.getType()) {
  557                           case JournalTransaction.XA_PREPARE:
  558                               transactionStore.replayPrepare(command.getTransactionId());
  559                               break;
  560                           case JournalTransaction.XA_COMMIT:
  561                           case JournalTransaction.LOCAL_COMMIT:
  562                               Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
  563                               if (tx == null) {
  564                                   break; // We may be trying to replay a commit
  565                               }
  566                               // that
  567                               // was already committed.
  568   
  569                               // Replay the committed operations.
  570                               tx.getOperations();
  571                               for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
  572                                   TxOperation op = (TxOperation)iter.next();
  573                                   if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
  574                                       op.store.replayAddMessage(context, (Message)op.data);
  575                                   }
  576                                   if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
  577                                       op.store.replayRemoveMessage(context, (MessageAck)op.data);
  578                                   }
  579                                   if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
  580                                       JournalTopicAck ack = (JournalTopicAck)op.data;
  581                                       ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
  582                                   }
  583                               }
  584                               transactionCounter++;
  585                               break;
  586                           case JournalTransaction.LOCAL_ROLLBACK:
  587                           case JournalTransaction.XA_ROLLBACK:
  588                               transactionStore.replayRollback(command.getTransactionId());
  589                               break;
  590                           default:
  591                               throw new IOException("Invalid journal command type: " + command.getType());
  592                           }
  593                       } catch (IOException e) {
  594                           LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
  595                       }
  596                   }
  597                       break;
  598                   case JournalTrace.DATA_STRUCTURE_TYPE:
  599                       JournalTrace trace = (JournalTrace)c;
  600                       LOG.debug("TRACE Entry: " + trace.getMessage());
  601                       break;
  602                   default:
  603                       LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
  604                   }
  605               }
  606           }
  607   
  608           RecordLocation location = writeTraceMessage("RECOVERED", true);
  609           journal.setMark(location, true);
  610   
  611           LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
  612       }
  613   
  614       private IOException createReadException(RecordLocation location, Exception e) {
  615           return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
  616       }
  617   
  618       protected IOException createWriteException(DataStructure packet, Exception e) {
  619           return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
  620       }
  621   
  622       protected IOException createWriteException(String command, Exception e) {
  623           return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
  624       }
  625   
  626       protected IOException createRecoveryFailedException(Exception e) {
  627           return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
  628       }
  629   
  630       /**
  631        * @param command
  632        * @param sync
  633        * @return
  634        * @throws IOException
  635        */
  636       public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
  637           if (started.get()) {
  638               try {
  639           	    return journal.write(toPacket(wireFormat.marshal(command)), sync);
  640               } catch (IOException ioe) {
  641           	    LOG.error("Cannot write to the journal", ioe);
  642           	    brokerService.handleIOException(ioe);
  643           	    throw ioe;
  644               }
  645           }
  646           throw new IOException("closed");
  647       }
  648   
  649       private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
  650           JournalTrace trace = new JournalTrace();
  651           trace.setMessage(message);
  652           return writeCommand(trace, sync);
  653       }
  654   
  655       public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
  656           newPercentUsage = (newPercentUsage / 10) * 10;
  657           oldPercentUsage = (oldPercentUsage / 10) * 10;
  658           if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
  659               boolean sync = newPercentUsage >= 90;
  660               checkpoint(sync, true);
  661           }
  662       }
  663   
  664       public JournalTransactionStore getTransactionStore() {
  665           return transactionStore;
  666       }
  667   
  668       public void deleteAllMessages() throws IOException {
  669           try {
  670               JournalTrace trace = new JournalTrace();
  671               trace.setMessage("DELETED");
  672               RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
  673               journal.setMark(location, true);
  674               LOG.info("Journal deleted: ");
  675           } catch (IOException e) {
  676               throw e;
  677           } catch (Throwable e) {
  678               throw IOExceptionSupport.create(e);
  679           }
  680           longTermPersistence.deleteAllMessages();
  681       }
  682   
  683       public SystemUsage getUsageManager() {
  684           return usageManager;
  685       }
  686   
  687       public int getMaxCheckpointMessageAddSize() {
  688           return maxCheckpointMessageAddSize;
  689       }
  690   
  691       public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
  692           this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
  693       }
  694   
  695       public int getMaxCheckpointWorkers() {
  696           return maxCheckpointWorkers;
  697       }
  698   
  699       public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
  700           this.maxCheckpointWorkers = maxCheckpointWorkers;
  701       }
  702   
  703       public boolean isUseExternalMessageReferences() {
  704           return false;
  705       }
  706   
  707       public void setUseExternalMessageReferences(boolean enable) {
  708           if (enable) {
  709               throw new IllegalArgumentException("The journal does not support message references.");
  710           }
  711       }
  712   
  713       public Packet toPacket(ByteSequence sequence) {
  714           return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
  715       }
  716   
  717       public ByteSequence toByteSequence(Packet packet) {
  718           org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
  719           return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
  720       }
  721   
  722       public void setBrokerName(String brokerName) {
  723           longTermPersistence.setBrokerName(brokerName);
  724       }
  725   
  726       public String toString() {
  727           return "JournalPersistenceAdapator(" + longTermPersistence + ")";
  728       }
  729   
  730       public void setDirectory(File dir) {
  731       }
  732       
  733       public long size(){
  734           return 0;
  735       }
  736   
  737       public void setBrokerService(BrokerService brokerService) {
  738           this.brokerService = brokerService;
  739           PersistenceAdapter pa = getLongTermPersistence();
  740           if( pa instanceof BrokerServiceAware ) {
  741               ((BrokerServiceAware)pa).setBrokerService(brokerService);
  742           }
  743       }
  744   
  745   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » journal » [javadoc | source]