1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.broker.region.cursors; 18 19 import java.util.Collections; 20 import java.util.HashMap; 21 import java.util.List; 22 import java.util.Map; 23 import java.util.concurrent.CopyOnWriteArrayList; 24 25 import org.apache.activemq.advisory.AdvisorySupport; 26 import org.apache.activemq.broker.Broker; 27 import org.apache.activemq.broker.ConnectionContext; 28 import org.apache.activemq.broker.region.Destination; 29 import org.apache.activemq.broker.region.MessageReference; 30 import org.apache.activemq.broker.region.Subscription; 31 import org.apache.activemq.broker.region.Topic; 32 import org.apache.activemq.command.Message; 33 import org.apache.activemq.usage.SystemUsage; 34 import org.apache.commons.logging.Log; 35 import org.apache.commons.logging.LogFactory; 36 37 /** 38 * persist pending messages pending message (messages awaiting dispatch to a 39 * consumer) cursor 40 * 41 * @version $Revision: 813962 $ 42 */ 43 public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { 44 45 private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class); 46 private final String clientId; 47 private final String subscriberName; 48 private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>(); 49 private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>(); 50 private final PendingMessageCursor nonPersistent; 51 private PendingMessageCursor currentCursor; 52 private final Subscription subscription; 53 /** 54 * @param broker Broker for this cursor 55 * @param clientId clientId for this cursor 56 * @param subscriberName subscriber name for this cursor 57 * @param maxBatchSize currently ignored 58 * @param subscription subscription for this cursor 59 */ 60 public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) { 61 this.subscription=subscription; 62 this.clientId = clientId; 63 this.subscriberName = subscriberName; 64 if (broker.getBrokerService().isPersistent()) { 65 this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName); 66 }else { 67 this.nonPersistent = new VMPendingMessageCursor(); 68 } 69 70 this.nonPersistent.setMaxBatchSize(maxBatchSize); 71 this.nonPersistent.setSystemUsage(systemUsage); 72 this.storePrefetches.add(this.nonPersistent); 73 } 74 75 public synchronized void start() throws Exception { 76 if (!isStarted()) { 77 super.start(); 78 for (PendingMessageCursor tsp : storePrefetches) { 79 tsp.setMessageAudit(getMessageAudit()); 80 tsp.start(); 81 } 82 } 83 } 84 85 public synchronized void stop() throws Exception { 86 if (isStarted()) { 87 super.stop(); 88 for (PendingMessageCursor tsp : storePrefetches) { 89 tsp.stop(); 90 } 91 } 92 } 93 94 /** 95 * Add a destination 96 * 97 * @param context 98 * @param destination 99 * @throws Exception 100 */ 101 public synchronized void add(ConnectionContext context, Destination destination) throws Exception { 102 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) { 103 TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName); 104 tsp.setMaxBatchSize(getMaxBatchSize()); 105 tsp.setSystemUsage(systemUsage); 106 tsp.setEnableAudit(isEnableAudit()); 107 tsp.setMaxAuditDepth(getMaxAuditDepth()); 108 tsp.setMaxProducersToAudit(getMaxProducersToAudit()); 109 tsp.setMemoryUsageHighWaterMark(getMemoryUsageHighWaterMark()); 110 topics.put(destination, tsp); 111 storePrefetches.add(tsp); 112 if (isStarted()) { 113 tsp.start(); 114 } 115 } 116 } 117 118 /** 119 * remove a destination 120 * 121 * @param context 122 * @param destination 123 * @throws Exception 124 */ 125 public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 126 PendingMessageCursor tsp = topics.remove(destination); 127 if (tsp != null) { 128 storePrefetches.remove(tsp); 129 } 130 return Collections.EMPTY_LIST; 131 } 132 133 /** 134 * @return true if there are no pending messages 135 */ 136 public synchronized boolean isEmpty() { 137 for (PendingMessageCursor tsp : storePrefetches) { 138 if( !tsp.isEmpty() ) 139 return false; 140 } 141 return true; 142 } 143 144 public synchronized boolean isEmpty(Destination destination) { 145 boolean result = true; 146 TopicStorePrefetch tsp = topics.get(destination); 147 if (tsp != null) { 148 result = tsp.isEmpty(); 149 } 150 return result; 151 } 152 153 /** 154 * Informs the Broker if the subscription needs to intervention to recover 155 * it's state e.g. DurableTopicSubscriber may do 156 * 157 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor 158 * @return true if recovery required 159 */ 160 public boolean isRecoveryRequired() { 161 return false; 162 } 163 164 public synchronized void addMessageLast(MessageReference node) throws Exception { 165 if (node != null) { 166 Message msg = node.getMessage(); 167 if (isStarted()) { 168 if (!msg.isPersistent()) { 169 nonPersistent.addMessageLast(node); 170 } 171 } 172 if (msg.isPersistent()) { 173 Destination dest = msg.getRegionDestination(); 174 TopicStorePrefetch tsp = topics.get(dest); 175 if (tsp != null) { 176 tsp.addMessageLast(node); 177 } 178 } 179 } 180 } 181 182 public synchronized void addRecoveredMessage(MessageReference node) throws Exception { 183 nonPersistent.addMessageLast(node); 184 } 185 186 public synchronized void clear() { 187 for (PendingMessageCursor tsp : storePrefetches) { 188 tsp.clear(); 189 } 190 } 191 192 public synchronized boolean hasNext() { 193 boolean result = true; 194 if (result) { 195 try { 196 currentCursor = getNextCursor(); 197 } catch (Exception e) { 198 LOG.error("Failed to get current cursor ", e); 199 throw new RuntimeException(e); 200 } 201 result = currentCursor != null ? currentCursor.hasNext() : false; 202 } 203 return result; 204 } 205 206 public synchronized MessageReference next() { 207 MessageReference result = currentCursor != null ? currentCursor.next() : null; 208 return result; 209 } 210 211 public synchronized void remove() { 212 if (currentCursor != null) { 213 currentCursor.remove(); 214 } 215 } 216 217 public synchronized void remove(MessageReference node) { 218 if (currentCursor != null) { 219 currentCursor.remove(node); 220 } 221 } 222 223 public synchronized void reset() { 224 for (PendingMessageCursor storePrefetch : storePrefetches) { 225 storePrefetch.reset(); 226 } 227 } 228 229 public synchronized void release() { 230 for (PendingMessageCursor storePrefetch : storePrefetches) { 231 storePrefetch.release(); 232 } 233 } 234 235 public synchronized int size() { 236 int pendingCount=0; 237 for (PendingMessageCursor tsp : storePrefetches) { 238 pendingCount += tsp.size(); 239 } 240 return pendingCount; 241 } 242 243 public void setMaxBatchSize(int maxBatchSize) { 244 for (PendingMessageCursor storePrefetch : storePrefetches) { 245 storePrefetch.setMaxBatchSize(maxBatchSize); 246 } 247 super.setMaxBatchSize(maxBatchSize); 248 } 249 250 public synchronized void gc() { 251 for (PendingMessageCursor tsp : storePrefetches) { 252 tsp.gc(); 253 } 254 } 255 256 public void setSystemUsage(SystemUsage usageManager) { 257 super.setSystemUsage(usageManager); 258 for (PendingMessageCursor tsp : storePrefetches) { 259 tsp.setSystemUsage(usageManager); 260 } 261 } 262 263 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 264 super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 265 for (PendingMessageCursor cursor : storePrefetches) { 266 cursor.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 267 } 268 } 269 270 public void setMaxProducersToAudit(int maxProducersToAudit) { 271 super.setMaxProducersToAudit(maxProducersToAudit); 272 for (PendingMessageCursor cursor : storePrefetches) { 273 cursor.setMaxAuditDepth(maxAuditDepth); 274 } 275 } 276 277 public void setMaxAuditDepth(int maxAuditDepth) { 278 super.setMaxAuditDepth(maxAuditDepth); 279 for (PendingMessageCursor cursor : storePrefetches) { 280 cursor.setMaxAuditDepth(maxAuditDepth); 281 } 282 } 283 284 public void setEnableAudit(boolean enableAudit) { 285 super.setEnableAudit(enableAudit); 286 for (PendingMessageCursor cursor : storePrefetches) { 287 cursor.setEnableAudit(enableAudit); 288 } 289 } 290 291 public void setUseCache(boolean useCache) { 292 super.setUseCache(useCache); 293 for (PendingMessageCursor cursor : storePrefetches) { 294 cursor.setUseCache(useCache); 295 } 296 } 297 298 protected synchronized PendingMessageCursor getNextCursor() throws Exception { 299 if (currentCursor == null || currentCursor.isEmpty()) { 300 currentCursor = null; 301 for (PendingMessageCursor tsp : storePrefetches) { 302 if (tsp.hasNext()) { 303 currentCursor = tsp; 304 break; 305 } 306 } 307 // round-robin 308 if (storePrefetches.size()>1) { 309 PendingMessageCursor first = storePrefetches.remove(0); 310 storePrefetches.add(first); 311 } 312 } 313 return currentCursor; 314 } 315 316 public String toString() { 317 return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")"; 318 } 319 }