Home » synapse-1.2-src » org.apache.synapse.transport.vfs » [javadoc | source]

    1   /*
    2   *  Licensed to the Apache Software Foundation (ASF) under one
    3   *  or more contributor license agreements.  See the NOTICE file
    4   *  distributed with this work for additional information
    5   *  regarding copyright ownership.  The ASF licenses this file
    6   *  to you under the Apache License, Version 2.0 (the
    7   *  "License"); you may not use this file except in compliance
    8   *  with the License.  You may obtain a copy of the License at
    9   *
   10   *   http://www.apache.org/licenses/LICENSE-2.0
   11   *
   12   *  Unless required by applicable law or agreed to in writing,
   13   *  software distributed under the License is distributed on an
   14   *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15   *  KIND, either express or implied.  See the License for the
   16   *  specific language governing permissions and limitations
   17   *  under the License.
   18   */
   19   package org.apache.synapse.transport.vfs;
   20   
   21   import org.apache.synapse.transport.base;
   22   import org.apache.axis2.transport.OutTransportInfo;
   23   import org.apache.axis2.transport.MessageFormatter;
   24   import org.apache.axis2.context.MessageContext;
   25   import org.apache.axis2.context.ConfigurationContext;
   26   import org.apache.axis2.AxisFault;
   27   import org.apache.axis2.description.TransportOutDescription;
   28   import org.apache.commons.vfs;
   29   import org.apache.commons.vfs.impl.StandardFileSystemManager;
   30   import org.apache.commons.io.output.CountingOutputStream;
   31   import org.apache.commons.logging.LogFactory;
   32   import org.apache.axiom.om.OMOutputFormat;
   33   
   34   import java.io.IOException;
   35   
   36   /**
   37    * axis2.xml - transport definition
   38    *  <transportSender name="file" class="org.apache.synapse.transport.vfs.VFSTransportSender"/>
   39    */
   40   public class VFSTransportSender extends AbstractTransportSender implements ManagementSupport {
   41   
   42       public static final String TRANSPORT_NAME = "vfs";
   43   
   44       /** The VFS file system manager */
   45       private FileSystemManager fsManager = null;
   46   
   47       /**
   48        * The public constructor
   49        */
   50       public VFSTransportSender() {
   51           log = LogFactory.getLog(VFSTransportSender.class);
   52       }
   53   
   54       /**
   55        * Initialize the VFS file system manager and be ready to send messages
   56        * @param cfgCtx the axis2 configuration context
   57        * @param transportOut the transport-out description
   58        * @throws AxisFault on error
   59        */
   60       public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
   61           setTransportName(TRANSPORT_NAME);
   62           super.init(cfgCtx, transportOut);
   63           try {
   64               StandardFileSystemManager fsm = new StandardFileSystemManager();
   65               fsm.setConfiguration(getClass().getClassLoader().getResource("providers.xml"));
   66               fsm.init();
   67               fsManager = fsm;
   68           } catch (FileSystemException e) {
   69               handleException("Error initializing the file transport : " + e.getMessage(), e);
   70           }
   71       }
   72   
   73       /**
   74        * Send the given message over the VFS transport
   75        *
   76        * @param msgCtx the axis2 message context
   77        * @return the result of the send operation / handler
   78        * @throws AxisFault on error
   79        */
   80       public void sendMessage(MessageContext msgCtx, String targetAddress,
   81           OutTransportInfo outTransportInfo) throws AxisFault {
   82   
   83           VFSOutTransportInfo vfsOutInfo = null;
   84   
   85           if (targetAddress != null) {
   86               vfsOutInfo = new VFSOutTransportInfo(targetAddress);
   87           } else if (outTransportInfo != null && outTransportInfo instanceof VFSOutTransportInfo) {
   88               vfsOutInfo = (VFSOutTransportInfo) outTransportInfo;
   89           }
   90   
   91           if (vfsOutInfo != null) {
   92               FileObject replyFile = null;
   93               try {
   94                   
   95                   boolean wasError = true;
   96                   int retryCount = 0;
   97                   int maxRetryCount = VFSUtils.getMaxRetryCount(msgCtx, vfsOutInfo);
   98                   long reconnectionTimeout = VFSUtils.getReconnectTimout(msgCtx, vfsOutInfo);
   99                   boolean append = vfsOutInfo.isAppend();
  100                   
  101                   while(wasError == true) {
  102                     try {
  103                       retryCount++;
  104                       replyFile = fsManager.resolveFile(vfsOutInfo.getOutFileURI());
  105                       
  106                       if(replyFile == null) {
  107                         log.error("replyFile is null");
  108                         throw new FileSystemException("replyFile is null");
  109                       }
  110                       
  111                       wasError = false;
  112                                           
  113                     } catch(FileSystemException e) {
  114                       log.error("cannot resolve replyFile", e);
  115                       if(maxRetryCount <= retryCount)
  116                         handleException("cannot resolve replyFile repeatedly: " + e.getMessage(), e);
  117                     }
  118                   
  119                     if(wasError == true) {
  120                       try {
  121                         Thread.sleep(reconnectionTimeout);
  122                       } catch (InterruptedException e2) {
  123                         e2.printStackTrace();
  124                       }
  125                     }
  126                   }
  127                   
  128                   if (replyFile.exists()) {
  129   
  130                       if (replyFile.getType() == FileType.FOLDER) {
  131                           // we need to write a file containing the message to this folder
  132                           FileObject responseFile = fsManager.resolveFile(replyFile,
  133                               VFSUtils.getFileName(msgCtx, vfsOutInfo));
  134                           if (!responseFile.exists()) {
  135                               responseFile.createFile();
  136                           }
  137                           populateResponseFile(responseFile, msgCtx, append);
  138   
  139                       } else if (replyFile.getType() == FileType.FILE) {
  140                           populateResponseFile(replyFile, msgCtx, append);
  141                           
  142                       } else {
  143                           handleException("Unsupported reply file type : " + replyFile.getType() +
  144                               " for file : " + vfsOutInfo.getOutFileURI());
  145                       }
  146                   } else {
  147                       replyFile.createFile();
  148                       populateResponseFile(replyFile, msgCtx, append);
  149                   }
  150               } catch (FileSystemException e) {
  151                   handleException("Error resolving reply file : " +
  152                       vfsOutInfo.getOutFileURI(), e);
  153               } finally {
  154                   if (replyFile != null) {
  155                       try {
  156                           replyFile.close();
  157                       } catch (FileSystemException ignore) {}
  158                   }
  159               }
  160           } else {
  161               handleException("Unable to determine out transport information to send message");
  162           }
  163       }
  164   
  165       private void populateResponseFile(FileObject responseFile, MessageContext msgContext, boolean append) throws AxisFault {
  166           MessageFormatter messageFormatter = BaseUtils.getMessageFormatter(msgContext);
  167           OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
  168           try {
  169               CountingOutputStream os = new CountingOutputStream(responseFile.getContent().getOutputStream(append));
  170               try {
  171                   messageFormatter.writeTo(msgContext, format, os, true);
  172               } finally {
  173                   os.close();
  174               }
  175               // update metrics
  176               metrics.incrementMessagesSent();
  177               metrics.incrementBytesSent(os.getByteCount());
  178           } catch (FileSystemException e) {
  179               metrics.incrementFaultsSending();
  180               handleException("IO Error while creating response file : " + responseFile.getName(), e);
  181           } catch (IOException e) {
  182               metrics.incrementFaultsSending();
  183               handleException("IO Error while creating response file : " + responseFile.getName(), e);
  184           }
  185       }
  186   }

Home » synapse-1.2-src » org.apache.synapse.transport.vfs » [javadoc | source]