/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.journal;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * An implementation of {@link PersistenceAdapter} designed for use with a
 * {@link Journal} and then check pointing asynchronously on a timeout with some
 * other long term persistent storage.
 * <p>
 * Commands are written to the journal through
 * {@link #writeCommand(DataStructure, boolean)}; a checkpoint asks every
 * journal-backed message store to checkpoint and, on a full checkpoint, moves
 * the journal mark forward.
 *
 * @org.apache.xbean.XBean
 * @version $Revision: 1.17 $
 */
public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {

    private BrokerService brokerService;

    protected static final Scheduler scheduler = Scheduler.getInstance();
    private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class);

    private Journal journal;
    private PersistenceAdapter longTermPersistence;

    private final WireFormat wireFormat = new OpenWireFormat();

    private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
    private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();

    private SystemUsage usageManager;
    // Five minutes, in milliseconds.
    private long checkpointInterval = 1000 * 60 * 5;
    // Guarded by the adapter's monitor (synchronized (this)).
    private long lastCheckpointRequest = System.currentTimeMillis();
    private long lastCleanup = System.currentTimeMillis();
    private int maxCheckpointWorkers = 10;
    private int maxCheckpointMessageAddSize = 1024 * 1024;

    private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
    private ThreadPoolExecutor checkpointExecutor;

    private TaskRunner checkpointTask;
    // Guarded by the adapter's monitor; swapped for a fresh latch at the start
    // of every doCheckpoint() run.
    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
    // Guarded by the adapter's monitor.
    private boolean fullCheckPoint;

    private final AtomicBoolean started = new AtomicBoolean(false);

    private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();

    private TaskRunnerFactory taskRunnerFactory;

    /**
     * Creates an unconfigured adapter; the journal, the long term persistence
     * adapter and the task runner factory must be supplied through the
     * setters before {@link #start()} is called.
     */
    public JournalPersistenceAdapter() {
    }

    /**
     * Creates an adapter wired to the given collaborators.
     *
     * @param journal the journal that commands are written to
     * @param longTermPersistence the adapter that checkpoints are delegated to
     * @param taskRunnerFactory used by {@link #start()} to create the
     *                checkpoint task runner
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
        setJournal(journal);
        setTaskRunnerFactory(taskRunnerFactory);
        setPersistenceAdapter(longTermPersistence);
    }

    /**
     * @param taskRunnerFactory the factory used by {@link #start()} to create
     *                the checkpoint task runner
     */
    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
        this.taskRunnerFactory = taskRunnerFactory;
    }

    /**
     * Sets the journal and registers this adapter as its event listener.
     *
     * @param journal the journal to use; must not be null
     */
    public void setJournal(Journal journal) {
        this.journal = journal;
        journal.setJournalEventListener(this);
    }

    /**
     * @param longTermPersistence the adapter that checkpoints are delegated to
     */
    public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
        this.longTermPersistence = longTermPersistence;
    }

    /**
     * Builds the task that the scheduler runs periodically. It requests a full,
     * asynchronous checkpoint when more than {@code checkpointInterval}
     * milliseconds have elapsed since the last checkpoint request.
     *
     * @return the periodic checkpoint task
     */
    final Runnable createPeriodicCheckpointTask() {
        return new Runnable() {
            public void run() {
                long lastTime;
                // Lock the adapter, not this Runnable: checkpoint() updates
                // lastCheckpointRequest while holding the adapter's monitor.
                synchronized (JournalPersistenceAdapter.this) {
                    lastTime = lastCheckpointRequest;
                }
                if (System.currentTimeMillis() > lastTime + checkpointInterval) {
                    checkpoint(false, true);
                }
            }
        };
    }

    /**
     * @param usageManager The UsageManager that is controlling the
     *                destination's memory usage. It is also passed on to the
     *                long term persistence adapter.
     */
    public void setUsageManager(SystemUsage usageManager) {
        this.usageManager = usageManager;
        longTermPersistence.setUsageManager(usageManager);
    }

    /**
     * @return a new set holding the destinations reported by the long term
     *         persistence adapter plus every queue and topic that currently
     *         has a journal store
     */
    public Set<ActiveMQDestination> getDestinations() {
        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
        destinations.addAll(queues.keySet());
        destinations.addAll(topics.keySet());
        return destinations;
    }

    /**
     * Returns the journal store for the destination, dispatching on whether it
     * is a queue or a topic.
     */
    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
        if (destination.isQueue()) {
            return createQueueMessageStore((ActiveMQQueue)destination);
        }
        return createTopicMessageStore((ActiveMQTopic)destination);
    }

    /**
     * Returns the journal store for the queue, creating and registering it on
     * first use.
     *
     * @param destination the queue
     * @return the store registered for the queue
     * @throws IOException if the long term store could not be created
     */
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        JournalMessageStore store = queues.get(destination);
        if (store == null) {
            MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
            store = new JournalMessageStore(this, checkpointStore, destination);
            // putIfAbsent keeps a store registered by a racing caller instead
            // of silently replacing it.
            JournalMessageStore existing = queues.putIfAbsent(destination, store);
            if (existing != null) {
                store = existing;
            }
        }
        return store;
    }

    /**
     * Returns the journal store for the topic, creating and registering it on
     * first use.
     *
     * @param destinationName the topic
     * @return the store registered for the topic
     * @throws IOException if the long term store could not be created
     */
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
        JournalTopicMessageStore store = topics.get(destinationName);
        if (store == null) {
            TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
            store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
            // putIfAbsent keeps a store registered by a racing caller instead
            // of silently replacing it.
            JournalTopicMessageStore existing = topics.putIfAbsent(destinationName, store);
            if (existing != null) {
                store = existing;
            }
        }
        return store;
    }

    /**
     * Cleanup method to remove any state associated with the given destination
     *
     * @param destination Destination to forget
     */
    public void removeQueueMessageStore(ActiveMQQueue destination) {
        queues.remove(destination);
    }

    /**
     * Cleanup method to remove any state associated with the given destination
     *
     * @param destination Destination to forget
     */
    public void removeTopicMessageStore(ActiveMQTopic destination) {
        topics.remove(destination);
    }

    /**
     * @return the single transaction store owned by this adapter
     * @throws IOException declared by the interface; not thrown here
     */
    public TransactionStore createTransactionStore() throws IOException {
        return transactionStore;
    }

    /**
     * @return the value reported by the long term persistence adapter
     * @throws IOException if the long term persistence adapter fails
     */
    public long getLastMessageBrokerSequenceId() throws IOException {
        return longTermPersistence.getLastMessageBrokerSequenceId();
    }

    /** Delegates to the long term persistence adapter. */
    public void beginTransaction(ConnectionContext context) throws IOException {
        longTermPersistence.beginTransaction(context);
    }

    /** Delegates to the long term persistence adapter. */
    public void commitTransaction(ConnectionContext context) throws IOException {
        longTermPersistence.commitTransaction(context);
    }

    /** Delegates to the long term persistence adapter. */
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        longTermPersistence.rollbackTransaction(context);
    }

    /**
     * Starts the adapter: creates the checkpoint task runner and worker pool,
     * registers for memory usage events, starts the long term persistence
     * adapter, replays the journal and schedules the periodic checkpoint.
     * Calling it on an already started adapter does nothing.
     *
     * @throws Exception if the long term persistence adapter fails to start or
     *                 journal recovery fails
     */
    public synchronized void start() throws Exception {
        if (!started.compareAndSet(false, true)) {
            return;
        }

        checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
            public boolean iterate() {
                return doCheckpoint();
            }
        }, "ActiveMQ Journal Checkpoint Worker");

        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
            public Thread newThread(Runnable runable) {
                Thread t = new Thread(runable, "Journal checkpoint worker");
                t.setPriority(7);
                return t;
            }
        });
        // checkpointExecutor.allowCoreThreadTimeOut(true);

        this.usageManager.getMemoryUsage().addUsageListener(this);

        if (longTermPersistence instanceof JDBCPersistenceAdapter) {
            // Disabled periodic clean up as it deadlocks with the checkpoint
            // operations.
            ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
        }

        longTermPersistence.start();
        createTransactionStore();
        recover();

        // Do a checkpoint periodically.
        scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
    }

    /**
     * Stops the adapter: cancels the periodic checkpoint, takes one final
     * synchronous full checkpoint, shuts down the checkpoint workers, closes
     * the journal and stops the long term persistence adapter. Calling it on an
     * adapter that is not started does nothing.
     *
     * @throws Exception if closing the journal or stopping the long term
     *                 persistence adapter fails
     */
    public void stop() throws Exception {
        // Check the state first: the usage listener is only registered by
        // start(), and usageManager may not even be set on an adapter that
        // was never started.
        if (!started.compareAndSet(true, false)) {
            return;
        }
        this.usageManager.getMemoryUsage().removeUsageListener(this);

        scheduler.cancel(periodicCheckpointTask);

        // Take one final checkpoint and stop checkpoint processing.
        checkpoint(true, true);
        checkpointTask.shutdown();
        checkpointExecutor.shutdown();

        queues.clear();
        topics.clear();

        // Remember a journal close failure but still stop the long term
        // store before reporting it.
        IOException firstException = null;
        try {
            journal.close();
        } catch (Exception e) {
            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
        }
        longTermPersistence.stop();

        if (firstException != null) {
            throw firstException;
        }
    }

    // Properties
    // -------------------------------------------------------------------------
    /**
     * @return the adapter that checkpoints are delegated to
     */
    public PersistenceAdapter getLongTermPersistence() {
        return longTermPersistence;
    }

    /**
     * @return Returns the wireFormat.
     */
    public WireFormat getWireFormat() {
        return wireFormat;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * The Journal give us a call back so that we can move old data out of the
     * journal. Taking a checkpoint does this for us.
     *
     * @see JournalEventListener#overflowNotification(RecordLocation)
     */
    public void overflowNotification(RecordLocation safeLocation) {
        checkpoint(false, true);
    }

    /**
     * When we checkpoint we move all the journalled data to long term storage.
     * This method only requests the checkpoint by waking the checkpoint task.
     *
     * @param sync if true, block until the checkpoint run that this request is
     *                attached to has completed
     * @param fullCheckpoint if true, the next run also checkpoints the queue
     *                stores and moves the journal mark
     */
    public void checkpoint(boolean sync, boolean fullCheckpoint) {
        try {
            if (journal == null) {
                throw new IllegalStateException("Journal is closed.");
            }

            long now = System.currentTimeMillis();
            CountDownLatch latch = null;
            synchronized (this) {
                // Grab the latch of the upcoming run before waking the task.
                latch = nextCheckpointCountDownLatch;
                lastCheckpointRequest = now;
                if (fullCheckpoint) {
                    this.fullCheckPoint = true;
                }
            }

            checkpointTask.wakeup();

            if (sync) {
                LOG.debug("Waking for checkpoint to complete.");
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Request to start checkpoint failed: " + e, e);
        }
    }

    /**
     * Requests a checkpoint that is full exactly when it is synchronous.
     *
     * @param sync if true, request a full checkpoint and wait for it
     */
    public void checkpoint(boolean sync) {
        checkpoint(sync, sync);
    }

    /**
     * This does the actual checkpoint.
     *
     * @return true if another full checkpoint was requested while this one was
     *         running
     */
    public boolean doCheckpoint() {
        CountDownLatch latch = null;
        boolean fullCheckpoint;
        synchronized (this) {
            // Requests arriving from now on wait on a fresh latch, i.e. for
            // the next run.
            latch = nextCheckpointCountDownLatch;
            nextCheckpointCountDownLatch = new CountDownLatch(1);
            fullCheckpoint = this.fullCheckPoint;
            this.fullCheckPoint = false;
        }
        try {

            LOG.debug("Checkpoint started.");
            RecordLocation newMark = null;

            ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());

            //
            // We do many partial checkpoints (fullCheckpoint==false) to move
            // topic messages to long term store as soon as possible.
            //
            // We want to avoid doing that for queue messages since removes
            // that come in the same checkpoint cycle will nullify the previous
            // message add. Therefore, we only checkpoint queues on the
            // fullCheckpoint cycles.
            //
            if (fullCheckpoint) {
                for (JournalMessageStore ms : queues.values()) {
                    submitCheckpoint(ms, futureTasks);
                }
            }

            for (JournalTopicMessageStore ms : topics.values()) {
                submitCheckpoint(ms, futureTasks);
            }

            // NOTE(review): the first failing task aborts this loop, so later
            // tasks are not awaited and newMark reflects only the stores seen
            // so far - confirm this is the intended failure handling.
            try {
                for (FutureTask<RecordLocation> ft : futureTasks) {
                    RecordLocation mark = ft.get();
                    // We only set a newMark on full checkpoints.
                    if (fullCheckpoint) {
                        if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
                            newMark = mark;
                        }
                    }
                }
            } catch (Throwable e) {
                LOG.error("Failed to checkpoint a message store: " + e, e);
            }

            if (fullCheckpoint) {
                try {
                    if (newMark != null) {
                        LOG.debug("Marking journal at: " + newMark);
                        journal.setMark(newMark, true);
                    }
                } catch (Exception e) {
                    LOG.error("Failed to mark the Journal: " + e, e);
                }

                if (longTermPersistence instanceof JDBCPersistenceAdapter) {
                    // We may be check pointing more often than the
                    // checkpointInterval if under high use
                    // But we don't want to clean up the db that often.
                    long now = System.currentTimeMillis();
                    if (now > lastCleanup + checkpointInterval) {
                        lastCleanup = now;
                        ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
                    }
                }
            }

            LOG.debug("Checkpoint done.");
        } finally {
            // Always release synchronous waiters, even if the run failed.
            latch.countDown();
        }
        synchronized (this) {
            return this.fullCheckPoint;
        }

    }

    /**
     * Hands the checkpoint of one store to the worker pool and, once it has
     * been accepted, records the task in the list of tasks to wait for.
     */
    private void submitCheckpoint(final JournalMessageStore store, ArrayList<FutureTask<RecordLocation>> futureTasks) {
        try {
            FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
                public RecordLocation call() throws Exception {
                    return store.checkpoint();
                }
            });
            // Submit first: a task the executor rejected would never
            // complete, so waiting on it would block the checkpoint forever.
            checkpointExecutor.execute(task);
            futureTasks.add(task);
        } catch (Exception e) {
            LOG.error("Failed to checkpoint a message store: " + e, e);
        }
    }

    /**
     * Reads the record at the given journal location and unmarshals it.
     *
     * @param location the journal location to read
     * @return the command stored at that location
     * @throws IOException if the location is invalid or the read fails
     */
    public DataStructure readCommand(RecordLocation location) throws IOException {
        try {
            Packet packet = journal.read(location);
            return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
        } catch (InvalidRecordLocationException e) {
            throw createReadException(location, e);
        } catch (IOException e) {
            throw createReadException(location, e);
        }
    }

    /**
     * Move all the messages that were in the journal into long term storage. We
     * just replay and do a checkpoint.
     *
     * @throws IOException
     * @throws InvalidRecordLocationException
     * @throws IllegalStateException
     */
    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException {

        RecordLocation pos = null;
        int transactionCounter = 0;

        LOG.info("Journal Recovery Started from: " + journal);
        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());

        // While we have records in the journal.
        while ((pos = journal.getNextRecordLocation(pos)) != null) {
            Packet data = journal.read(pos);
            DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));

            if (c instanceof Message) {
                Message message = (Message)c;
                JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
                if (message.isInTransaction()) {
                    transactionStore.addMessage(store, message, pos);
                } else {
                    store.replayAddMessage(context, message);
                    transactionCounter++;
                }
            } else {
                switch (c.getDataStructureType()) {
                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
                    JournalQueueAck command = (JournalQueueAck)c;
                    JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
                    if (command.getMessageAck().isInTransaction()) {
                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
                    } else {
                        store.replayRemoveMessage(context, command.getMessageAck());
                        transactionCounter++;
                    }
                }
                    break;
                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
                    JournalTopicAck command = (JournalTopicAck)c;
                    JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
                    if (command.getTransactionId() != null) {
                        transactionStore.acknowledge(store, command, pos);
                    } else {
                        store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
                        transactionCounter++;
                    }
                }
                    break;
                case JournalTransaction.DATA_STRUCTURE_TYPE: {
                    JournalTransaction command = (JournalTransaction)c;
                    try {
                        // Try to replay the packet.
                        switch (command.getType()) {
                        case JournalTransaction.XA_PREPARE:
                            transactionStore.replayPrepare(command.getTransactionId());
                            break;
                        case JournalTransaction.XA_COMMIT:
                        case JournalTransaction.LOCAL_COMMIT:
                            Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
                            if (tx == null) {
                                // We may be trying to replay a commit that
                                // was already committed.
                                break;
                            }

                            // Replay the committed operations.
                            for (Object operation : tx.getOperations()) {
                                TxOperation op = (TxOperation)operation;
                                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
                                    op.store.replayAddMessage(context, (Message)op.data);
                                }
                                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
                                    op.store.replayRemoveMessage(context, (MessageAck)op.data);
                                }
                                if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
                                    JournalTopicAck ack = (JournalTopicAck)op.data;
                                    ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
                                }
                            }
                            transactionCounter++;
                            break;
                        case JournalTransaction.LOCAL_ROLLBACK:
                        case JournalTransaction.XA_ROLLBACK:
                            transactionStore.replayRollback(command.getTransactionId());
                            break;
                        default:
                            throw new IOException("Invalid journal command type: " + command.getType());
                        }
                    } catch (IOException e) {
                        // Best effort: log the record that could not be
                        // replayed and carry on with the rest of the journal.
                        LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
                    }
                }
                    break;
                case JournalTrace.DATA_STRUCTURE_TYPE: {
                    JournalTrace trace = (JournalTrace)c;
                    LOG.debug("TRACE Entry: " + trace.getMessage());
                }
                    break;
                default:
                    LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
                }
            }
        }

        // Write a trace record and mark the journal at it once the replay is
        // complete.
        RecordLocation location = writeTraceMessage("RECOVERED", true);
        journal.setMark(location, true);

        LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
    }

    /** Wraps a journal read failure for the given location in an IOException. */
    private IOException createReadException(RecordLocation location, Exception e) {
        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
    }

    /** Wraps a journal write failure for the given packet in an IOException. */
    protected IOException createWriteException(DataStructure packet, Exception e) {
        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
    }

    /** Wraps a journal write failure for the named command in an IOException. */
    protected IOException createWriteException(String command, Exception e) {
        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
    }

    /** Wraps a journal recovery failure in an IOException. */
    protected IOException createRecoveryFailedException(Exception e) {
        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
    }

    /**
     * Marshals the command and writes it to the journal.
     *
     * @param command the command to write
     * @param sync passed through to the journal write
     * @return the journal location of the written record
     * @throws IOException if the adapter is not started or the journal write
     *                 fails
     */
    public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
        if (!started.get()) {
            throw new IOException("closed");
        }
        try {
            return journal.write(toPacket(wireFormat.marshal(command)), sync);
        } catch (IOException ioe) {
            LOG.error("Cannot write to the journal", ioe);
            // The broker service is optional; without the guard a missing one
            // would replace the real failure with a NullPointerException.
            if (brokerService != null) {
                brokerService.handleIOException(ioe);
            }
            throw ioe;
        }
    }

    /** Writes a {@link JournalTrace} record carrying the given message. */
    private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
        JournalTrace trace = new JournalTrace();
        trace.setMessage(message);
        return writeCommand(trace, sync);
    }

    /**
     * Requests a full checkpoint when memory usage, rounded down to a multiple
     * of ten percent, has risen and is at least 70%; the request is synchronous
     * from 90% upwards.
     */
    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
        int newBucket = (newPercentUsage / 10) * 10;
        int oldBucket = (oldPercentUsage / 10) * 10;
        if (newBucket >= 70 && oldBucket < newBucket) {
            boolean sync = newBucket >= 90;
            checkpoint(sync, true);
        }
    }

    /**
     * @return the transaction store owned by this adapter
     */
    public JournalTransactionStore getTransactionStore() {
        return transactionStore;
    }

    /**
     * Writes a "DELETED" trace record, marks the journal at it, and then asks
     * the long term persistence adapter to delete all of its messages.
     *
     * @throws IOException if the journal write, the mark or the long term
     *                 delete fails
     */
    public void deleteAllMessages() throws IOException {
        try {
            JournalTrace trace = new JournalTrace();
            trace.setMessage("DELETED");
            RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
            journal.setMark(location, true);
            LOG.info("Journal deleted: ");
        } catch (IOException e) {
            throw e;
        } catch (Throwable e) {
            throw IOExceptionSupport.create(e);
        }
        longTermPersistence.deleteAllMessages();
    }

    /**
     * @return the usage manager set through
     *         {@link #setUsageManager(SystemUsage)}
     */
    public SystemUsage getUsageManager() {
        return usageManager;
    }

    /**
     * @return the configured maxCheckpointMessageAddSize value
     */
    public int getMaxCheckpointMessageAddSize() {
        return maxCheckpointMessageAddSize;
    }

    /**
     * @param maxCheckpointMessageAddSize the new maxCheckpointMessageAddSize
     *                value
     */
    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
    }

    /**
     * @return the size of the checkpoint worker pool created by
     *         {@link #start()}
     */
    public int getMaxCheckpointWorkers() {
        return maxCheckpointWorkers;
    }

    /**
     * @param maxCheckpointWorkers the size of the checkpoint worker pool; read
     *                when {@link #start()} creates the pool
     */
    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
        this.maxCheckpointWorkers = maxCheckpointWorkers;
    }

    /**
     * @return always false; the journal does not support message references
     */
    public boolean isUseExternalMessageReferences() {
        return false;
    }

    /**
     * @param enable must be false
     * @throws IllegalArgumentException if {@code enable} is true
     */
    public void setUseExternalMessageReferences(boolean enable) {
        if (enable) {
            throw new IllegalArgumentException("The journal does not support message references.");
        }
    }

    /**
     * Wraps an ActiveMQ byte sequence in an ActiveIO packet over the same
     * data, offset and length.
     */
    public Packet toPacket(ByteSequence sequence) {
        return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
    }

    /**
     * Converts an ActiveIO packet to an ActiveMQ byte sequence over the same
     * data, offset and length.
     */
    public ByteSequence toByteSequence(Packet packet) {
        org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
        return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
    }

    /** Passes the broker name on to the long term persistence adapter. */
    public void setBrokerName(String brokerName) {
        longTermPersistence.setBrokerName(brokerName);
    }

    public String toString() {
        return "JournalPersistenceAdapator(" + longTermPersistence + ")";
    }

    /** Ignored by this adapter. */
    public void setDirectory(File dir) {
    }

    /**
     * @return always 0; this adapter does not report a size
     */
    public long size() {
        return 0;
    }

    /**
     * Stores the broker service and forwards it to the long term persistence
     * adapter when that adapter is {@link BrokerServiceAware}.
     */
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
        PersistenceAdapter pa = getLongTermPersistence();
        if (pa instanceof BrokerServiceAware) {
            ((BrokerServiceAware)pa).setBrokerService(brokerService);
        }
    }

}