Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » tcp » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.tcp;
   18   
   19   import java.io.DataInputStream;
   20   import java.io.DataOutputStream;
   21   import java.io.IOException;
   22   import java.io.InterruptedIOException;
   23   import java.net.InetAddress;
   24   import java.net.InetSocketAddress;
   25   import java.net.Socket;
   26   import java.net.SocketException;
   27   import java.net.SocketTimeoutException;
   28   import java.net.URI;
   29   import java.net.UnknownHostException;
   30   import java.util.HashMap;
   31   import java.util.Map;
   32   import java.util.concurrent.CountDownLatch;
   33   import java.util.concurrent.SynchronousQueue;
   34   import java.util.concurrent.ThreadFactory;
   35   import java.util.concurrent.ThreadPoolExecutor;
   36   import java.util.concurrent.TimeUnit;
   37   import java.util.concurrent.atomic.AtomicReference;
   38   
   39   import javax.net.SocketFactory;
   40   
   41   import org.apache.activemq.Service;
   42   import org.apache.activemq.transport.Transport;
   43   import org.apache.activemq.transport.TransportLoggerFactory;
   44   import org.apache.activemq.transport.TransportThreadSupport;
   45   import org.apache.activemq.util.IntrospectionSupport;
   46   import org.apache.activemq.util.ServiceStopper;
   47   import org.apache.activemq.wireformat.WireFormat;
   48   import org.apache.commons.logging.Log;
   49   import org.apache.commons.logging.LogFactory;
   50   
   51   /**
   52    * An implementation of the {@link Transport} interface using raw tcp/ip
   53    * 
   54    * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
   55    * @version $Revision$
   56    */
   57   public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
   58       private static final Log LOG = LogFactory.getLog(TcpTransport.class);
   59       private static final ThreadPoolExecutor SOCKET_CLOSE;
   60       protected final URI remoteLocation;
   61       protected final URI localLocation;
   62       protected final WireFormat wireFormat;
   63   
   64       protected int connectionTimeout = 30000;
   65       protected int soTimeout;
   66       protected int socketBufferSize = 64 * 1024;
   67       protected int ioBufferSize = 8 * 1024;
   68       protected boolean closeAsync=true;
   69       protected Socket socket;
   70       protected DataOutputStream dataOut;
   71       protected DataInputStream dataIn;
   72       protected TcpBufferedOutputStream buffOut = null;
   73       /**
   74        * trace=true -> the Transport stack where this TcpTransport
   75        * object will be, will have a TransportLogger layer
   76        * trace=false -> the Transport stack where this TcpTransport
   77        * object will be, will NOT have a TransportLogger layer, and therefore
   78        * will never be able to print logging messages.
   79        * This parameter is most probably set in Connection or TransportConnector URIs.
   80        */
   81       protected boolean trace = false;
   82       /**
   83        * Name of the LogWriter implementation to use.
   84        * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
   85        * This parameter is most probably set in Connection or TransportConnector URIs.
   86        */
   87       protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
   88       /**
   89        * Specifies if the TransportLogger will be manageable by JMX or not.
   90        * Also, as long as there is at least 1 TransportLogger which is manageable,
   91        * a TransportLoggerControl MBean will me created.
   92        */
   93       protected boolean dynamicManagement = false;
   94       /**
   95        * startLogging=true -> the TransportLogger object of the Transport stack
   96        * will initially write messages to the log.
   97        * startLogging=false -> the TransportLogger object of the Transport stack
   98        * will initially NOT write messages to the log.
   99        * This parameter only has an effect if trace == true.
  100        * This parameter is most probably set in Connection or TransportConnector URIs.
  101        */
  102       protected boolean startLogging = true;
  103       /**
  104        * Specifies the port that will be used by the JMX server to manage
  105        * the TransportLoggers.
  106        * This should only be set in an URI by a client (producer or consumer) since
  107        * a broker will already create a JMX server.
  108        * It is useful for people who test a broker and clients in the same machine
  109        * and want to control both via JMX; a different port will be needed.
  110        */
  111       protected int jmxPort = 1099;
  112       protected boolean useLocalHost = true;
  113       protected int minmumWireFormatVersion;
  114       protected SocketFactory socketFactory;
  115       protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
  116   
  117       private Map<String, Object> socketOptions;
  118       private Boolean keepAlive;
  119       private Boolean tcpNoDelay;
  120       private Thread runnerThread;
  121       private volatile int receiveCounter;
  122   
  123       /**
  124        * Connect to a remote Node - e.g. a Broker
  125        * 
  126        * @param wireFormat
  127        * @param socketFactory
  128        * @param remoteLocation
  129        * @param localLocation - e.g. local InetAddress and local port
  130        * @throws IOException
  131        * @throws UnknownHostException
  132        */
  133       public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
  134                           URI localLocation) throws UnknownHostException, IOException {
  135           this.wireFormat = wireFormat;
  136           this.socketFactory = socketFactory;
  137           try {
  138               this.socket = socketFactory.createSocket();
  139           } catch (SocketException e) {
  140               this.socket = null;
  141           }
  142           this.remoteLocation = remoteLocation;
  143           this.localLocation = localLocation;
  144           setDaemon(false);
  145       }
  146   
  147       /**
  148        * Initialize from a server Socket
  149        * 
  150        * @param wireFormat
  151        * @param socket
  152        * @throws IOException
  153        */
  154       public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
  155           this.wireFormat = wireFormat;
  156           this.socket = socket;
  157           this.remoteLocation = null;
  158           this.localLocation = null;
  159           setDaemon(true);
  160       }
  161   
  162       /**
  163        * A one way asynchronous send
  164        */
  165       public void oneway(Object command) throws IOException {
  166           checkStarted();
  167           wireFormat.marshal(command, dataOut);
  168           dataOut.flush();
  169       }
  170   
  171       /**
  172        * @return pretty print of 'this'
  173        */
  174       public String toString() {
  175           return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
  176       }
  177   
  178       /**
  179        * reads packets from a Socket
  180        */
  181       public void run() {
  182           LOG.trace("TCP consumer thread for " + this + " starting");
  183           this.runnerThread=Thread.currentThread();
  184           try {
  185               while (!isStopped()) {
  186                   doRun();
  187               }
  188           } catch (IOException e) {
  189               stoppedLatch.get().countDown();
  190               onException(e);
  191           } catch (Throwable e){
  192               stoppedLatch.get().countDown();
  193               IOException ioe=new IOException("Unexpected error occured");
  194               ioe.initCause(e);
  195               onException(ioe);
  196           }finally {
  197               stoppedLatch.get().countDown();
  198           }
  199       }
  200   
  201       protected void doRun() throws IOException {
  202           try {
  203               Object command = readCommand();
  204               doConsume(command);
  205           } catch (SocketTimeoutException e) {
  206           } catch (InterruptedIOException e) {
  207           }
  208       }
  209   
  210       protected Object readCommand() throws IOException {
  211           return wireFormat.unmarshal(dataIn);
  212       }
  213   
  214       // Properties
  215       // -------------------------------------------------------------------------
  216   
  217       public boolean isTrace() {
  218           return trace;
  219       }
  220   
  221       public void setTrace(boolean trace) {
  222           this.trace = trace;
  223       }
  224       
  225       public String getLogWriterName() {
  226           return logWriterName;
  227       }
  228   
  229       public void setLogWriterName(String logFormat) {
  230           this.logWriterName = logFormat;
  231       }
  232   
  233       public boolean isDynamicManagement() {
  234           return dynamicManagement;
  235       }
  236   
  237       public void setDynamicManagement(boolean useJmx) {
  238           this.dynamicManagement = useJmx;
  239       }
  240   
  241       public boolean isStartLogging() {
  242           return startLogging;
  243       }
  244   
  245       public void setStartLogging(boolean startLogging) {
  246           this.startLogging = startLogging;
  247       }
  248   
  249       public int getJmxPort() {
  250           return jmxPort;
  251       }
  252   
  253       public void setJmxPort(int jmxPort) {
  254           this.jmxPort = jmxPort;
  255       }
  256       
  257       public int getMinmumWireFormatVersion() {
  258           return minmumWireFormatVersion;
  259       }
  260   
  261       public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
  262           this.minmumWireFormatVersion = minmumWireFormatVersion;
  263       }
  264   
  265       public boolean isUseLocalHost() {
  266           return useLocalHost;
  267       }
  268   
  269       /**
  270        * Sets whether 'localhost' or the actual local host name should be used to
  271        * make local connections. On some operating systems such as Macs its not
  272        * possible to connect as the local host name so localhost is better.
  273        */
  274       public void setUseLocalHost(boolean useLocalHost) {
  275           this.useLocalHost = useLocalHost;
  276       }
  277   
  278       public int getSocketBufferSize() {
  279           return socketBufferSize;
  280       }
  281   
  282       /**
  283        * Sets the buffer size to use on the socket
  284        */
  285       public void setSocketBufferSize(int socketBufferSize) {
  286           this.socketBufferSize = socketBufferSize;
  287       }
  288   
  289       public int getSoTimeout() {
  290           return soTimeout;
  291       }
  292   
  293       /**
  294        * Sets the socket timeout
  295        */
  296       public void setSoTimeout(int soTimeout) {
  297           this.soTimeout = soTimeout;
  298       }
  299   
  300       public int getConnectionTimeout() {
  301           return connectionTimeout;
  302       }
  303   
  304       /**
  305        * Sets the timeout used to connect to the socket
  306        */
  307       public void setConnectionTimeout(int connectionTimeout) {
  308           this.connectionTimeout = connectionTimeout;
  309       }
  310   
  311       public Boolean getKeepAlive() {
  312           return keepAlive;
  313       }
  314   
  315       /**
  316        * Enable/disable TCP KEEP_ALIVE mode
  317        */
  318       public void setKeepAlive(Boolean keepAlive) {
  319           this.keepAlive = keepAlive;
  320       }
  321   
  322       public Boolean getTcpNoDelay() {
  323           return tcpNoDelay;
  324       }
  325   
  326       /**
  327        * Enable/disable the TCP_NODELAY option on the socket
  328        */
  329       public void setTcpNoDelay(Boolean tcpNoDelay) {
  330           this.tcpNoDelay = tcpNoDelay;
  331       }
  332   
  333       /**
  334        * @return the ioBufferSize
  335        */
  336       public int getIoBufferSize() {
  337           return this.ioBufferSize;
  338       }
  339   
  340       /**
  341        * @param ioBufferSize the ioBufferSize to set
  342        */
  343       public void setIoBufferSize(int ioBufferSize) {
  344           this.ioBufferSize = ioBufferSize;
  345       }
  346       
  347       /**
  348        * @return the closeAsync
  349        */
  350       public boolean isCloseAsync() {
  351           return closeAsync;
  352       }
  353   
  354       /**
  355        * @param closeAsync the closeAsync to set
  356        */
  357       public void setCloseAsync(boolean closeAsync) {
  358           this.closeAsync = closeAsync;
  359       }
  360   
  361       // Implementation methods
  362       // -------------------------------------------------------------------------
  363       protected String resolveHostName(String host) throws UnknownHostException {
  364           String localName = InetAddress.getLocalHost().getHostName();
  365           if (localName != null && isUseLocalHost()) {
  366               if (localName.equals(host)) {
  367                   return "localhost";
  368               }
  369           }
  370           return host;
  371       }
  372   
  373       /**
  374        * Configures the socket for use
  375        * 
  376        * @param sock
  377        * @throws SocketException
  378        */
  379       protected void initialiseSocket(Socket sock) throws SocketException {
  380           if (socketOptions != null) {
  381               IntrospectionSupport.setProperties(socket, socketOptions);
  382           }
  383   
  384           try {
  385               sock.setReceiveBufferSize(socketBufferSize);
  386               sock.setSendBufferSize(socketBufferSize);
  387           } catch (SocketException se) {
  388               LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
  389               LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
  390           }
  391           sock.setSoTimeout(soTimeout);
  392   
  393           if (keepAlive != null) {
  394               sock.setKeepAlive(keepAlive.booleanValue());
  395           }
  396           if (tcpNoDelay != null) {
  397               sock.setTcpNoDelay(tcpNoDelay.booleanValue());
  398           }
  399       }
  400   
  401       protected void doStart() throws Exception {
  402           connect();
  403           stoppedLatch.set(new CountDownLatch(1));
  404           super.doStart();
  405       }
  406   
  407       protected void connect() throws Exception {
  408   
  409           if (socket == null && socketFactory == null) {
  410               throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
  411           }
  412   
  413           InetSocketAddress localAddress = null;
  414           InetSocketAddress remoteAddress = null;
  415   
  416           if (localLocation != null) {
  417               localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
  418                                                    localLocation.getPort());
  419           }
  420   
  421           if (remoteLocation != null) {
  422               String host = resolveHostName(remoteLocation.getHost());
  423               remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
  424           }
  425   
  426           if (socket != null) {
  427   
  428               if (localAddress != null) {
  429                   socket.bind(localAddress);
  430               }
  431   
  432               // If it's a server accepted socket.. we don't need to connect it
  433               // to a remote address.
  434               if (remoteAddress != null) {
  435                   if (connectionTimeout >= 0) {
  436                       socket.connect(remoteAddress, connectionTimeout);
  437                   } else {
  438                       socket.connect(remoteAddress);
  439                   }
  440               }
  441   
  442           } else {
  443               // For SSL sockets.. you can't create an unconnected socket :(
  444               // This means the timout option are not supported either.
  445               if (localAddress != null) {
  446                   socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
  447                                                       localAddress.getAddress(), localAddress.getPort());
  448               } else {
  449                   socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
  450               }
  451           }
  452   
  453           initialiseSocket(socket);
  454           initializeStreams();
  455       }
  456   
  457       protected void doStop(ServiceStopper stopper) throws Exception {
  458           if (LOG.isDebugEnabled()) {
  459               LOG.debug("Stopping transport " + this);
  460           }
  461   
  462           // Closing the streams flush the sockets before closing.. if the socket
  463           // is hung.. then this hangs the close.
  464           // closeStreams();
  465           if (socket != null) {
  466               if (closeAsync) {
  467                   //closing the socket can hang also 
  468                   final CountDownLatch latch = new CountDownLatch(1);
  469                   
  470                   SOCKET_CLOSE.execute(new Runnable() {
  471       
  472                       public void run() {
  473                           try {
  474                               socket.close();
  475                           } catch (IOException e) {
  476                               LOG.debug("Caught exception closing socket",e);
  477                           }finally {
  478                               latch.countDown();
  479                           }
  480                       }
  481                       
  482                   });
  483                   latch.await(1,TimeUnit.SECONDS);
  484               }else {
  485                   try {
  486                       socket.close();
  487                   } catch (IOException e) {
  488                       LOG.debug("Caught exception closing socket",e);
  489                   }
  490               }
  491              
  492           }
  493       }
  494   
  495       /**
  496        * Override so that stop() blocks until the run thread is no longer running.
  497        */
  498       @Override
  499       public void stop() throws Exception {
  500           super.stop();
  501           CountDownLatch countDownLatch = stoppedLatch.get();
  502           if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
  503               countDownLatch.await(1,TimeUnit.SECONDS);
  504           }
  505       }
  506   
  507       protected void initializeStreams() throws Exception {
  508           TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
  509               @Override
  510               public int read() throws IOException {
  511                   receiveCounter++;
  512                   return super.read();
  513               }
  514               @Override
  515               public int read(byte[] b, int off, int len) throws IOException {
  516                   receiveCounter++;
  517                   return super.read(b, off, len);
  518               }
  519               @Override
  520               public long skip(long n) throws IOException {
  521                   receiveCounter++;
  522                   return super.skip(n);
  523               }
  524               @Override
  525               protected void fill() throws IOException {
  526                   receiveCounter++;
  527                   super.fill();
  528               }
  529           };
  530           this.dataIn = new DataInputStream(buffIn);
  531           buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
  532           this.dataOut = new DataOutputStream(buffOut);
  533       }
  534   
  535       protected void closeStreams() throws IOException {
  536           if (dataOut != null) {
  537               dataOut.close();
  538           }
  539           if (dataIn != null) {
  540               dataIn.close();
  541           }
  542       }
  543   
  544       public void setSocketOptions(Map<String, Object> socketOptions) {
  545           this.socketOptions = new HashMap<String, Object>(socketOptions);
  546       }
  547   
  548       public String getRemoteAddress() {
  549           if (socket != null) {
  550               return "" + socket.getRemoteSocketAddress();
  551           }
  552           return null;
  553       }
  554       
  555       @Override
  556       public <T> T narrow(Class<T> target) {
  557           if (target == Socket.class) {
  558               return target.cast(socket);
  559           } else if ( target == TcpBufferedOutputStream.class) {
  560               return target.cast(buffOut);
  561           }
  562           return super.narrow(target);
  563       }
  564       
  565   
  566       static {
  567           SOCKET_CLOSE =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
  568               public Thread newThread(Runnable runnable) {
  569                   Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
  570                   thread.setPriority(Thread.MAX_PRIORITY);
  571                   thread.setDaemon(true);
  572                   return thread;
  573               }
  574           });
  575       }
  576   
  577   
  578       public int getReceiveCounter() {
  579           return receiveCounter;
  580       }
  581   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » tcp » [javadoc | source]