Home » activemq-parent-5.3.1-source-release » org.apache.activemq.console.command.store.amq » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.console.command.store.amq;
   18   
   19   import java.io.File;
   20   import java.io.InputStream;
   21   import java.io.PrintWriter;
   22   import java.util.ArrayList;
   23   import java.util.Arrays;
   24   import java.util.Collections;
   25   import java.util.HashMap;
   26   import java.util.Iterator;
   27   import java.util.List;
   28   import java.util.Map;
   29   import java.util.Scanner;
   30   
   31   import org.apache.activemq.command.ActiveMQBlobMessage;
   32   import org.apache.activemq.command.ActiveMQBytesMessage;
   33   import org.apache.activemq.command.ActiveMQMapMessage;
   34   import org.apache.activemq.command.ActiveMQMessage;
   35   import org.apache.activemq.command.ActiveMQObjectMessage;
   36   import org.apache.activemq.command.ActiveMQStreamMessage;
   37   import org.apache.activemq.command.ActiveMQTextMessage;
   38   import org.apache.activemq.command.DataStructure;
   39   import org.apache.activemq.command.JournalQueueAck;
   40   import org.apache.activemq.command.JournalTopicAck;
   41   import org.apache.activemq.command.JournalTrace;
   42   import org.apache.activemq.command.JournalTransaction;
   43   import org.apache.activemq.kaha.impl.async.Location;
   44   import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
   45   import org.apache.activemq.openwire.OpenWireFormat;
   46   import org.apache.activemq.util.ByteSequence;
   47   import org.apache.activemq.wireformat.WireFormat;
   48   import org.apache.velocity.Template;
   49   import org.apache.velocity.VelocityContext;
   50   import org.apache.velocity.app.Velocity;
   51   import org.apache.velocity.app.VelocityEngine;
   52   import org.josql.Query;
   53   
   54   /**
   55    * Allows you to view the contents of a Journal.
   56    * 
   57    * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
   58    */
   59   public class AMQJournalTool {
   60   
   61   	private final ArrayList<File> dirs = new ArrayList<File>();
   62   	private final WireFormat wireFormat = new OpenWireFormat();
   63   	private final HashMap<String, String> resources = new HashMap<String, String>();
   64   
   65   	private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
   66   	private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
   67   	private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
   68   	private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
   69   	private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
   70   	private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
   71   	private String where;
   72   	private VelocityContext context;
   73   	private VelocityEngine velocity;
   74   	private boolean help;
   75   
   76   	public static void main(String[] args) throws Exception {
   77   		AMQJournalTool consumerTool = new AMQJournalTool();
   78   		String[] directories = CommandLineSupport
   79   				.setOptions(consumerTool, args);
   80   		if (directories.length < 1) {
   81   			System.out
   82   					.println("Please specify the directories with journal data to scan");
   83   			return;
   84   		}
   85   		for (int i = 0; i < directories.length; i++) {
   86   			consumerTool.getDirs().add(new File(directories[i]));
   87   		}
   88   		consumerTool.execute();
   89   	}
   90   
   91   	public void execute() throws Exception {
   92   
   93   		if( help ) {
   94   			showHelp();
   95   			return;
   96   		}
   97   		
   98   		if (getDirs().size() < 1) {
   99   			System.out.println("");
  100   			System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
  101   			System.out.println("");
  102   			showHelp();
  103   			return;
  104   		}
  105   
  106   		for (File dir : getDirs()) {
  107   			if( !dir.exists() ) {
  108   				System.out.println("");
  109   				System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
  110   				System.out.println("");
  111   				showHelp();
  112   				return;
  113   			}
  114   			if( !dir.isDirectory() ) {
  115   				System.out.println("");
  116   				System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
  117   				System.out.println("");
  118   				showHelp();
  119   				return;
  120   			}
  121   		}
  122   		
  123   		
  124   		context = new VelocityContext();
  125   		List keys = Arrays.asList(context.getKeys());
  126   
  127   		for (Iterator iterator = System.getProperties().entrySet()
  128   				.iterator(); iterator.hasNext();) {
  129   			Map.Entry kv = (Map.Entry) iterator.next();
  130   			String name = (String) kv.getKey();
  131   			String value = (String) kv.getValue();
  132   
  133   			if (!keys.contains(name)) {
  134   				context.put(name, value);
  135   			}
  136   		}
  137   		
  138   		velocity = new VelocityEngine();
  139   		velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
  140   		velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
  141   		velocity.init();
  142   
  143   
  144   		resources.put("message", messageFormat);
  145   		resources.put("topicAck", topicAckFormat);
  146   		resources.put("queueAck", queueAckFormat);
  147   		resources.put("transaction", transactionFormat);
  148   		resources.put("trace", traceFormat);
  149   		resources.put("unknown", unknownFormat);
  150   
  151   		Query query = null;
  152   		if (where != null) {
  153   			query = new Query();
  154   			query.parse("select * from "+Entry.class.getName()+" where "+where);
  155   
  156   		}
  157   
  158   		ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
  159   		manager.start();
  160   		try {
  161   			Location curr = manager.getFirstLocation();
  162   			while (curr != null) {
  163   
  164   				ByteSequence data = manager.read(curr);
  165   				DataStructure c = (DataStructure) wireFormat.unmarshal(data);
  166   
  167   				Entry entry = new Entry();
  168   				entry.setLocation(curr);
  169   				entry.setRecord(c);
  170   				entry.setData(data);
  171   				entry.setQuery(query);
  172   				process(entry);
  173   
  174   				curr = manager.getNextLocation(curr);
  175   			}
  176   		} finally {
  177   			manager.close();
  178   		}
  179   	}
  180   
  181   	private void showHelp() {
  182   		InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
  183   		Scanner scanner = new Scanner(is);
  184   		while (scanner.hasNextLine()) {
  185   			String line = scanner.nextLine();
  186   			System.out.println(line);
  187   		}
  188   		scanner.close();	}
  189   
  190   	private void process(Entry entry) throws Exception {
  191   
  192   		Location location = entry.getLocation();
  193   		DataStructure record = entry.getRecord();
  194   
  195   		switch (record.getDataStructureType()) {
  196   		case ActiveMQMessage.DATA_STRUCTURE_TYPE:
  197   			entry.setType("ActiveMQMessage");
  198   			entry.setFormater("message");
  199   			display(entry);
  200   			break;
  201   		case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
  202   			entry.setType("ActiveMQBytesMessage");
  203   			entry.setFormater("message");
  204   			display(entry);
  205   			break;
  206   		case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
  207   			entry.setType("ActiveMQBlobMessage");
  208   			entry.setFormater("message");
  209   			display(entry);
  210   			break;
  211   		case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
  212   			entry.setType("ActiveMQMapMessage");
  213   			entry.setFormater("message");
  214   			display(entry);
  215   			break;
  216   		case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
  217   			entry.setType("ActiveMQObjectMessage");
  218   			entry.setFormater("message");
  219   			display(entry);
  220   			break;
  221   		case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
  222   			entry.setType("ActiveMQStreamMessage");
  223   			entry.setFormater("message");
  224   			display(entry);
  225   			break;
  226   		case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
  227   			entry.setType("ActiveMQTextMessage");
  228   			entry.setFormater("message");
  229   			display(entry);
  230   			break;
  231   		case JournalQueueAck.DATA_STRUCTURE_TYPE:
  232   			entry.setType("Queue Ack");
  233   			entry.setFormater("queueAck");
  234   			display(entry);
  235   			break;
  236   		case JournalTopicAck.DATA_STRUCTURE_TYPE:
  237   			entry.setType("Topic Ack");
  238   			entry.setFormater("topicAck");
  239   			display(entry);
  240   			break;
  241   		case JournalTransaction.DATA_STRUCTURE_TYPE:
  242   			entry.setType(getType((JournalTransaction) record));
  243   			entry.setFormater("transaction");
  244   			display(entry);
  245   			break;
  246   		case JournalTrace.DATA_STRUCTURE_TYPE:
  247   			entry.setType("Trace");
  248   			entry.setFormater("trace");
  249   			display(entry);
  250   			break;
  251   		default:
  252   			entry.setType("Unknown");
  253   			entry.setFormater("unknown");
  254   			display(entry);
  255   			break;
  256   		}
  257   	}
  258   
  259   	private String getType(JournalTransaction record) {
  260   		switch (record.getType()) {
  261   		case JournalTransaction.XA_PREPARE:
  262   			return "XA Prepare";
  263   		case JournalTransaction.XA_COMMIT:
  264   			return "XA Commit";
  265   		case JournalTransaction.XA_ROLLBACK:
  266   			return "XA Rollback";
  267   		case JournalTransaction.LOCAL_COMMIT:
  268   			return "Commit";
  269   		case JournalTransaction.LOCAL_ROLLBACK:
  270   			return "Rollback";
  271   		}
  272   		return "Unknown Transaction";
  273   	}
  274   
  275   	private void display(Entry entry) throws Exception {
  276   
  277   		if (entry.getQuery() != null) {
  278   			List list = Collections.singletonList(entry);
  279   			List results = entry.getQuery().execute(list).getResults();
  280   			if (results.isEmpty()) {
  281   				return;
  282   			}
  283   		}
  284   
  285   		CustomResourceLoader.setResources(resources);
  286   		try {
  287   
  288   			context.put("location", entry.getLocation());
  289   			context.put("record", entry.getRecord());
  290   			context.put("type", entry.getType());
  291   			if (entry.getRecord() instanceof ActiveMQMessage) {
  292   				context.put("body", new MessageBodyFormatter(
  293   						(ActiveMQMessage) entry.getRecord()));
  294   			}
  295   
  296   			Template template = velocity.getTemplate(entry.getFormater());
  297   			PrintWriter writer = new PrintWriter(System.out);
  298   			template.merge(context, writer);
  299   			writer.println();
  300   			writer.flush();
  301   		} finally {
  302   			CustomResourceLoader.setResources(null);
  303   		}
  304   	}
  305   
  306   	public void setMessageFormat(String messageFormat) {
  307   		this.messageFormat = messageFormat;
  308   	}
  309   
  310   	public void setTopicAckFormat(String ackFormat) {
  311   		this.topicAckFormat = ackFormat;
  312   	}
  313   
  314   	public void setTransactionFormat(String transactionFormat) {
  315   		this.transactionFormat = transactionFormat;
  316   	}
  317   
  318   	public void setTraceFormat(String traceFormat) {
  319   		this.traceFormat = traceFormat;
  320   	}
  321   
  322   	public void setUnknownFormat(String unknownFormat) {
  323   		this.unknownFormat = unknownFormat;
  324   	}
  325   
  326   	public void setQueueAckFormat(String queueAckFormat) {
  327   		this.queueAckFormat = queueAckFormat;
  328   	}
  329   
  330   	public String getQuery() {
  331   		return where;
  332   	}
  333   
  334   	public void setWhere(String query) {
  335   		this.where = query;
  336   	}
  337   
  338   	public boolean isHelp() {
  339   		return help;
  340   	}
  341   
  342   	public void setHelp(boolean help) {
  343   		this.help = help;
  344   	}
  345   
  346   	/**
  347   	 * @return the dirs
  348   	 */
  349   	public ArrayList<File> getDirs() {
  350   		return dirs;
  351   	}
  352   
  353   }

Home » activemq-parent-5.3.1-source-release » org.apache.activemq.console.command.store.amq » [javadoc | source]