/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.journal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A MessageStore that uses a Journal to store its messages.
 * <p>
 * Adds and removes are first written to the journal and tracked in memory;
 * {@link #checkpoint()} later moves the tracked operations into the wrapped
 * long term store.
 *
 * @version $Revision: 1.14 $
 */
public class JournalMessageStore extends AbstractMessageStore {

    private static final Log LOG = LogFactory.getLog(JournalMessageStore.class);

    /**
     * Adapter used to write journal records and to trigger checkpoints.
     * The misspelled name is kept because the field is protected and may be
     * referenced by subclasses.
     */
    protected final JournalPersistenceAdapter peristenceAdapter;
    /** Transaction store obtained from the adapter; receives transacted adds and removes. */
    protected final JournalTransactionStore transactionStore;
    /** Store that receives the journalled operations when a checkpoint runs. */
    protected final MessageStore longTermStore;
    /** Runs the checkpoint work against the adapter inside a transaction. */
    protected final TransactionTemplate transactionTemplate;
    /** Journal location of the most recent add or remove applied to this store. */
    protected RecordLocation lastLocation;
    /** Locations of journal records whose transaction has neither committed nor rolled back. */
    protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();

    /** Messages added since the last checkpoint swap, in insertion order. */
    private Map<MessageId, Message> messages = new LinkedHashMap<MessageId, Message>();
    /** Acks recorded since the last checkpoint swap for messages not found in {@link #messages}. */
    private List<MessageAck> messageAcks = new ArrayList<MessageAck>();

    /**
     * The added-message map taken over by the checkpoint currently in
     * progress. Consulted by {@link #getMessage(MessageId)} so that messages
     * being checkpointed can still be found; reset to <code>null</code> once
     * the checkpoint transaction has run.
     */
    private Map<MessageId, Message> cpAddedMessageIds;

    /** Memory usage supplied through {@link #setMemoryUsage(MemoryUsage)}; may be null. */
    private MemoryUsage memoryUsage;

    /**
     * @param adapter the journal adapter; also supplies the transaction store
     * @param checkpointStore the long term store that checkpoints write into
     * @param destination the destination this store belongs to
     */
    public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
        super(destination);
        this.peristenceAdapter = adapter;
        this.transactionStore = adapter.getTransactionStore();
        this.longTermStore = checkpointStore;
        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
    }

    /**
     * Remembers the memory usage for {@link #start()} / {@link #stop()} and
     * passes it on to the long term store.
     */
    public void setMemoryUsage(MemoryUsage memoryUsage) {
        this.memoryUsage = memoryUsage;
        longTermStore.setMemoryUsage(memoryUsage);
    }

    /**
     * Writes the message to the journal and then either records it as a
     * pending add (no transaction) or registers it with the transaction so
     * that it is recorded on commit.
     * <p>
     * Not synchronized since the Journal has better throughput if you increase
     * the number of concurrent writes that it is doing.
     *
     * @param context the connection context; decides whether the add is transacted
     * @param message the message to add
     * @throws IOException if the journal write fails; the reference taken on
     *             the message is released before the exception propagates
     */
    public void addMessage(ConnectionContext context, final Message message) throws IOException {

        final MessageId id = message.getMessageId();

        final boolean debug = LOG.isDebugEnabled();
        message.incrementReferenceCount();

        final RecordLocation location;
        boolean journalled = false;
        try {
            location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
            journalled = true;
        } finally {
            if (!journalled) {
                // The write failed, so the message never reaches the pending
                // map and nothing else in this class would release the
                // reference taken above.
                message.decrementReferenceCount();
            }
        }

        if (!context.isInTransaction()) {
            if (debug) {
                LOG.debug("Journalled message add for: " + id + ", at: " + location);
            }
            addMessage(message, location);
        } else {
            if (debug) {
                LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
            }
            synchronized (this) {
                inFlightTxLocations.add(location);
            }
            transactionStore.addMessage(this, message, location);
            context.getTransaction().addSynchronization(new Synchronization() {
                public void afterCommit() throws Exception {
                    if (debug) {
                        LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
                    }
                    synchronized (JournalMessageStore.this) {
                        inFlightTxLocations.remove(location);
                        addMessage(message, location);
                    }
                }

                public void afterRollback() throws Exception {
                    if (debug) {
                        LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
                    }
                    synchronized (JournalMessageStore.this) {
                        inFlightTxLocations.remove(location);
                    }
                    // The add never took effect, so drop the reference taken when it was journalled.
                    message.decrementReferenceCount();
                }
            });
        }
    }

    /**
     * Records an already-journalled message as a pending add and advances
     * {@link #lastLocation}.
     *
     * @param message the journalled message
     * @param location where the message was written in the journal
     */
    void addMessage(final Message message, final RecordLocation location) {
        synchronized (this) {
            lastLocation = location;
            MessageId id = message.getMessageId();
            messages.put(id, message);
        }
    }

    /**
     * Re-applies a journalled add to the long term store. Failures are logged
     * and not propagated.
     *
     * @param context the context passed to the long term store
     * @param message the message to replay
     */
    public void replayAddMessage(ConnectionContext context, Message message) {
        try {
            // Only add the message if it has not already been added.
            Message t = longTermStore.getMessage(message.getMessageId());
            if (t == null) {
                longTermStore.addMessage(context, message);
            }
        } catch (Throwable e) {
            LOG.warn("Could not replay add for message '" + message.getMessageId() + "'.  Message may have already been added. reason: " + e);
        }
    }

    /**
     * Writes the ack to the journal as a {@link JournalQueueAck} and then
     * either applies the remove (no transaction) or registers it with the
     * transaction so that it is applied on commit.
     *
     * @param context the connection context; decides whether the remove is transacted
     * @param ack the acknowledgement identifying the message to remove
     * @throws IOException if the journal write fails
     */
    public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
        final boolean debug = LOG.isDebugEnabled();
        JournalQueueAck remove = new JournalQueueAck();
        remove.setDestination(destination);
        remove.setMessageAck(ack);

        final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
        if (!context.isInTransaction()) {
            if (debug) {
                LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
            }
            removeMessage(ack, location);
        } else {
            if (debug) {
                LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
            }
            synchronized (this) {
                inFlightTxLocations.add(location);
            }
            transactionStore.removeMessage(this, ack, location);
            context.getTransaction().addSynchronization(new Synchronization() {
                public void afterCommit() throws Exception {
                    if (debug) {
                        LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
                    }
                    synchronized (JournalMessageStore.this) {
                        inFlightTxLocations.remove(location);
                        removeMessage(ack, location);
                    }
                }

                public void afterRollback() throws Exception {
                    if (debug) {
                        LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
                    }
                    synchronized (JournalMessageStore.this) {
                        inFlightTxLocations.remove(location);
                    }
                }
            });

        }
    }

    /**
     * Applies an already-journalled remove and advances {@link #lastLocation}.
     * If the message is still a pending add it is dropped from the pending
     * map and its reference released; otherwise the ack is queued for the
     * next checkpoint.
     *
     * @param ack the acknowledgement identifying the message
     * @param location where the ack was written in the journal
     */
    final void removeMessage(final MessageAck ack, final RecordLocation location) {
        synchronized (this) {
            lastLocation = location;
            MessageId id = ack.getLastMessageId();
            Message message = messages.remove(id);
            if (message == null) {
                messageAcks.add(ack);
            } else {
                message.decrementReferenceCount();
            }
        }
    }

    /**
     * Re-applies a journalled remove to the long term store. Failures are
     * logged and not propagated.
     *
     * @param context the context passed to the long term store
     * @param messageAck the acknowledgement to replay
     */
    public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
        try {
            // Only remove the message if it has not already been removed.
            Message t = longTermStore.getMessage(messageAck.getLastMessageId());
            if (t != null) {
                longTermStore.removeMessage(context, messageAck);
            }
        } catch (Throwable e) {
            LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
        }
    }

    /**
     * Runs a checkpoint without a post-checkpoint callback.
     *
     * @return the location returned by {@link #checkpoint(Callback)}
     * @throws IOException if the checkpoint fails
     */
    public RecordLocation checkpoint() throws IOException {
        return checkpoint(null);
    }

    /**
     * Moves the adds and acks collected since the previous checkpoint into
     * the long term store.
     *
     * @param postCheckpointTest optional callback executed at the end of the
     *            checkpoint transaction; may be null
     * @return the smallest in-flight transaction location captured when the
     *         checkpoint started, or {@link #lastLocation} when no
     *         transaction was in flight
     * @throws IOException if the checkpoint transaction fails
     */
    @SuppressWarnings("unchecked")
    public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {

        final List<MessageAck> cpRemovedMessageLocations;
        final List<RecordLocation> cpActiveJournalLocations;
        final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();

        // Swap out the pending collections so new operations accumulate in
        // fresh ones while this checkpoint works on the old ones.
        synchronized (this) {
            cpAddedMessageIds = this.messages;
            cpRemovedMessageLocations = this.messageAcks;

            cpActiveJournalLocations = new ArrayList<RecordLocation>(inFlightTxLocations);

            this.messages = new LinkedHashMap<MessageId, Message>();
            this.messageAcks = new ArrayList<MessageAck>();
        }

        transactionTemplate.run(new Callback() {
            public void execute() throws Exception {

                int size = 0;

                PersistenceAdapter adapter = transactionTemplate.getPersistenceAdapter();
                ConnectionContext context = transactionTemplate.getContext();

                // Checkpoint the added messages.
                synchronized (JournalMessageStore.this) {
                    Iterator<Message> iterator = cpAddedMessageIds.values().iterator();
                    while (iterator.hasNext()) {
                        Message message = iterator.next();
                        try {
                            longTermStore.addMessage(context, message);
                        } catch (Throwable e) {
                            LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
                        }
                        size += message.getSize();
                        message.decrementReferenceCount();
                        // Commit the batch if it's getting too big
                        if (size >= maxCheckpointMessageAddSize) {
                            adapter.commitTransaction(context);
                            adapter.beginTransaction(context);
                            size = 0;
                        }
                    }
                }

                // Commit the adds before the removes are applied.
                adapter.commitTransaction(context);
                adapter.beginTransaction(context);

                // Checkpoint the removed messages.
                Iterator<MessageAck> iterator = cpRemovedMessageLocations.iterator();
                while (iterator.hasNext()) {
                    try {
                        MessageAck ack = iterator.next();
                        longTermStore.removeMessage(transactionTemplate.getContext(), ack);
                    } catch (Throwable e) {
                        LOG.debug("Message could not be removed from long term store: " + e.getMessage(), e);
                    }
                }

                if (postCheckpointTest != null) {
                    postCheckpointTest.execute();
                }
            }

        });

        synchronized (this) {
            cpAddedMessageIds = null;
        }

        if (cpActiveJournalLocations.size() > 0) {
            // Sorting puts the smallest in-flight location first.
            Collections.sort(cpActiveJournalLocations);
            return cpActiveJournalLocations.get(0);
        }
        synchronized (this) {
            return lastLocation;
        }
    }

    /**
     * Looks a message up in the pending adds, then in the map being
     * checkpointed (if any), and finally in the long term store.
     *
     * @param identity the id of the message to find
     * @return the message, or whatever the long term store returns when it is
     *         not held in memory
     * @throws IOException if the long term store lookup fails
     */
    public Message getMessage(MessageId identity) throws IOException {
        Message answer = null;

        synchronized (this) {
            // Is it still held in memory, waiting to be checkpointed?
            answer = messages.get(identity);
            if (answer == null && cpAddedMessageIds != null) {
                answer = cpAddedMessageIds.get(identity);
            }
        }

        if (answer != null) {
            return answer;
        }

        // If all else fails try the long term message store.
        return longTermStore.getMessage(identity);
    }

    /**
     * Requests a checkpoint from the persistence adapter and then delegates
     * recovery to the long term store.
     *
     * @param listener receives the recovered messages
     * @throws Exception if the checkpoint or the recovery fails
     */
    public void recover(final MessageRecoveryListener listener) throws Exception {
        peristenceAdapter.checkpoint(true, true);
        longTermStore.recover(listener);
    }

    /**
     * Registers the persistence adapter as a usage listener (when a memory
     * usage has been set) and starts the long term store.
     */
    public void start() throws Exception {
        if (this.memoryUsage != null) {
            this.memoryUsage.addUsageListener(peristenceAdapter);
        }
        longTermStore.start();
    }

    /**
     * Stops the long term store and unregisters the persistence adapter from
     * the memory usage (when one has been set).
     */
    public void stop() throws Exception {
        longTermStore.stop();
        if (this.memoryUsage != null) {
            this.memoryUsage.removeUsageListener(peristenceAdapter);
        }
    }

    /**
     * @return Returns the longTermStore.
     */
    public MessageStore getLongTermMessageStore() {
        return longTermStore;
    }

    /**
     * Requests a checkpoint from the persistence adapter and then removes all
     * messages from the long term store.
     *
     * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
     */
    public void removeAllMessages(ConnectionContext context) throws IOException {
        peristenceAdapter.checkpoint(true, true);
        longTermStore.removeAllMessages(context);
    }

    /**
     * Message references are not supported by the journal.
     *
     * @throws IOException always
     */
    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    /**
     * Message references are not supported by the journal.
     *
     * @throws IOException always
     */
    public String getMessageReference(MessageId identity) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    /**
     * Requests a checkpoint from the persistence adapter and then asks the
     * long term store for its message count.
     *
     * @return the message count reported by the long term store
     * @throws IOException if the checkpoint or the count fails
     * @see org.apache.activemq.store.MessageStore#getMessageCount()
     */
    public int getMessageCount() throws IOException {
        peristenceAdapter.checkpoint(true, true);
        return longTermStore.getMessageCount();
    }

    /**
     * Requests a checkpoint from the persistence adapter and then delegates
     * to the long term store.
     *
     * @param maxReturned passed through to the long term store
     * @param listener receives the recovered messages
     */
    public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
        peristenceAdapter.checkpoint(true, true);
        longTermStore.recoverNextMessages(maxReturned, listener);

    }

    /** Delegates to the long term store. */
    public void resetBatching() {
        longTermStore.resetBatching();

    }

    /**
     * Requests a checkpoint from the persistence adapter and then delegates
     * to the long term store.
     */
    @Override
    public void setBatch(MessageId messageId) throws Exception {
        peristenceAdapter.checkpoint(true, true);
        longTermStore.setBatch(messageId);
    }

}