1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.memory.list; 18 19 import java.io.IOException; 20 import java.util.ArrayList; 21 import java.util.Iterator; 22 import java.util.LinkedList; 23 import java.util.List; 24 import org.apache.activemq.broker.region.MessageReference; 25 import org.apache.activemq.command.ActiveMQDestination; 26 import org.apache.activemq.command.Message; 27 import org.apache.activemq.filter.DestinationFilter; 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 31 /** 32 * A simple fixed size {@link MessageList} where there is a single, fixed size 33 * list that all messages are added to for simplicity. Though this will lead to 34 * possibly slow recovery times as many more messages than is necessary will 35 * have to be iterated through for each subscription. 36 * 37 * @version $Revision: 1.1 $ 38 */ 39 public class SimpleMessageList implements MessageList { 40 private static final Log LOG = LogFactory.getLog(SimpleMessageList.class); 41 private LinkedList<MessageReference> list = new LinkedList<MessageReference>(); 42 private int maximumSize = 100 * 64 * 1024; 43 private int size; 44 private Object lock = new Object(); 45 46 public SimpleMessageList() { 47 } 48 49 public SimpleMessageList(int maximumSize) { 50 this.maximumSize = maximumSize; 51 } 52 53 public void add(MessageReference node) { 54 int delta = node.getMessageHardRef().getSize(); 55 synchronized (lock) { 56 list.add(node); 57 size += delta; 58 while (size > maximumSize) { 59 MessageReference evicted = list.removeFirst(); 60 size -= evicted.getMessageHardRef().getSize(); 61 } 62 } 63 } 64 65 public List<MessageReference> getMessages(ActiveMQDestination destination) { 66 return getList(); 67 } 68 69 public Message[] browse(ActiveMQDestination destination) { 70 List<Message> result = new ArrayList<Message>(); 71 DestinationFilter filter = DestinationFilter.parseFilter(destination); 72 synchronized (lock) { 73 for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) { 74 MessageReference ref = i.next(); 75 Message msg; 76 try { 77 msg = ref.getMessage(); 78 if (filter.matches(msg.getDestination())) { 79 result.add(msg); 80 } 81 } catch (IOException e) { 82 LOG.error("Failed to get Message from MessageReference: " + ref, e); 83 } 84 85 } 86 } 87 return result.toArray(new Message[result.size()]); 88 } 89 90 /** 91 * Returns a copy of the list 92 */ 93 public List<MessageReference> getList() { 94 synchronized (lock) { 95 return new ArrayList<MessageReference>(list); 96 } 97 } 98 99 public int getSize() { 100 synchronized (lock) { 101 return size; 102 } 103 } 104 105 public void clear() { 106 synchronized (lock) { 107 list.clear(); 108 size = 0; 109 } 110 } 111 112 }