1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.console.command.store.amq; 18 19 import java.io.File; 20 import java.io.InputStream; 21 import java.io.PrintWriter; 22 import java.util.ArrayList; 23 import java.util.Arrays; 24 import java.util.Collections; 25 import java.util.HashMap; 26 import java.util.Iterator; 27 import java.util.List; 28 import java.util.Map; 29 import java.util.Scanner; 30 31 import org.apache.activemq.command.ActiveMQBlobMessage; 32 import org.apache.activemq.command.ActiveMQBytesMessage; 33 import org.apache.activemq.command.ActiveMQMapMessage; 34 import org.apache.activemq.command.ActiveMQMessage; 35 import org.apache.activemq.command.ActiveMQObjectMessage; 36 import org.apache.activemq.command.ActiveMQStreamMessage; 37 import org.apache.activemq.command.ActiveMQTextMessage; 38 import org.apache.activemq.command.DataStructure; 39 import org.apache.activemq.command.JournalQueueAck; 40 import org.apache.activemq.command.JournalTopicAck; 41 import org.apache.activemq.command.JournalTrace; 42 import org.apache.activemq.command.JournalTransaction; 43 import org.apache.activemq.kaha.impl.async.Location; 44 import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager; 45 import org.apache.activemq.openwire.OpenWireFormat; 46 import org.apache.activemq.util.ByteSequence; 47 import org.apache.activemq.wireformat.WireFormat; 48 import org.apache.velocity.Template; 49 import org.apache.velocity.VelocityContext; 50 import org.apache.velocity.app.Velocity; 51 import org.apache.velocity.app.VelocityEngine; 52 import org.josql.Query; 53 54 /** 55 * Allows you to view the contents of a Journal. 56 * 57 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 58 */ 59 public class AMQJournalTool { 60 61 private final ArrayList<File> dirs = new ArrayList<File>(); 62 private final WireFormat wireFormat = new OpenWireFormat(); 63 private final HashMap<String, String> resources = new HashMap<String, String>(); 64 65 private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}"; 66 private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}"; 67 private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}"; 68 private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}"; 69 private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}"; 70 private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}"; 71 private String where; 72 private VelocityContext context; 73 private VelocityEngine velocity; 74 private boolean help; 75 76 public static void main(String[] args) throws Exception { 77 AMQJournalTool consumerTool = new AMQJournalTool(); 78 String[] directories = CommandLineSupport 79 .setOptions(consumerTool, args); 80 if (directories.length < 1) { 81 System.out 82 .println("Please specify the directories with journal data to scan"); 83 return; 84 } 85 for (int i = 0; i < directories.length; i++) { 86 consumerTool.getDirs().add(new File(directories[i])); 87 } 88 consumerTool.execute(); 89 } 90 91 public void execute() throws Exception { 92 93 if( help ) { 94 showHelp(); 95 return; 96 } 97 98 if (getDirs().size() < 1) { 99 System.out.println(""); 100 System.out.println("Invalid Usage: Please specify the directories with journal data to scan"); 101 System.out.println(""); 102 showHelp(); 103 return; 104 } 105 106 for (File dir : getDirs()) { 107 if( !dir.exists() ) { 108 System.out.println(""); 109 System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist"); 110 System.out.println(""); 111 showHelp(); 112 return; 113 } 114 if( !dir.isDirectory() ) { 115 System.out.println(""); 116 System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory"); 117 System.out.println(""); 118 showHelp(); 119 return; 120 } 121 } 122 123 124 context = new VelocityContext(); 125 List keys = Arrays.asList(context.getKeys()); 126 127 for (Iterator iterator = System.getProperties().entrySet() 128 .iterator(); iterator.hasNext();) { 129 Map.Entry kv = (Map.Entry) iterator.next(); 130 String name = (String) kv.getKey(); 131 String value = (String) kv.getValue(); 132 133 if (!keys.contains(name)) { 134 context.put(name, value); 135 } 136 } 137 138 velocity = new VelocityEngine(); 139 velocity.setProperty(Velocity.RESOURCE_LOADER, "all"); 140 velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName()); 141 velocity.init(); 142 143 144 resources.put("message", messageFormat); 145 resources.put("topicAck", topicAckFormat); 146 resources.put("queueAck", queueAckFormat); 147 resources.put("transaction", transactionFormat); 148 resources.put("trace", traceFormat); 149 resources.put("unknown", unknownFormat); 150 151 Query query = null; 152 if (where != null) { 153 query = new Query(); 154 query.parse("select * from "+Entry.class.getName()+" where "+where); 155 156 } 157 158 ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs()); 159 manager.start(); 160 try { 161 Location curr = manager.getFirstLocation(); 162 while (curr != null) { 163 164 ByteSequence data = manager.read(curr); 165 DataStructure c = (DataStructure) wireFormat.unmarshal(data); 166 167 Entry entry = new Entry(); 168 entry.setLocation(curr); 169 entry.setRecord(c); 170 entry.setData(data); 171 entry.setQuery(query); 172 process(entry); 173 174 curr = manager.getNextLocation(curr); 175 } 176 } finally { 177 manager.close(); 178 } 179 } 180 181 private void showHelp() { 182 InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt"); 183 Scanner scanner = new Scanner(is); 184 while (scanner.hasNextLine()) { 185 String line = scanner.nextLine(); 186 System.out.println(line); 187 } 188 scanner.close(); } 189 190 private void process(Entry entry) throws Exception { 191 192 Location location = entry.getLocation(); 193 DataStructure record = entry.getRecord(); 194 195 switch (record.getDataStructureType()) { 196 case ActiveMQMessage.DATA_STRUCTURE_TYPE: 197 entry.setType("ActiveMQMessage"); 198 entry.setFormater("message"); 199 display(entry); 200 break; 201 case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE: 202 entry.setType("ActiveMQBytesMessage"); 203 entry.setFormater("message"); 204 display(entry); 205 break; 206 case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE: 207 entry.setType("ActiveMQBlobMessage"); 208 entry.setFormater("message"); 209 display(entry); 210 break; 211 case ActiveMQMapMessage.DATA_STRUCTURE_TYPE: 212 entry.setType("ActiveMQMapMessage"); 213 entry.setFormater("message"); 214 display(entry); 215 break; 216 case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE: 217 entry.setType("ActiveMQObjectMessage"); 218 entry.setFormater("message"); 219 display(entry); 220 break; 221 case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE: 222 entry.setType("ActiveMQStreamMessage"); 223 entry.setFormater("message"); 224 display(entry); 225 break; 226 case ActiveMQTextMessage.DATA_STRUCTURE_TYPE: 227 entry.setType("ActiveMQTextMessage"); 228 entry.setFormater("message"); 229 display(entry); 230 break; 231 case JournalQueueAck.DATA_STRUCTURE_TYPE: 232 entry.setType("Queue Ack"); 233 entry.setFormater("queueAck"); 234 display(entry); 235 break; 236 case JournalTopicAck.DATA_STRUCTURE_TYPE: 237 entry.setType("Topic Ack"); 238 entry.setFormater("topicAck"); 239 display(entry); 240 break; 241 case JournalTransaction.DATA_STRUCTURE_TYPE: 242 entry.setType(getType((JournalTransaction) record)); 243 entry.setFormater("transaction"); 244 display(entry); 245 break; 246 case JournalTrace.DATA_STRUCTURE_TYPE: 247 entry.setType("Trace"); 248 entry.setFormater("trace"); 249 display(entry); 250 break; 251 default: 252 entry.setType("Unknown"); 253 entry.setFormater("unknown"); 254 display(entry); 255 break; 256 } 257 } 258 259 private String getType(JournalTransaction record) { 260 switch (record.getType()) { 261 case JournalTransaction.XA_PREPARE: 262 return "XA Prepare"; 263 case JournalTransaction.XA_COMMIT: 264 return "XA Commit"; 265 case JournalTransaction.XA_ROLLBACK: 266 return "XA Rollback"; 267 case JournalTransaction.LOCAL_COMMIT: 268 return "Commit"; 269 case JournalTransaction.LOCAL_ROLLBACK: 270 return "Rollback"; 271 } 272 return "Unknown Transaction"; 273 } 274 275 private void display(Entry entry) throws Exception { 276 277 if (entry.getQuery() != null) { 278 List list = Collections.singletonList(entry); 279 List results = entry.getQuery().execute(list).getResults(); 280 if (results.isEmpty()) { 281 return; 282 } 283 } 284 285 CustomResourceLoader.setResources(resources); 286 try { 287 288 context.put("location", entry.getLocation()); 289 context.put("record", entry.getRecord()); 290 context.put("type", entry.getType()); 291 if (entry.getRecord() instanceof ActiveMQMessage) { 292 context.put("body", new MessageBodyFormatter( 293 (ActiveMQMessage) entry.getRecord())); 294 } 295 296 Template template = velocity.getTemplate(entry.getFormater()); 297 PrintWriter writer = new PrintWriter(System.out); 298 template.merge(context, writer); 299 writer.println(); 300 writer.flush(); 301 } finally { 302 CustomResourceLoader.setResources(null); 303 } 304 } 305 306 public void setMessageFormat(String messageFormat) { 307 this.messageFormat = messageFormat; 308 } 309 310 public void setTopicAckFormat(String ackFormat) { 311 this.topicAckFormat = ackFormat; 312 } 313 314 public void setTransactionFormat(String transactionFormat) { 315 this.transactionFormat = transactionFormat; 316 } 317 318 public void setTraceFormat(String traceFormat) { 319 this.traceFormat = traceFormat; 320 } 321 322 public void setUnknownFormat(String unknownFormat) { 323 this.unknownFormat = unknownFormat; 324 } 325 326 public void setQueueAckFormat(String queueAckFormat) { 327 this.queueAckFormat = queueAckFormat; 328 } 329 330 public String getQuery() { 331 return where; 332 } 333 334 public void setWhere(String query) { 335 this.where = query; 336 } 337 338 public boolean isHelp() { 339 return help; 340 } 341 342 public void setHelp(boolean help) { 343 this.help = help; 344 } 345 346 /** 347 * @return the dirs 348 */ 349 public ArrayList<File> getDirs() { 350 return dirs; 351 } 352 353 }