Home » activemq-parent-5.3.1-source-release » org.apache.activemq.console.command.store.amq.reader » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.console.command.store.amq.reader;
   18   
   19   import java.io.File;
   20   import java.io.IOException;
   21   import java.util.HashSet;
   22   import java.util.Iterator;
   23   import java.util.Set;
   24   
   25   import javax.jms.InvalidSelectorException;
   26   import javax.jms.Message;
   27   
   28   import org.apache.activemq.command.DataStructure;
   29   import org.apache.activemq.filter.BooleanExpression;
   30   import org.apache.activemq.kaha.impl.async.AsyncDataManager;
   31   import org.apache.activemq.kaha.impl.async.Location;
   32   import org.apache.activemq.openwire.OpenWireFormat;
   33   import org.apache.activemq.selector.SelectorParser;
   34   import org.apache.activemq.util.ByteSequence;
   35   import org.apache.activemq.wireformat.WireFormat;
   36   
   37   /**
   38    * Reads and iterates through data log files for the AMQMessage Store
   39    * 
   40    */
   41   public class AMQReader implements Iterable<Message> {
   42   
   43       private AsyncDataManager dataManager;
   44       private WireFormat wireFormat = new OpenWireFormat();
   45       private File file;
   46       private BooleanExpression expression;
   47   
   48       /**
   49        * List all the data files in a directory
   50        * @param directory
   51        * @return
   52        * @throws IOException
   53        */
   54       public static Set<File> listDataFiles(File directory) throws IOException{
   55           Set<File>result = new HashSet<File>();
   56           if (directory == null || !directory.exists() || !directory.isDirectory()) {
   57               throw new IOException("Invalid Directory " + directory);
   58           }
   59           AsyncDataManager dataManager = new AsyncDataManager();
   60           dataManager.setDirectory(directory);
   61           dataManager.start();
   62           Set<File> set = dataManager.getFiles();
   63           if (set != null) {
   64               result.addAll(set);
   65           }
   66           dataManager.close();
   67           return result;
   68       }
   69       /**
   70        * Create the AMQReader to read a directory of amq data logs - or an
   71        * individual data log file
   72        * 
   73        * @param file the directory - or file
   74        * @throws IOException 
   75        * @throws InvalidSelectorException 
   76        * @throws IOException
   77        * @throws InvalidSelectorException 
   78        */
   79       public AMQReader(File file) throws InvalidSelectorException, IOException {
   80           this(file,null);
   81       }
   82       
   83       /**
   84        * Create the AMQReader to read a directory of amq data logs - or an
   85        * individual data log file
   86        * 
   87        * @param file the directory - or file
   88        * @param selector the JMS selector or null to select all
   89        * @throws IOException
   90        * @throws InvalidSelectorException 
   91        */
   92       public AMQReader(File file, String selector) throws IOException, InvalidSelectorException {
   93           String str = selector != null ? selector.trim() : null;
   94           if (str != null && str.length() > 0) {
   95               this.expression=SelectorParser.parse(str);
   96           }
   97           dataManager = new AsyncDataManager();
   98           dataManager.setArchiveDataLogs(false);
   99           if (file.isDirectory()) {
  100               dataManager.setDirectory(file);
  101           } else {
  102               dataManager.setDirectory(file.getParentFile());
  103               dataManager.setDirectoryArchive(file);
  104               this.file = file;
  105           }
  106           dataManager.start();
  107       }
  108   
  109       public Iterator<Message> iterator() {
  110           return new AMQIterator(this,this.expression);
  111       }
  112   
  113       
  114       protected MessageLocation getNextMessage(MessageLocation lastLocation)
  115               throws IllegalStateException, IOException {
  116           if (this.file != null) {
  117               return getInternalNextMessage(this.file, lastLocation);
  118           }
  119           return getInternalNextMessage(lastLocation);
  120       }
  121   
  122       private MessageLocation getInternalNextMessage(MessageLocation lastLocation)
  123               throws IllegalStateException, IOException {
  124           return getInternalNextMessage(null, lastLocation);
  125       }
  126   
  127       private MessageLocation getInternalNextMessage(File file,
  128               MessageLocation lastLocation) throws IllegalStateException,
  129               IOException {
  130           MessageLocation result = lastLocation;
  131           if (result != null) {
  132               result.setMessage(null);
  133           }
  134           Message message = null;
  135           Location pos = lastLocation != null ? lastLocation.getLocation() : null;
  136           while ((pos = getNextLocation(file, pos)) != null) {
  137               message = getMessage(pos);
  138               if (message != null) {
  139                   if (result == null) {
  140                       result = new MessageLocation();
  141                   }
  142                   result.setMessage(message);
  143                   break;
  144               }
  145           }
  146           result.setLocation(pos);
  147           if (pos == null && message == null) {
  148               result = null;
  149           } else {
  150               result.setLocation(pos);
  151           }
  152           return result;
  153       }
  154   
  155       private Location getNextLocation(File file, Location last)
  156               throws IllegalStateException, IOException {
  157           if (file != null) {
  158               return dataManager.getNextLocation(file, last, true);
  159           }
  160           return dataManager.getNextLocation(last);
  161       }
  162   
  163       private Message getMessage(Location location) throws IOException {
  164           ByteSequence data = dataManager.read(location);
  165           DataStructure c = (DataStructure) wireFormat.unmarshal(data);
  166           if (c instanceof Message) {
  167               return (Message) c;
  168           }
  169           return null;
  170   
  171       }
  172   }

Home » activemq-parent-5.3.1-source-release » org.apache.activemq.console.command.store.amq.reader » [javadoc | source]