1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.broker.region.cursors; 18 19 import java.util.Iterator; 20 import java.util.LinkedHashMap; 21 import java.util.Map.Entry; 22 import org.apache.activemq.broker.region.Destination; 23 import org.apache.activemq.broker.region.MessageReference; 24 import org.apache.activemq.command.Message; 25 import org.apache.activemq.command.MessageId; 26 import org.apache.activemq.store.MessageRecoveryListener; 27 import org.apache.commons.logging.Log; 28 import org.apache.commons.logging.LogFactory; 29 30 /** 31 * Store based cursor 32 * 33 */ 34 public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener { 35 private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class); 36 protected final Destination regionDestination; 37 private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> (); 38 private Iterator<Entry<MessageId, Message>> iterator = null; 39 private boolean cacheEnabled=false; 40 protected boolean batchResetNeeded = true; 41 protected boolean storeHasMessages = false; 42 protected int size; 43 private MessageId lastCachedId; 44 45 protected AbstractStoreCursor(Destination destination) { 46 this.regionDestination=destination; 47 } 48 49 @Override 50 public final synchronized void start() throws Exception{ 51 if (!isStarted()) { 52 super.start(); 53 clear(); 54 resetBatch(); 55 this.size = getStoreSize(); 56 this.storeHasMessages=this.size > 0; 57 if (!this.storeHasMessages&&useCache) { 58 cacheEnabled=true; 59 } 60 } 61 } 62 63 @Override 64 public final synchronized void stop() throws Exception { 65 resetBatch(); 66 super.stop(); 67 gc(); 68 } 69 70 71 public final boolean recoverMessage(Message message) throws Exception { 72 return recoverMessage(message,false); 73 } 74 75 public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { 76 boolean recovered = false; 77 if (recordUniqueId(message.getMessageId())) { 78 if (!cached) { 79 message.setRegionDestination(regionDestination); 80 if( message.getMemoryUsage()==null ) { 81 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 82 } 83 } 84 message.incrementReferenceCount(); 85 batchList.put(message.getMessageId(), message); 86 clearIterator(true); 87 recovered = true; 88 } else { 89 /* 90 * we should expect to get these - as the message is recorded as it before it goes into 91 * the cache. If subsequently, we pull out that message from the store (before its deleted) 92 * it will be a duplicate - but should be ignored 93 */ 94 //LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message); 95 storeHasMessages = true; 96 } 97 return recovered; 98 } 99 100 @Override 101 public final void reset() { 102 if (batchList.isEmpty()) { 103 try { 104 fillBatch(); 105 } catch (Exception e) { 106 LOG.error("Failed to fill batch", e); 107 throw new RuntimeException(e); 108 } 109 } 110 clearIterator(true); 111 size(); 112 } 113 114 @Override 115 public synchronized void release() { 116 clearIterator(false); 117 } 118 119 private synchronized void clearIterator(boolean ensureIterator) { 120 boolean haveIterator = this.iterator != null; 121 this.iterator=null; 122 last = null; 123 if(haveIterator&&ensureIterator) { 124 ensureIterator(); 125 } 126 } 127 128 private synchronized void ensureIterator() { 129 if(this.iterator==null) { 130 this.iterator=this.batchList.entrySet().iterator(); 131 } 132 } 133 134 135 public final void finished() { 136 } 137 138 @Override 139 public final synchronized boolean hasNext() { 140 if (batchList.isEmpty()) { 141 try { 142 fillBatch(); 143 } catch (Exception e) { 144 LOG.error("Failed to fill batch", e); 145 throw new RuntimeException(e); 146 } 147 } 148 ensureIterator(); 149 return this.iterator.hasNext(); 150 } 151 152 @Override 153 public final synchronized MessageReference next() { 154 MessageReference result = null; 155 if (!this.batchList.isEmpty()&&this.iterator.hasNext()) { 156 result = this.iterator.next().getValue(); 157 } 158 last = result; 159 if (result != null) { 160 result.incrementReferenceCount(); 161 } 162 return result; 163 } 164 165 @Override 166 public final synchronized void addMessageLast(MessageReference node) throws Exception { 167 if (cacheEnabled && hasSpace()) { 168 recoverMessage(node.getMessage(),true); 169 lastCachedId = node.getMessageId(); 170 } else { 171 if (cacheEnabled) { 172 cacheEnabled=false; 173 if (LOG.isDebugEnabled()) { 174 LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size 175 + ", lastCachedIdSeq: " + (lastCachedId == null ? -1 : lastCachedId.getBrokerSequenceId()) 176 + " current node seqId: " + node.getMessageId().getBrokerSequenceId()); 177 } 178 // sync with store on disabling the cache 179 if (lastCachedId != null) { 180 setBatch(lastCachedId); 181 } 182 } 183 } 184 size++; 185 } 186 187 protected void setBatch(MessageId messageId) throws Exception { 188 } 189 190 @Override 191 public final synchronized void addMessageFirst(MessageReference node) throws Exception { 192 cacheEnabled=false; 193 size++; 194 } 195 196 @Override 197 public final synchronized void remove() { 198 size--; 199 if (iterator!=null) { 200 iterator.remove(); 201 } 202 if (last != null) { 203 last.decrementReferenceCount(); 204 } 205 if (size==0 && isStarted() && useCache && hasSpace() && isStoreEmpty()) { 206 if (LOG.isDebugEnabled()) { 207 LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove"); 208 } 209 cacheEnabled=true; 210 } 211 } 212 213 @Override 214 public final synchronized void remove(MessageReference node) { 215 size--; 216 cacheEnabled=false; 217 batchList.remove(node.getMessageId()); 218 } 219 220 @Override 221 public final synchronized void clear() { 222 gc(); 223 } 224 225 @Override 226 public final synchronized void gc() { 227 for (Message msg : batchList.values()) { 228 rollback(msg.getMessageId()); 229 msg.decrementReferenceCount(); 230 } 231 batchList.clear(); 232 clearIterator(false); 233 batchResetNeeded = true; 234 this.cacheEnabled=false; 235 if (isStarted()) { 236 size = getStoreSize(); 237 } else { 238 size = 0; 239 } 240 } 241 242 @Override 243 protected final synchronized void fillBatch() { 244 if (batchResetNeeded) { 245 resetBatch(); 246 this.batchResetNeeded = false; 247 } 248 if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) { 249 this.storeHasMessages = false; 250 try { 251 doFillBatch(); 252 } catch (Exception e) { 253 LOG.error("Failed to fill batch", e); 254 throw new RuntimeException(e); 255 } 256 if (!this.batchList.isEmpty()) { 257 this.storeHasMessages=true; 258 } 259 } 260 } 261 262 @Override 263 public final synchronized boolean isEmpty() { 264 // negative means more messages added to store through queue.send since last reset 265 return size == 0; 266 } 267 268 @Override 269 public final synchronized boolean hasMessagesBufferedToDeliver() { 270 return !batchList.isEmpty(); 271 } 272 273 @Override 274 public final synchronized int size() { 275 if (size < 0) { 276 this.size = getStoreSize(); 277 } 278 return size; 279 } 280 281 282 protected abstract void doFillBatch() throws Exception; 283 284 protected abstract void resetBatch(); 285 286 protected abstract int getStoreSize(); 287 288 protected abstract boolean isStoreEmpty(); 289 }