1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 package org.apache.synapse.transport.vfs; 20 21 import org.apache.synapse.transport.base.BaseConstants; 22 import org.apache.synapse.transport.base.BaseUtils; 23 import org.apache.synapse.transport.base.AbstractPollingTransportListener; 24 import org.apache.synapse.transport.base.ManagementSupport; 25 import org.apache.axiom.soap.SOAPEnvelope; 26 import org.apache.axis2.addressing.EndpointReference; 27 import org.apache.axis2.AxisFault; 28 import org.apache.axis2.Constants; 29 import org.apache.axis2.description; 30 import org.apache.axis2.transport.TransportUtils; 31 import org.apache.axis2.context.ConfigurationContext; 32 import org.apache.axis2.context.MessageContext; 33 import org.apache.commons.vfs; 34 import org.apache.commons.vfs.impl.StandardFileSystemManager; 35 36 import javax.xml.namespace.QName; 37 import javax.xml.stream.XMLStreamException; 38 39 import java.util; 40 import java.io.File; 41 import java.io.IOException; 42 import java.io.InputStream; 43 import java.text.DateFormat; 44 import java.text.SimpleDateFormat; 45 46 /** 47 * The "vfs" transport is a polling based transport - i.e. it gets kicked off at 48 * specified periodic durations, and would iterate through a list of directories or files 49 * specified according to poll durations. When scanning a directory, it will match 50 * its contents against a given regex to find the set of input files. For compressed 51 * files, the contents could be matched against a regex to find individual files. 52 * Each of these files thus found would be submitted as an Axis2 "message" into the 53 * Axis2 engine. 54 * 55 * The processed files would be deleted or renamed as specified in the configuration 56 * 57 * Supported VFS example URIs 58 * 59 * file:///directory/filename.ext 60 * file:////somehost/someshare/afile.txt 61 * jar:../lib/classes.jar!/META-INF/manifest.mf 62 * zip:http://somehost/downloads/somefile.zip 63 * jar:zip:outer.zip!/nested.jar!/somedir 64 * jar:zip:outer.zip!/nested.jar!/some%21dir 65 * tar:gz:http://anyhost/dir/mytar.tar.gz!/mytar.tar!/path/in/tar/README.txt 66 * tgz:file://anyhost/dir/mytar.tgz!/somepath/somefile 67 * gz:/my/gz/file.gz 68 * http://somehost:8080/downloads/somefile.jar 69 * http://myusername@somehost/index.html 70 * webdav://somehost:8080/dist 71 * ftp://myusername:mypassword@somehost/pub/downloads/somefile.tgz[?passive=true] 72 * sftp://myusername:mypassword@somehost/pub/downloads/somefile.tgz 73 * smb://somehost/home 74 * 75 * axis2.xml - transport definition 76 * <transportReceiver name="file" class="org.apache.synapse.transport.vfs.VFSTransportListener"/> 77 * 78 * services.xml - service attachment 79 * required parameters 80 * <parameter name="transport.vfs.FileURI">..</parameter> 81 * <parameter name="transport.vfs.ContentType">..</parameter> 82 * 83 * optional parameters 84 * <parameter name="transport.vfs.FileNamePattern">..</parameter> 85 * <parameter name="transport.PollInterval">..</parameter> 86 * 87 * <parameter name="transport.vfs.ActionAfterProcess">..</parameter> 88 * <parameter name="transport.vfs.ActionAfterErrors" >..</parameter> 89 * <parameter name="transport.vfs.ActionAfterFailure">..</parameter> 90 * 91 * <parameter name="transport.vfs.ReplyFileURI" >..</parameter> 92 * <parameter name="transport.vfs.ReplyFileName">..</parameter> 93 * 94 * FTP testing URIs 95 * ftp://ftpuser:password@asankha/somefile.csv?passive=true 96 * ftp://vfs:apache@vfs.netfirms.com/somepath/somefile.xml?passive=true 97 */ 98 public class VFSTransportListener extends AbstractPollingTransportListener 99 implements ManagementSupport { 100 101 public static final String TRANSPORT_NAME = "vfs"; 102 103 public static final String DELETE = "DELETE"; 104 public static final String MOVE = "MOVE"; 105 106 /** Keep the list of directories/files and poll durations */ 107 private final List<PollTableEntry> pollTable = new ArrayList<PollTableEntry>(); 108 /** Keep the list of removed pollTable entries */ 109 private final List<PollTableEntry> removeTable = new ArrayList<PollTableEntry>(); 110 /** The VFS file system manager */ 111 private FileSystemManager fsManager = null; 112 113 /** 114 * Initializes the VFS transport by getting the VFS File System manager 115 * @param cfgCtx the Axsi2 configuration context 116 * @param trpInDesc the VFS transport in description from the axis2.xml 117 * @throws AxisFault on error 118 */ 119 public void init(ConfigurationContext cfgCtx, TransportInDescription trpInDesc) 120 throws AxisFault { 121 setTransportName(TRANSPORT_NAME); 122 super.init(cfgCtx, trpInDesc); 123 try { 124 StandardFileSystemManager fsm = new StandardFileSystemManager(); 125 fsm.setConfiguration(getClass().getClassLoader().getResource("providers.xml")); 126 fsm.init(); 127 fsManager = fsm; 128 } catch (FileSystemException e) { 129 handleException("Error initializing the file transport : " + e.getMessage(), e); 130 } 131 } 132 133 /** 134 * On a poller tick, iterate over the list of directories/files and check if 135 * it is time to scan the contents for new files 136 */ 137 public void onPoll() { 138 if (!removeTable.isEmpty()) { 139 pollTable.removeAll(removeTable); 140 } 141 142 for (PollTableEntry entry : pollTable) { 143 long startTime = System.currentTimeMillis(); 144 if (startTime > entry.getNextPollTime()) { 145 scanFileOrDirectory(entry, entry.getFileURI()); 146 } 147 } 148 } 149 150 /** 151 * Search for files that match the given regex pattern and create a list 152 * Then process each of these files and update the status of the scan on 153 * the poll table 154 * @param entry the poll table entry for the scan 155 * @param fileURI the file or directory to be scanned 156 */ 157 private void scanFileOrDirectory(final PollTableEntry entry, String fileURI) { 158 159 FileObject fileObject = null; 160 161 if (log.isDebugEnabled()) { 162 log.debug("Scanning directory or file : " + fileURI); 163 } 164 165 boolean wasError = true; 166 int retryCount = 0; 167 int maxRetryCount = entry.getMaxRetryCount(); 168 long reconnectionTimeout = entry.getReconnectTimeout(); 169 170 while(wasError) { 171 try { 172 retryCount++; 173 fileObject = fsManager.resolveFile(fileURI); 174 175 if(fileObject == null) { 176 log.error("fileObject is null"); 177 throw new FileSystemException("fileObject is null"); 178 } 179 180 wasError = false; 181 182 } catch(FileSystemException e) { 183 log.error("cannot resolve fileObject", e); 184 if(maxRetryCount <= retryCount) 185 processFailure("cannot resolve fileObject repeatedly: " + e.getMessage(), e, entry); 186 return; 187 } 188 189 if(wasError) { 190 try { 191 Thread.sleep(reconnectionTimeout); 192 } catch (InterruptedException e2) { 193 e2.printStackTrace(); 194 } 195 } 196 } 197 198 try { 199 if (fileObject.exists() && fileObject.isReadable()) { 200 201 entry.setLastPollState(PollTableEntry.NONE); 202 FileObject[] children = null; 203 try { 204 children = fileObject.getChildren(); 205 } catch (FileSystemException ignore) {} 206 207 // if this is a file that would translate to a single message 208 if (children == null || children.length == 0) { 209 210 if (fileObject.getType() == FileType.FILE) { 211 try { 212 processFile(entry, fileObject); 213 entry.setLastPollState(PollTableEntry.SUCCSESSFUL); 214 metrics.incrementMessagesReceived(); 215 216 } catch (AxisFault e) { 217 entry.setLastPollState(PollTableEntry.FAILED); 218 metrics.incrementFaultsReceiving(); 219 } 220 221 moveOrDeleteAfterProcessing(entry, fileObject); 222 } 223 224 } else { 225 int failCount = 0; 226 int successCount = 0; 227 228 if (log.isDebugEnabled()) { 229 log.debug("File name pattern :" + entry.getFileNamePattern()); 230 } 231 for (FileObject child : children) { 232 if (log.isDebugEnabled()) { 233 log.debug("Matching file :" + child.getName().getBaseName()); 234 } 235 if ((entry.getFileNamePattern() != null) 236 && (child.getName().getBaseName().matches(entry.getFileNamePattern()))) { 237 try { 238 if (log.isDebugEnabled()) { 239 log.debug("Processing file :" + child); 240 } 241 processFile(entry, child); 242 successCount++; 243 // tell moveOrDeleteAfterProcessing() file was success 244 entry.setLastPollState(PollTableEntry.SUCCSESSFUL); 245 metrics.incrementMessagesReceived(); 246 247 } catch (Exception e) { 248 logException("Error processing File URI : " + child.getName(), e); 249 failCount++; 250 // tell moveOrDeleteAfterProcessing() file failed 251 entry.setLastPollState(PollTableEntry.FAILED); 252 metrics.incrementFaultsReceiving(); 253 } 254 255 moveOrDeleteAfterProcessing(entry, child); 256 } 257 } 258 259 if (failCount == 0 && successCount > 0) { 260 entry.setLastPollState(PollTableEntry.SUCCSESSFUL); 261 } else if (successCount == 0 && failCount > 0) { 262 entry.setLastPollState(PollTableEntry.FAILED); 263 } else { 264 entry.setLastPollState(PollTableEntry.WITH_ERRORS); 265 } 266 } 267 268 // processing of this poll table entry is complete 269 long now = System.currentTimeMillis(); 270 entry.setLastPollTime(now); 271 entry.setNextPollTime(now + entry.getPollInterval()); 272 273 } else { 274 if (log.isDebugEnabled()) { 275 log.debug("Unable to access or read file or directory : " + fileURI); 276 } 277 } 278 279 } catch (FileSystemException e) { 280 processFailure("Error checking for existence and readability : " + fileURI, e, entry); 281 } 282 283 } 284 285 /** 286 * Take specified action to either move or delete the processed file, depending on the outcome 287 * @param entry the PollTableEntry for the file that has been processed 288 * @param fileObject the FileObject representing the file to be moved or deleted 289 */ 290 private void moveOrDeleteAfterProcessing(final PollTableEntry entry, FileObject fileObject) { 291 292 String moveToDirectory = null; 293 try { 294 switch (entry.getLastPollState()) { 295 case PollTableEntry.SUCCSESSFUL: 296 if (entry.getActionAfterProcess() == PollTableEntry.MOVE) { 297 moveToDirectory = entry.getMoveAfterProcess(); 298 } 299 break; 300 301 case PollTableEntry.WITH_ERRORS: 302 if (entry.getActionAfterProcess() == PollTableEntry.MOVE) { 303 moveToDirectory = entry.getMoveAfterErrors(); 304 } 305 break; 306 307 case PollTableEntry.FAILED: 308 if (entry.getActionAfterProcess() == PollTableEntry.MOVE) { 309 moveToDirectory = entry.getMoveAfterFailure(); 310 } 311 break; 312 case PollTableEntry.NONE: 313 return; 314 } 315 316 if (moveToDirectory != null) { 317 String prefix = ""; 318 if(entry.getMoveTimestampFormat() != null) { 319 Date now = new Date(); 320 prefix = entry.getMoveTimestampFormat().format(now); 321 } 322 String destName = moveToDirectory + File.separator + prefix + fileObject.getName().getBaseName(); 323 if (log.isDebugEnabled()) { 324 log.debug("Moving to file :" + destName); 325 } 326 FileObject dest = fsManager.resolveFile(destName); 327 try { 328 fileObject.moveTo(dest); 329 } catch (FileSystemException e) { 330 log.error("Error moving file : " + fileObject + " to " + moveToDirectory, e); 331 } 332 } else { 333 try { 334 if (log.isDebugEnabled()) { 335 log.debug("Deleting file :" + fileObject); 336 } 337 fileObject.close(); 338 if (!fileObject.delete()) { 339 log.error("Cannot delete file : " + fileObject); 340 } 341 } catch (FileSystemException e) { 342 log.error("Error deleting file : " + fileObject, e); 343 } 344 } 345 346 } catch (FileSystemException e) { 347 log.error("Error resolving directory to move after processing : " + moveToDirectory, e); 348 } 349 } 350 351 /** 352 * Process a single file through Axis2 353 * @param entry the PollTableEntry for the file (or its parent directory or archive) 354 * @param file the file that contains the actual message pumped into Axis2 355 * @throws AxisFault on error 356 */ 357 private void processFile(PollTableEntry entry, FileObject file) throws AxisFault { 358 359 try { 360 FileContent content = file.getContent(); 361 String fileName = file.getName().getBaseName(); 362 String filePath = file.getName().getPath(); 363 364 metrics.incrementBytesReceived(content.getSize()); 365 366 Map<String, Object> transportHeaders = new HashMap<String, Object>(); 367 transportHeaders.put(VFSConstants.FILE_PATH, filePath); 368 transportHeaders.put(VFSConstants.FILE_NAME, fileName); 369 370 try { 371 transportHeaders.put(VFSConstants.FILE_LENGTH, Long.valueOf(content.getSize())); 372 } catch (FileSystemException ignore) {} 373 try { 374 transportHeaders.put(VFSConstants.LAST_MODIFIED, 375 Long.valueOf(content.getLastModifiedTime())); 376 } catch (FileSystemException ignore) {} 377 378 // compute the unique message ID 379 String messageId = filePath + "_" + fileName + 380 "_" + System.currentTimeMillis() + "_" + (int) Math.random() * 1000; 381 382 String contentType = entry.getContentType(); 383 if (!BaseUtils.isValid(contentType)) { 384 if (file.getName().getExtension().toLowerCase().endsWith(".xml")) { 385 contentType = "text/xml"; 386 } else if (file.getName().getExtension().toLowerCase().endsWith(".txt")) { 387 contentType = "text/plain"; 388 } 389 } 390 391 // if the content type was not found, but the service defined it.. use it 392 if (contentType == null) { 393 if (entry.getContentType() != null) { 394 contentType = entry.getContentType(); 395 } else if (VFSUtils.getInstace().getProperty( 396 content, BaseConstants.CONTENT_TYPE) != null) { 397 contentType = 398 VFSUtils.getInstace().getProperty(content, BaseConstants.CONTENT_TYPE); 399 } 400 } 401 402 MessageContext msgContext = createMessageContext(); 403 // set to bypass dispatching if we know the service - we already should! 404 AxisService service = cfgCtx.getAxisConfiguration().getService(entry.getServiceName()); 405 msgContext.setAxisService(service); 406 407 // find the operation for the message, or default to one 408 Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM); 409 QName operationQName = ( 410 operationParam != null ? 411 BaseUtils.getQNameFromString(operationParam.getValue()) : 412 BaseConstants.DEFAULT_OPERATION); 413 414 AxisOperation operation = service.getOperation(operationQName); 415 if (operation != null) { 416 msgContext.setAxisOperation(operation); 417 } 418 419 // does the service specify a default reply file URI ? 420 Parameter param = service.getParameter(VFSConstants.REPLY_FILE_URI); 421 if (param != null && param.getValue() != null) { 422 msgContext.setProperty( 423 Constants.OUT_TRANSPORT_INFO, 424 new VFSOutTransportInfo((String) param.getValue())); 425 } 426 427 428 // set the message payload to the message context 429 InputStream in = content.getInputStream(); 430 try { 431 SOAPEnvelope envelope; 432 try { 433 envelope = TransportUtils.createSOAPMessage(msgContext, in, contentType); 434 } catch (XMLStreamException ex) { 435 handleException("Error parsing XML", ex); 436 return; 437 } 438 msgContext.setEnvelope(envelope); 439 } 440 finally { 441 try { 442 in.close(); 443 } catch (IOException ex) { 444 handleException("Error closing stream", ex); 445 } 446 } 447 448 handleIncomingMessage( 449 msgContext, 450 transportHeaders, 451 null, //* SOAP Action - not applicable *// 452 contentType 453 ); 454 455 if (log.isDebugEnabled()) { 456 log.debug("Processed file : " + file + " of Content-type : " + contentType); 457 } 458 459 } catch (FileSystemException e) { 460 handleException("Error reading file content or attributes : " + file, e); 461 462 } finally { 463 try { 464 file.close(); 465 } catch (FileSystemException warn) { 466 log.warn("Cannot close file after processing : " + file.getName().getPath(), warn); 467 } 468 } 469 } 470 471 /** 472 * method to log a failure to the log file and to update the last poll status and time 473 * @param msg text for the log message 474 * @param e optiona exception encountered or null 475 * @param entry the PollTableEntry 476 */ 477 private void processFailure(String msg, Exception e, PollTableEntry entry) { 478 if (e == null) { 479 log.error(msg); 480 } else { 481 log.error(msg, e); 482 } 483 long now = System.currentTimeMillis(); 484 entry.setLastPollState(PollTableEntry.FAILED); 485 entry.setLastPollTime(now); 486 entry.setNextPollTime(now + entry.getPollInterval()); 487 } 488 489 /** 490 * Get the EPR for the given service over the VFS transport 491 * vfs:uri (@see http://jakarta.apache.org/commons/vfs/filesystems.html for the URI formats) 492 * @param serviceName service name 493 * @param ip ignored 494 * @return the EPR for the service 495 * @throws AxisFault not used 496 */ 497 public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { 498 for (PollTableEntry entry : pollTable) { 499 if (entry.getServiceName().equals(serviceName) 500 || serviceName.startsWith(entry.getServiceName() + ".")) { 501 return new EndpointReference[]{new EndpointReference("vfs:" + entry.getFileURI())}; 502 } 503 } 504 return null; 505 } 506 507 protected void startListeningForService(AxisService service) { 508 509 if (service.getName().startsWith("__")) { 510 return; 511 } 512 513 Parameter param = service.getParameter(BaseConstants.TRANSPORT_POLL_INTERVAL); 514 long pollInterval = BaseConstants.DEFAULT_POLL_INTERVAL; 515 if (param != null && param.getValue() instanceof String) { 516 try { 517 pollInterval = Integer.parseInt(param.getValue().toString()); 518 } catch (NumberFormatException e) { 519 log.error("Invalid poll interval : " + param.getValue() + " for service : " + 520 service.getName() + " default to : " 521 + (BaseConstants.DEFAULT_POLL_INTERVAL/1000) + "sec", e); 522 disableTransportForService(service); 523 } 524 } 525 526 PollTableEntry entry = new PollTableEntry(); 527 try { 528 entry.setFileURI( 529 BaseUtils.getRequiredServiceParam(service, VFSConstants.TRANSPORT_FILE_FILE_URI)); 530 entry.setFileNamePattern(BaseUtils.getOptionalServiceParam(service, 531 VFSConstants.TRANSPORT_FILE_FILE_NAME_PATTERN)); 532 entry.setContentType(BaseUtils.getRequiredServiceParam(service, 533 VFSConstants.TRANSPORT_FILE_CONTENT_TYPE)); 534 String option = BaseUtils.getOptionalServiceParam( 535 service, VFSConstants.TRANSPORT_FILE_ACTION_AFTER_PROCESS); 536 entry.setActionAfterProcess( 537 MOVE.equals(option) ? PollTableEntry.MOVE : PollTableEntry.DELETE); 538 option = BaseUtils.getOptionalServiceParam( 539 service, VFSConstants.TRANSPORT_FILE_ACTION_AFTER_ERRORS); 540 entry.setActionAfterErrors( 541 MOVE.equals(option) ? PollTableEntry.MOVE : PollTableEntry.DELETE); 542 option = BaseUtils.getOptionalServiceParam( 543 service, VFSConstants.TRANSPORT_FILE_ACTION_AFTER_FAILURE); 544 entry.setActionAfterFailure( 545 MOVE.equals(option) ? PollTableEntry.MOVE : PollTableEntry.DELETE); 546 547 String moveDirectoryAfterProcess = BaseUtils.getOptionalServiceParam( 548 service, VFSConstants.TRANSPORT_FILE_MOVE_AFTER_PROCESS); 549 entry.setMoveAfterProcess(moveDirectoryAfterProcess); 550 String moveDirectoryAfterErrors = BaseUtils.getOptionalServiceParam( 551 service, VFSConstants.TRANSPORT_FILE_MOVE_AFTER_ERRORS); 552 entry.setMoveAfterErrors(moveDirectoryAfterErrors); 553 String moveDirectoryAfterFailure = BaseUtils.getOptionalServiceParam( 554 service, VFSConstants.TRANSPORT_FILE_MOVE_AFTER_FAILURE); 555 entry.setMoveAfterFailure(moveDirectoryAfterFailure); 556 557 String moveFileTimestampFormat = BaseUtils.getOptionalServiceParam( 558 service, VFSConstants.TRANSPORT_FILE_MOVE_TIMESTAMP_FORMAT); 559 if(moveFileTimestampFormat != null) { 560 DateFormat moveTimestampFormat = new SimpleDateFormat(moveFileTimestampFormat); 561 entry.setMoveTimestampFormat(moveTimestampFormat); 562 } 563 564 String strMaxRetryCount = BaseUtils.getOptionalServiceParam( 565 service, VFSConstants.MAX_RETRY_COUNT); 566 if(strMaxRetryCount != null) 567 entry.setMaxRetryCount(Integer.parseInt(strMaxRetryCount)); 568 569 String strReconnectTimeout = BaseUtils.getOptionalServiceParam( 570 service, VFSConstants.RECONNECT_TIMEOUT); 571 if(strReconnectTimeout != null) 572 entry.setReconnectTimeout(Integer.parseInt(strReconnectTimeout) * 1000); 573 574 entry.setServiceName(service.getName()); 575 schedulePoll(service, pollInterval); 576 pollTable.add(entry); 577 578 } catch (AxisFault axisFault) { 579 String msg = "Error configuring the File/VFS transport for Service : " + 580 service.getName() + " :: " + axisFault.getMessage(); 581 log.warn(msg); 582 disableTransportForService(service); 583 } 584 } 585 586 protected void stopListeningForService(AxisService service) { 587 Iterator iter = pollTable.iterator(); 588 while (iter.hasNext()) { 589 PollTableEntry entry = (PollTableEntry) iter.next(); 590 if (service.getName().equals(entry.getServiceName())) { 591 cancelPoll(service); 592 removeTable.add(entry); 593 } 594 } 595 } 596 }