Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » tcp » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.tcp;
   18   
   19   import java.io.IOException;
   20   import java.net.URI;
   21   import java.net.URISyntaxException;
   22   import java.net.UnknownHostException;
   23   import java.util.HashMap;
   24   import java.util.Map;
   25   
   26   import javax.net.ServerSocketFactory;
   27   import javax.net.SocketFactory;
   28   
   29   import org.apache.activemq.openwire.OpenWireFormat;
   30   import org.apache.activemq.transport.InactivityMonitor;
   31   import org.apache.activemq.transport.Transport;
   32   import org.apache.activemq.transport.TransportFactory;
   33   import org.apache.activemq.transport.TransportLoggerFactory;
   34   import org.apache.activemq.transport.TransportServer;
   35   import org.apache.activemq.transport.WireFormatNegotiator;
   36   import org.apache.activemq.util.IOExceptionSupport;
   37   import org.apache.activemq.util.IntrospectionSupport;
   38   import org.apache.activemq.util.URISupport;
   39   import org.apache.activemq.wireformat.WireFormat;
   40   import org.apache.commons.logging.Log;
   41   import org.apache.commons.logging.LogFactory;
   42   
   43   /**
   44    * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
   45    * @version $Revision$
   46    */
   47   public class TcpTransportFactory extends TransportFactory {
   48       private static final Log LOG = LogFactory.getLog(TcpTransportFactory.class);
   49   
   50       public TransportServer doBind(final URI location) throws IOException {
   51           try {
   52               Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
   53   
   54               ServerSocketFactory serverSocketFactory = createServerSocketFactory();
   55               TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
   56               server.setWireFormatFactory(createWireFormatFactory(options));
   57               IntrospectionSupport.setProperties(server, options);
   58               Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
   59               server.setTransportOption(transportOptions);
   60               server.bind();
   61   
   62               return server;
   63           } catch (URISyntaxException e) {
   64               throw IOExceptionSupport.create(e);
   65           }
   66       }
   67   
   68       /**
   69        * Allows subclasses of TcpTransportFactory to create custom instances of
   70        * TcpTransportServer.
   71        * 
   72        * @param location
   73        * @param serverSocketFactory
   74        * @return
   75        * @throws IOException
   76        * @throws URISyntaxException
   77        */
   78       protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
   79           return new TcpTransportServer(this, location, serverSocketFactory);
   80       }
   81   
   82       public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
   83   
   84           TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class);
   85           IntrospectionSupport.setProperties(tcpTransport, options);
   86   
   87           Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
   88           tcpTransport.setSocketOptions(socketOptions);
   89           
   90           if (tcpTransport.isTrace()) {
   91               try {
   92                   transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, tcpTransport.getLogWriterName(),
   93                           tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort());
   94               } catch (Throwable e) {
   95                   LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e);
   96               }
   97           }
   98   
   99           boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true"));
  100           if (useInactivityMonitor && isUseInactivityMonitor(transport)) {
  101               transport = new InactivityMonitor(transport, format);
  102               IntrospectionSupport.setProperties(transport, options);
  103           }
  104           
  105   
  106           // Only need the WireFormatNegotiator if using openwire
  107           if (format instanceof OpenWireFormat) {
  108               transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
  109           }
  110   
  111           return super.compositeConfigure(transport, format, options);
  112       }
  113   
  114       private String getOption(Map options, String key, String def) {
  115           String rc = (String) options.remove(key);
  116           if( rc == null ) {
  117               rc = def;
  118           }
  119           return rc;
  120       }
  121   
  122       /**
  123        * Returns true if the inactivity monitor should be used on the transport
  124        */
  125       protected boolean isUseInactivityMonitor(Transport transport) {
  126           return true;
  127       }
  128   
  129       protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
  130           URI localLocation = null;
  131           String path = location.getPath();
  132           // see if the path is a local URI location
  133           if (path != null && path.length() > 0) {
  134               int localPortIndex = path.indexOf(':');
  135               try {
  136                   Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
  137                   String localString = location.getScheme() + ":/" + path;
  138                   localLocation = new URI(localString);
  139               } catch (Exception e) {
  140                   LOG.warn("path isn't a valid local location for TcpTransport to use", e);
  141               }
  142           }
  143           SocketFactory socketFactory = createSocketFactory();
  144           return createTcpTransport(wf, socketFactory, location, localLocation);
  145       }
  146   
  147       /**
  148        * Allows subclasses of TcpTransportFactory to provide a create custom
  149        * TcpTransport intances.
  150        * 
  151        * @param location
  152        * @param wf
  153        * @param socketFactory
  154        * @param localLocation
  155        * @return
  156        * @throws UnknownHostException
  157        * @throws IOException
  158        */
  159       protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
  160           return new TcpTransport(wf, socketFactory, location, localLocation);
  161       }
  162   
  163       protected ServerSocketFactory createServerSocketFactory() throws IOException {
  164           return ServerSocketFactory.getDefault();
  165       }
  166   
  167       protected SocketFactory createSocketFactory() throws IOException {
  168           return SocketFactory.getDefault();
  169       }
  170   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » tcp » [javadoc | source]