/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.console.command.store.amq.reader;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import javax.jms.InvalidSelectorException;
import javax.jms.Message;

import org.apache.activemq.command.DataStructure;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;

/**
 * Reads and iterates through data log files for the AMQMessage Store.
 * <p>
 * A reader is bound either to a directory of data logs or to one individual
 * data log file. Records that do not unmarshal to a {@link Message} are
 * skipped while iterating.
 */
public class AMQReader implements Iterable<Message> {

    private final AsyncDataManager dataManager;
    private final WireFormat wireFormat = new OpenWireFormat();
    /** The single data log being read, or {@code null} when reading a directory. */
    private final File file;
    /** Parsed JMS selector, or {@code null} when no selector was supplied. */
    private final BooleanExpression expression;

    /**
     * List all the data files in a directory.
     *
     * @param directory the directory to inspect
     * @return a new set holding the files reported by the data manager for
     *         that directory; empty when the data manager reports none
     * @throws IOException if {@code directory} is {@code null}, does not
     *         exist or is not a directory, or if the data manager fails
     */
    public static Set<File> listDataFiles(File directory) throws IOException {
        if (directory == null || !directory.exists() || !directory.isDirectory()) {
            throw new IOException("Invalid Directory " + directory);
        }
        Set<File> result = new HashSet<File>();
        AsyncDataManager dataManager = new AsyncDataManager();
        dataManager.setDirectory(directory);
        dataManager.start();
        try {
            Set<File> set = dataManager.getFiles();
            if (set != null) {
                result.addAll(set);
            }
        } finally {
            // Always release the started data manager, even if listing fails.
            dataManager.close();
        }
        return result;
    }

    /**
     * Create the AMQReader to read a directory of amq data logs - or an
     * individual data log file. All messages are selected.
     *
     * @param file the directory - or file
     * @throws InvalidSelectorException declared for parity with
     *         {@link #AMQReader(File, String)}
     * @throws IOException if the underlying data manager cannot be started
     */
    public AMQReader(File file) throws InvalidSelectorException, IOException {
        this(file, null);
    }

    /**
     * Create the AMQReader to read a directory of amq data logs - or an
     * individual data log file.
     *
     * @param file the directory - or file
     * @param selector the JMS selector or null to select all; a blank
     *        selector is treated like null
     * @throws IOException if the underlying data manager cannot be started
     * @throws InvalidSelectorException if the selector cannot be parsed
     */
    public AMQReader(File file, String selector) throws IOException, InvalidSelectorException {
        String str = selector != null ? selector.trim() : null;
        this.expression = (str != null && str.length() > 0) ? SelectorParser.parse(str) : null;

        dataManager = new AsyncDataManager();
        dataManager.setArchiveDataLogs(false);
        if (file.isDirectory()) {
            dataManager.setDirectory(file);
            this.file = null;
        } else {
            // Single-file mode: work out of the parent directory and remember
            // the file so that iteration stays restricted to it.
            dataManager.setDirectory(file.getParentFile());
            dataManager.setDirectoryArchive(file);
            this.file = file;
        }
        dataManager.start();
    }

    /**
     * @return a new iterator over this reader, using the selector expression
     *         supplied at construction time (may be {@code null})
     */
    public Iterator<Message> iterator() {
        return new AMQIterator(this, this.expression);
    }

    /**
     * Find the next message after the given position.
     *
     * @param lastLocation the previously returned position, or {@code null}
     *        to start from the beginning; when non-null the same instance is
     *        updated and returned
     * @return the position and message of the next message, or {@code null}
     *         when there are no more messages
     * @throws IllegalStateException propagated from the data manager
     * @throws IOException if reading or unmarshalling a record fails
     */
    protected MessageLocation getNextMessage(MessageLocation lastLocation)
            throws IllegalStateException, IOException {
        if (this.file != null) {
            return getInternalNextMessage(this.file, lastLocation);
        }
        return getInternalNextMessage(lastLocation);
    }

    /** Directory-mode variant: scans without restricting to a single file. */
    private MessageLocation getInternalNextMessage(MessageLocation lastLocation)
            throws IllegalStateException, IOException {
        return getInternalNextMessage(null, lastLocation);
    }

    /**
     * Advance from {@code lastLocation} until a record that unmarshals to a
     * {@link Message} is found.
     *
     * @param file the single data log to stay within, or {@code null}
     * @param lastLocation previous position, or {@code null} to start over
     * @return the updated (or newly created) position holder, or
     *         {@code null} when the log is exhausted
     */
    private MessageLocation getInternalNextMessage(File file, MessageLocation lastLocation)
            throws IllegalStateException, IOException {
        MessageLocation result = lastLocation;
        if (result != null) {
            result.setMessage(null);
        }
        Message message = null;
        Location pos = lastLocation != null ? lastLocation.getLocation() : null;
        while ((pos = getNextLocation(file, pos)) != null) {
            message = getMessage(pos);
            if (message != null) {
                break;
            }
        }

        if (message == null) {
            // Log exhausted. The previous code dereferenced 'result' here
            // unconditionally, which threw a NullPointerException whenever
            // iteration started (lastLocation == null) on a log with no
            // messages. The caller's holder, if any, is still cleared as before.
            if (lastLocation != null) {
                lastLocation.setLocation(null);
            }
            return null;
        }

        if (result == null) {
            result = new MessageLocation();
        }
        result.setMessage(message);
        result.setLocation(pos);
        return result;
    }

    /**
     * Ask the data manager for the location following {@code last},
     * restricted to {@code file} when one is given.
     */
    private Location getNextLocation(File file, Location last)
            throws IllegalStateException, IOException {
        if (file != null) {
            return dataManager.getNextLocation(file, last, true);
        }
        return dataManager.getNextLocation(last);
    }

    /**
     * Read and unmarshal the record at {@code location}.
     *
     * @return the record as a {@link Message}, or {@code null} when the
     *         record is some other kind of data structure
     */
    private Message getMessage(Location location) throws IOException {
        ByteSequence data = dataManager.read(location);
        DataStructure c = (DataStructure) wireFormat.unmarshal(data);
        if (c instanceof Message) {
            return (Message) c;
        }
        return null;
    }
}