Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » tcp » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.tcp;
   18   
   19   import java.io.IOException;
   20   import java.net.InetAddress;
   21   import java.net.InetSocketAddress;
   22   import java.net.ServerSocket;
   23   import java.net.Socket;
   24   import java.net.SocketException;
   25   import java.net.SocketTimeoutException;
   26   import java.net.URI;
   27   import java.net.URISyntaxException;
   28   import java.net.UnknownHostException;
   29   import java.util.HashMap;
   30   import java.util.Map;
   31   import java.util.concurrent.BlockingQueue;
   32   import java.util.concurrent.LinkedBlockingQueue;
   33   import java.util.concurrent.TimeUnit;
   34   
   35   import javax.net.ServerSocketFactory;
   36   
   37   import org.apache.activemq.Service;
   38   import org.apache.activemq.ThreadPriorities;
   39   import org.apache.activemq.command.BrokerInfo;
   40   import org.apache.activemq.openwire.OpenWireFormatFactory;
   41   import org.apache.activemq.transport.Transport;
   42   import org.apache.activemq.transport.TransportLoggerFactory;
   43   import org.apache.activemq.transport.TransportServer;
   44   import org.apache.activemq.transport.TransportServerThreadSupport;
   45   import org.apache.activemq.util.IOExceptionSupport;
   46   import org.apache.activemq.util.IntrospectionSupport;
   47   import org.apache.activemq.util.ServiceListener;
   48   import org.apache.activemq.util.ServiceStopper;
   49   import org.apache.activemq.util.ServiceSupport;
   50   import org.apache.activemq.wireformat.WireFormat;
   51   import org.apache.activemq.wireformat.WireFormatFactory;
   52   import org.apache.commons.logging.Log;
   53   import org.apache.commons.logging.LogFactory;
   54   
   55   /**
   56    * A TCP based implementation of {@link TransportServer}
   57    * 
   58    * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
   59    * @version $Revision: 1.1 $
   60    */
   61   
   62   public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
   63   
   64       private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
   65       protected ServerSocket serverSocket;
   66       protected int backlog = 5000;
   67       protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
   68       protected final TcpTransportFactory transportFactory;
   69       protected long maxInactivityDuration = 30000;
   70       protected long maxInactivityDurationInitalDelay = 10000;
   71       protected int minmumWireFormatVersion;
   72       protected boolean useQueueForAccept=true;
   73          
   74       /**
   75        * trace=true -> the Transport stack where this TcpTransport
   76        * object will be, will have a TransportLogger layer
   77        * trace=false -> the Transport stack where this TcpTransport
   78        * object will be, will NOT have a TransportLogger layer, and therefore
   79        * will never be able to print logging messages.
   80        * This parameter is most probably set in Connection or TransportConnector URIs.
   81        */
   82       protected boolean trace = false;
   83   
   84       protected int soTimeout = 0;
   85       protected int socketBufferSize = 64 * 1024;
   86       protected int connectionTimeout =  30000;
   87   
   88       /**
   89        * Name of the LogWriter implementation to use.
   90        * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
   91        * This parameter is most probably set in Connection or TransportConnector URIs.
   92        */
   93       protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
   94       /**
   95        * Specifies if the TransportLogger will be manageable by JMX or not.
   96        * Also, as long as there is at least 1 TransportLogger which is manageable,
   97        * a TransportLoggerControl MBean will me created.
   98        */
   99       protected boolean dynamicManagement = false;
  100       /**
  101        * startLogging=true -> the TransportLogger object of the Transport stack
  102        * will initially write messages to the log.
  103        * startLogging=false -> the TransportLogger object of the Transport stack
  104        * will initially NOT write messages to the log.
  105        * This parameter only has an effect if trace == true.
  106        * This parameter is most probably set in Connection or TransportConnector URIs.
  107        */
  108       protected boolean startLogging = true;
  109       protected Map<String, Object> transportOptions;
  110       protected final ServerSocketFactory serverSocketFactory;
  111       protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
  112       protected Thread socketHandlerThread;
  113       /**
  114        * The maximum number of sockets allowed for this server
  115        */
  116       protected int maximumConnections = Integer.MAX_VALUE;
  117       protected int currentTransportCount=0;
  118     
  119       public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
  120           super(location);
  121           this.transportFactory = transportFactory;
  122           this.serverSocketFactory = serverSocketFactory;
  123           
  124       }
  125   
  126       public void bind() throws IOException {
  127           URI bind = getBindLocation();
  128   
  129           String host = bind.getHost();
  130           host = (host == null || host.length() == 0) ? "localhost" : host;
  131           InetAddress addr = InetAddress.getByName(host);
  132   
  133           try {
  134   
  135               this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
  136               configureServerSocket(this.serverSocket);
  137               
  138           } catch (IOException e) {
  139               throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
  140           }
  141           try {
  142               setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
  143                   .getFragment()));
  144           } catch (URISyntaxException e) {
  145   
  146               // it could be that the host name contains invalid characters such
  147               // as _ on unix platforms
  148               // so lets try use the IP address instead
  149               try {
  150                   setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
  151               } catch (URISyntaxException e2) {
  152                   throw IOExceptionSupport.create(e2);
  153               }
  154           }
  155       }
  156   
  157       private void configureServerSocket(ServerSocket socket) throws SocketException {
  158           socket.setSoTimeout(2000);
  159           if (transportOptions != null) {
  160               IntrospectionSupport.setProperties(socket, transportOptions);
  161           }
  162       }
  163   
  164       /**
  165        * @return Returns the wireFormatFactory.
  166        */
  167       public WireFormatFactory getWireFormatFactory() {
  168           return wireFormatFactory;
  169       }
  170   
  171       /**
  172        * @param wireFormatFactory The wireFormatFactory to set.
  173        */
  174       public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
  175           this.wireFormatFactory = wireFormatFactory;
  176       }
  177   
  178       /**
  179        * Associates a broker info with the transport server so that the transport
  180        * can do discovery advertisements of the broker.
  181        * 
  182        * @param brokerInfo
  183        */
  184       public void setBrokerInfo(BrokerInfo brokerInfo) {
  185       }
  186   
  187       public long getMaxInactivityDuration() {
  188           return maxInactivityDuration;
  189       }
  190   
  191       public void setMaxInactivityDuration(long maxInactivityDuration) {
  192           this.maxInactivityDuration = maxInactivityDuration;
  193       }
  194       
  195       public long getMaxInactivityDurationInitalDelay() {
  196           return this.maxInactivityDurationInitalDelay;
  197       }
  198   
  199       public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
  200           this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
  201       }
  202   
  203       public int getMinmumWireFormatVersion() {
  204           return minmumWireFormatVersion;
  205       }
  206   
  207       public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
  208           this.minmumWireFormatVersion = minmumWireFormatVersion;
  209       }
  210   
  211       public boolean isTrace() {
  212           return trace;
  213       }
  214   
  215       public void setTrace(boolean trace) {
  216           this.trace = trace;
  217       }
  218       
  219       public String getLogWriterName() {
  220           return logWriterName;
  221       }
  222   
  223       public void setLogWriterName(String logFormat) {
  224           this.logWriterName = logFormat;
  225       }        
  226   
  227       public boolean isDynamicManagement() {
  228           return dynamicManagement;
  229       }
  230   
  231       public void setDynamicManagement(boolean useJmx) {
  232           this.dynamicManagement = useJmx;
  233       }
  234   
  235       public boolean isStartLogging() {
  236           return startLogging;
  237       }
  238   
  239   
  240       public void setStartLogging(boolean startLogging) {
  241           this.startLogging = startLogging;
  242       }
  243       
  244       /**
  245        * @return the backlog
  246        */
  247       public int getBacklog() {
  248           return backlog;
  249       }
  250   
  251       /**
  252        * @param backlog the backlog to set
  253        */
  254       public void setBacklog(int backlog) {
  255           this.backlog = backlog;
  256       }
  257   
  258       /**
  259        * @return the useQueueForAccept
  260        */
  261       public boolean isUseQueueForAccept() {
  262           return useQueueForAccept;
  263       }
  264   
  265       /**
  266        * @param useQueueForAccept the useQueueForAccept to set
  267        */
  268       public void setUseQueueForAccept(boolean useQueueForAccept) {
  269           this.useQueueForAccept = useQueueForAccept;
  270       }
  271       
  272   
  273       /**
  274        * pull Sockets from the ServerSocket
  275        */
  276       public void run() {
  277           while (!isStopped()) {
  278               Socket socket = null;
  279               try {
  280                   socket = serverSocket.accept();
  281                   if (socket != null) {
  282                       if (isStopped() || getAcceptListener() == null) {
  283                           socket.close();
  284                       } else {
  285                           if (useQueueForAccept) {
  286                               socketQueue.put(socket);
  287                           }else {
  288                               handleSocket(socket);
  289                           }
  290                       }
  291                   }
  292               } catch (SocketTimeoutException ste) {
  293                   // expect this to happen
  294               } catch (Exception e) {
  295                   if (!isStopping()) {
  296                       onAcceptError(e);
  297                   } else if (!isStopped()) {
  298                       LOG.warn("run()", e);
  299                       onAcceptError(e);
  300                   }
  301               }
  302           }
  303       }
  304   
  305       /**
  306        * Allow derived classes to override the Transport implementation that this
  307        * transport server creates.
  308        * 
  309        * @param socket
  310        * @param format
  311        * @return
  312        * @throws IOException
  313        */
  314       protected  Transport createTransport(Socket socket, WireFormat format) throws IOException {
  315           return new TcpTransport(format, socket);
  316       }
  317   
  318       /**
  319        * @return pretty print of this
  320        */
  321       public String toString() {
  322           return "" + getBindLocation();
  323       }
  324   
  325       /**
  326        * @param socket 
  327        * @param inetAddress
  328        * @return real hostName
  329        * @throws UnknownHostException
  330        */
  331       protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
  332           String result = null;
  333           if (socket.isBound()) {
  334               if (socket.getInetAddress().isAnyLocalAddress()) {
  335                   // make it more human readable and useful, an alternative to 0.0.0.0
  336                   result = InetAddress.getLocalHost().getHostName();
  337               } else {
  338                   result = socket.getInetAddress().getCanonicalHostName();
  339               }
  340           } else {
  341               result = bindAddress.getCanonicalHostName();
  342           }
  343           return result;
  344       }
  345       
  346       protected void doStart() throws Exception {
  347           if(useQueueForAccept) {
  348               Runnable run = new Runnable() {
  349                   public void run() {
  350                       try {
  351                           while (!isStopped() && !isStopping()) {
  352                               Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
  353                               if (sock != null) {
  354                                   handleSocket(sock);
  355                               }
  356                           }
  357       
  358                       } catch (InterruptedException e) {
  359                           LOG.info("socketQueue interuppted - stopping");
  360                           if (!isStopping()) {
  361                               onAcceptError(e);
  362                           }
  363                       }
  364       
  365                   }
  366       
  367               };
  368               socketHandlerThread = new Thread(null, run,
  369                       "ActiveMQ Transport Server Thread Handler: " + toString(),
  370                       getStackSize());
  371               socketHandlerThread.setDaemon(true);
  372               socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
  373               socketHandlerThread.start();
  374           }
  375           super.doStart();
  376           
  377       }
  378   
  379       protected void doStop(ServiceStopper stopper) throws Exception {
  380           super.doStop(stopper);
  381           if (serverSocket != null) {
  382               serverSocket.close();
  383           }
  384       }
  385   
  386       public InetSocketAddress getSocketAddress() {
  387           return (InetSocketAddress)serverSocket.getLocalSocketAddress();
  388       }
  389   
  390       public void setTransportOption(Map<String, Object> transportOptions) {
  391           this.transportOptions = transportOptions;
  392       }
  393       
  394       protected final void handleSocket(Socket socket) {
  395           try {
  396               if (this.currentTransportCount >= this.maximumConnections) {
  397                   throw new ExceededMaximumConnectionsException("Exceeded the maximum " + 
  398                       "number of allowed client connections. See the 'maximumConnections' " + 
  399                       "property on the TCP transport configuration URI in the ActiveMQ " + 
  400                       "configuration file (e.g., activemq.xml)"); 
  401                   
  402               } else {
  403                   HashMap<String, Object> options = new HashMap<String, Object>();
  404                   options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
  405                   options.put("maxInactivityDurationInitalDelay", 
  406                       Long.valueOf(maxInactivityDurationInitalDelay));
  407                   options.put("minmumWireFormatVersion", 
  408                       Integer.valueOf(minmumWireFormatVersion));
  409                   options.put("trace", Boolean.valueOf(trace));
  410                   options.put("soTimeout", Integer.valueOf(soTimeout));
  411                   options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
  412                   options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
  413                   options.put("logWriterName", logWriterName);
  414                   options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
  415                   options.put("startLogging", Boolean.valueOf(startLogging));
  416                   options.putAll(transportOptions);
  417   
  418                   WireFormat format = wireFormatFactory.createWireFormat();
  419                   Transport transport = createTransport(socket, format);
  420   
  421                   if (transport instanceof ServiceSupport) {
  422                       ((ServiceSupport) transport).addServiceListener(this);
  423                   }
  424   
  425                   Transport configuredTransport = 
  426                       transportFactory.serverConfigure( transport, format, options);
  427   
  428                   getAcceptListener().onAccept(configuredTransport);
  429               }
  430           } catch (SocketTimeoutException ste) {
  431               // expect this to happen
  432           } catch (Exception e) {
  433               if (!isStopping()) {
  434                   onAcceptError(e);
  435               } else if (!isStopped()) {
  436                   LOG.warn("run()", e);
  437                   onAcceptError(e);
  438               }
  439           }
  440           
  441       }    
  442   
  443   	public int getSoTimeout() {
  444   		return soTimeout;
  445   	}
  446   
  447   	public void setSoTimeout(int soTimeout) {
  448   		this.soTimeout = soTimeout;
  449   	}
  450   
  451   	public int getSocketBufferSize() {
  452   		return socketBufferSize;
  453   	}
  454   
  455   	public void setSocketBufferSize(int socketBufferSize) {
  456   		this.socketBufferSize = socketBufferSize;
  457   	}
  458   
  459   	public int getConnectionTimeout() {
  460   		return connectionTimeout;
  461   	}
  462   
  463   	public void setConnectionTimeout(int connectionTimeout) {
  464   		this.connectionTimeout = connectionTimeout;
  465   	}
  466   
  467       /**
  468        * @return the maximumConnections
  469        */
  470       public int getMaximumConnections() {
  471           return maximumConnections;
  472       }
  473   
  474       /**
  475        * @param maximumConnections the maximumConnections to set
  476        */
  477       public void setMaximumConnections(int maximumConnections) {
  478           this.maximumConnections = maximumConnections;
  479       }
  480   
  481       
  482       public void started(Service service) {
  483          this.currentTransportCount++;
  484       }
  485   
  486       public void stopped(Service service) {
  487           this.currentTransportCount--;
  488       }
  489   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » tcp » [javadoc | source]