/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A cursor over pending messages (messages awaiting dispatch to a consumer)
 * that holds references in an in-memory list while {@code hasSpace()} reports
 * room, and otherwise moves them to a list container obtained from the
 * broker's temporary data store.
 * <p>
 * The temporary store may be {@code null}; in that case every message is kept
 * in the in-memory list and no disk list is ever created.
 * <p>
 * Most operations are {@code synchronized} on the cursor instance.
 *
 * @version $Revision: 911759 $
 */
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
    private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
    /** Source of the numeric prefix that makes each cursor's container name unique. */
    private static final AtomicLong NAME_COUNT = new AtomicLong();
    protected Broker broker;
    /** Temporary data store backing the disk list; may be null. */
    private Store store;
    /** Name of this cursor's list container inside the store. */
    private String name;
    private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
    /** Created lazily by {@link #getDiskList()}; null until first needed. */
    private ListContainer<MessageReference> diskList;
    /** Iterator positioned by {@link #reset()}; null until reset() is called. */
    private Iterator<MessageReference> iter;
    /** Region destination of the most recently added message; re-applied to messages read from disk. */
    private Destination regionDestination;
    /** True between {@link #reset()} and {@link #release()}. */
    private boolean iterating;
    /** Set when a flush was requested while iterating; honoured in {@link #release()}. */
    private boolean flushRequired;
    private final AtomicBoolean started = new AtomicBoolean();

    /**
     * Creates a cursor whose disk list, if one is ever needed, lives in the
     * broker's temporary data store.
     *
     * @param broker the broker supplying the temporary data store and the
     *               dead letter handling used by {@link #discard(MessageReference)}
     * @param name   base name of the cursor; a unique numeric prefix is added
     */
    public FilePendingMessageCursor(Broker broker, String name) {
        this.useCache = false;
        this.broker = broker;
        // the store can be null if the BrokerService has persistence
        // turned off
        this.store = broker.getTempDataStore();
        this.name = NAME_COUNT.incrementAndGet() + "_" + name;
    }

    /**
     * Starts the cursor once and registers it as a memory usage listener when
     * a system usage is configured. Repeated calls while started are ignored.
     *
     * @throws Exception if the superclass fails to start
     */
    public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            super.start();
            if (systemUsage != null) {
                systemUsage.getMemoryUsage().addUsageListener(this);
            }
        }
    }

    /**
     * Stops the cursor once and unregisters it from memory usage
     * notifications. Calls while not started are ignored.
     *
     * @throws Exception if the superclass fails to stop
     */
    public void stop() throws Exception {
        if (started.compareAndSet(true, false)) {
            super.stop();
            if (systemUsage != null) {
                systemUsage.getMemoryUsage().removeUsageListener(this);
            }
        }
    }

    /**
     * Dropped references found at the head of the in-memory list are removed
     * as a side effect of this check.
     *
     * @return true if there are no pending messages
     */
    public synchronized boolean isEmpty() {
        if (memoryList.isEmpty() && isDiskListEmpty()) {
            return true;
        }
        for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
            MessageReference node = iterator.next();
            if (node == QueueMessageReference.NULL_MESSAGE) {
                // placeholder entry: neither counts as pending nor gets removed
                continue;
            }
            if (!node.isDropped()) {
                return false;
            }
            // We can remove dropped references.
            iterator.remove();
        }
        return isDiskListEmpty();
    }

    /**
     * Resets the cursor: marks it as iterating and positions the iterator on
     * the disk list when that list has content, otherwise on the memory list.
     */
    public synchronized void reset() {
        iterating = true;
        last = null;
        iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
    }

    /**
     * Ends an iteration started with {@link #reset()} and performs any flush
     * to disk that was requested while the iteration was in progress.
     */
    public synchronized void release() {
        iterating = false;
        if (flushRequired) {
            flushRequired = false;
            flushToDisk();
        }
    }

    /**
     * Stops the cursor, releases the reference held on every in-memory
     * message and removes the disk list.
     *
     * @throws Exception if stopping or removing the disk list fails
     */
    public synchronized void destroy() throws Exception {
        stop();
        // decrementReferenceCount() is declared on MessageReference, so no
        // cast is needed; the list may hold references that are not Messages.
        for (MessageReference node : memoryList) {
            node.decrementReferenceCount();
        }
        memoryList.clear();
        destroyDiskList();
    }

    /**
     * Empties the disk list, if it has content, and deletes this cursor's
     * list container from the store. Does nothing to the store when there is
     * no store.
     */
    private void destroyDiskList() throws Exception {
        if (!isDiskListEmpty()) {
            Iterator<MessageReference> iterator = diskList.iterator();
            while (iterator.hasNext()) {
                iterator.next();
                iterator.remove();
            }
            diskList.clear();
        }
        if (store != null) {
            store.deleteListContainer(name, "TopicSubscription");
        }
    }

    /**
     * Returns up to {@code maxItems} pending references, memory list first
     * and then the disk list, without removing them from the cursor. The
     * reference count of every returned entry is incremented.
     *
     * @param maxItems maximum number of references to return
     * @return a new list holding the selected references
     */
    public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
        LinkedList<MessageReference> result = new LinkedList<MessageReference>();
        int count = 0;
        for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
            MessageReference ref = i.next();
            ref.incrementReferenceCount();
            result.add(ref);
            count++;
        }
        if (count < maxItems && !isDiskListEmpty()) {
            for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
                Message message = (Message) i.next();
                // re-attach the state that this cursor applies to every message read from disk
                message.setRegionDestination(regionDestination);
                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
                message.incrementReferenceCount();
                result.add(message);
                count++;
            }
        }
        return result;
    }

    /**
     * Adds a message to the end of the pending messages. Expired messages are
     * discarded instead. The message is kept in memory when the disk list is
     * empty and there is space (or there is no store); otherwise it is
     * appended to the disk list.
     *
     * @param node the message reference to add
     * @throws RuntimeException wrapping any exception raised while adding
     */
    public synchronized void addMessageLast(MessageReference node) {
        if (!node.isExpired()) {
            try {
                regionDestination = node.getMessage().getRegionDestination();
                if (isDiskListEmpty()) {
                    if (hasSpace() || this.store == null) {
                        memoryList.add(node);
                        node.incrementReferenceCount();
                        return;
                    }
                }
                if (!hasSpace()) {
                    if (isDiskListEmpty()) {
                        // try to free memory before spilling everything to disk
                        expireOldMessages();
                        if (hasSpace()) {
                            memoryList.add(node);
                            node.incrementReferenceCount();
                            return;
                        } else {
                            flushToDisk();
                        }
                    }
                }
                systemUsage.getTempUsage().waitForSpace();
                getDiskList().add(node);

            } catch (Exception e) {
                LOG.error("Caught an Exception adding a message: " + node
                        + " last to FilePendingMessageCursor ", e);
                throw new RuntimeException(e);
            }
        } else {
            discard(node);
        }
    }

    /**
     * Adds a message to the front of the pending messages. Expired messages
     * are discarded instead. The message is kept in memory when the disk list
     * is empty and there is space (or there is no store); otherwise it is
     * placed at the head of the disk list.
     *
     * @param node the message reference to add
     * @throws RuntimeException wrapping any exception raised while adding
     */
    public synchronized void addMessageFirst(MessageReference node) {
        if (!node.isExpired()) {
            try {
                regionDestination = node.getMessage().getRegionDestination();
                if (isDiskListEmpty()) {
                    // without a store there is nowhere to spill to, so keep
                    // the message in memory exactly as addMessageLast does
                    if (hasSpace() || this.store == null) {
                        memoryList.addFirst(node);
                        node.incrementReferenceCount();
                        return;
                    }
                }
                if (!hasSpace()) {
                    if (isDiskListEmpty()) {
                        // try to free memory before spilling everything to disk
                        expireOldMessages();
                        if (hasSpace()) {
                            memoryList.addFirst(node);
                            node.incrementReferenceCount();
                            return;
                        } else {
                            flushToDisk();
                        }
                    }
                }
                systemUsage.getTempUsage().waitForSpace();
                // NOTE(review): addMessageLast does not decrement the count on
                // its disk path - confirm this asymmetry is intended.
                node.decrementReferenceCount();
                getDiskList().addFirst(node);

            } catch (Exception e) {
                LOG.error("Caught an Exception adding a message: " + node
                        + " first to FilePendingMessageCursor ", e);
                throw new RuntimeException(e);
            }
        } else {
            discard(node);
        }
    }

    /**
     * Requires a prior call to {@link #reset()}.
     *
     * @return true if there are pending messages left in the current iteration
     */
    public synchronized boolean hasNext() {
        return iter.hasNext();
    }

    /**
     * Advances the iteration and increments the reference count of the
     * returned message. When the disk list has content the message is treated
     * as read from disk and gets its region destination and memory usage
     * re-applied.
     *
     * @return the next pending message
     */
    public synchronized MessageReference next() {
        Message message = (Message) iter.next();
        last = message;
        if (!isDiskListEmpty()) {
            // got from disk
            message.setRegionDestination(regionDestination);
            message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
        }
        message.incrementReferenceCount();
        return message;
    }

    /**
     * Removes the message at the cursor position and releases the reference
     * taken on it by {@link #next()}.
     */
    public synchronized void remove() {
        iter.remove();
        if (last != null) {
            last.decrementReferenceCount();
        }
    }

    /**
     * Removes the given reference from the memory list (releasing the
     * reference held on it) and from the disk list when that has content.
     *
     * @param node the reference to remove
     * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
     */
    public synchronized void remove(MessageReference node) {
        if (memoryList.remove(node)) {
            node.decrementReferenceCount();
        }
        if (!isDiskListEmpty()) {
            getDiskList().remove(node);
        }
    }

    /**
     * @return the number of pending messages, in memory and on disk
     */
    public synchronized int size() {
        return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
    }

    /**
     * Clears all pending messages from memory and disk. Reference counts of
     * the cleared entries are not adjusted.
     */
    public synchronized void clear() {
        memoryList.clear();
        if (!isDiskListEmpty()) {
            getDiskList().clear();
        }
        last = null;
    }

    /**
     * @return true if the superclass reports full, or a system usage is
     *         configured and its temp usage is full
     */
    public synchronized boolean isFull() {
        return super.isFull()
                || (systemUsage != null && systemUsage.getTempUsage().isFull());
    }

    /**
     * @return true if the cursor is not empty
     */
    public boolean hasMessagesBufferedToDeliver() {
        return !isEmpty();
    }

    /**
     * Sets the system usage; delegates to the superclass.
     *
     * @param usageManager the system usage to use
     */
    public void setSystemUsage(SystemUsage usageManager) {
        super.setSystemUsage(usageManager);
    }

    /**
     * Usage callback. Once the new usage reaches the memory high water mark,
     * expired messages are discarded and, if there is still no space, the
     * memory list is flushed to disk. While an iteration is in progress the
     * flush is only recorded and carried out by {@link #release()}.
     *
     * @param usage           the usage that changed
     * @param oldPercentUsage previous usage in percent
     * @param newPercentUsage current usage in percent
     */
    public void onUsageChanged(Usage usage, int oldPercentUsage,
            int newPercentUsage) {
        if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
            synchronized (this) {
                flushRequired = true;
                if (!iterating) {
                    expireOldMessages();
                    if (!hasSpace()) {
                        flushToDisk();
                        flushRequired = false;
                    }
                }
            }
        }
    }

    /**
     * @return always true
     */
    public boolean isTransient() {
        return true;
    }

    /**
     * @return true if there is space and the disk list is empty
     */
    protected boolean isSpaceInMemoryList() {
        return hasSpace() && isDiskListEmpty();
    }

    /**
     * Rebuilds the memory list without its expired entries, preserving the
     * order of the rest; each expired entry is passed to
     * {@link #discard(MessageReference)}.
     */
    protected synchronized void expireOldMessages() {
        if (!memoryList.isEmpty()) {
            LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
            this.memoryList = new LinkedList<MessageReference>();
            while (!tmpList.isEmpty()) {
                MessageReference node = tmpList.removeFirst();
                if (node.isExpired()) {
                    discard(node);
                } else {
                    memoryList.add(node);
                }
            }
        }
    }

    /**
     * Moves every in-memory reference, in order, to the end of the disk list,
     * releasing the reference held on each. Does nothing when there is no
     * store, because there is then nowhere to flush to.
     */
    protected synchronized void flushToDisk() {
        if (store == null) {
            return;
        }
        while (!memoryList.isEmpty()) {
            MessageReference node = memoryList.removeFirst();
            node.decrementReferenceCount();
            getDiskList().addLast(node);
        }
    }

    /**
     * @return true if the disk list has not been created or holds no entries
     */
    protected boolean isDiskListEmpty() {
        return diskList == null || diskList.isEmpty();
    }

    /**
     * Returns the disk list, creating it in the store on first use. Must not
     * be called when there is no store.
     *
     * @return the disk list backing this cursor
     * @throws RuntimeException wrapping an IOException raised while obtaining
     *                          the list container
     */
    protected ListContainer<MessageReference> getDiskList() {
        if (diskList == null) {
            try {
                diskList = store.getListContainer(name, "TopicSubscription", true);
                diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
            } catch (IOException e) {
                LOG.error("Caught an IO Exception getting the DiskList " + name, e);
                throw new RuntimeException(e);
            }
        }
        return diskList;
    }

    /**
     * Releases one reference on the message and hands it to the root
     * broker's dead letter queue handling.
     *
     * @param message the message reference to discard
     */
    protected void discard(MessageReference message) {
        message.decrementReferenceCount();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Discarding message " + message);
        }
        broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()), message);
    }
}