/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.tcp;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import javax.net.ServerSocketFactory;

import org.apache.activemq.Service;
import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceListener;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A TCP based implementation of {@link TransportServer}.
 * <p>
 * Sockets are pulled from a {@link ServerSocket} in {@link #run()} and turned
 * into {@link Transport} instances in {@link #handleSocket(Socket)}. When
 * {@link #isUseQueueForAccept()} is true the accepted sockets are first placed
 * on an internal queue and processed by a separate handler thread that is
 * created in {@link #doStart()}.
 * <p>
 * The server registers itself as a {@link ServiceListener} on every transport
 * that is a {@link ServiceSupport}; the {@link #started(Service)} and
 * {@link #stopped(Service)} callbacks maintain the connection count that is
 * compared against {@link #getMaximumConnections()}.
 *
 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
 * @version $Revision: 1.1 $
 */

public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{

    private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);

    /**
     * Performs atomic increments/decrements of {@link #currentTransportCount}.
     * The field itself stays a plain (volatile) int so that subclasses which
     * read it keep compiling.
     */
    private static final AtomicIntegerFieldUpdater<TcpTransportServer> TRANSPORT_COUNT_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(TcpTransportServer.class, "currentTransportCount");

    protected ServerSocket serverSocket;
    protected int backlog = 5000;
    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
    protected final TcpTransportFactory transportFactory;
    protected long maxInactivityDuration = 30000;
    protected long maxInactivityDurationInitalDelay = 10000;
    protected int minmumWireFormatVersion;
    protected boolean useQueueForAccept=true;

    /**
     * trace=true -> the Transport stack where this TcpTransport
     * object will be, will have a TransportLogger layer
     * trace=false -> the Transport stack where this TcpTransport
     * object will be, will NOT have a TransportLogger layer, and therefore
     * will never be able to print logging messages.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected boolean trace = false;

    protected int soTimeout = 0;
    protected int socketBufferSize = 64 * 1024;
    protected int connectionTimeout = 30000;

    /**
     * Name of the LogWriter implementation to use.
     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
    /**
     * Specifies if the TransportLogger will be manageable by JMX or not.
     * Also, as long as there is at least 1 TransportLogger which is manageable,
     * a TransportLoggerControl MBean will be created.
     */
    protected boolean dynamicManagement = false;
    /**
     * startLogging=true -> the TransportLogger object of the Transport stack
     * will initially write messages to the log.
     * startLogging=false -> the TransportLogger object of the Transport stack
     * will initially NOT write messages to the log.
     * This parameter only has an effect if trace == true.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected boolean startLogging = true;
    /**
     * Extra options applied to the server socket and merged into the options
     * of every accepted transport. May be null when never set.
     */
    protected Map<String, Object> transportOptions;
    protected final ServerSocketFactory serverSocketFactory;
    /**
     * Accepted sockets waiting to be handled when useQueueForAccept is true.
     */
    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
    protected Thread socketHandlerThread;
    /**
     * The maximum number of sockets allowed for this server
     */
    protected int maximumConnections = Integer.MAX_VALUE;
    /**
     * Number of transports that reported started and have not yet reported
     * stopped. Written from the service listener callbacks and read from the
     * thread running handleSocket, hence volatile and updated atomically.
     */
    protected volatile int currentTransportCount=0;

    /**
     * Creates an unbound server; call {@link #bind()} to open the server socket.
     *
     * @param transportFactory factory used to configure each accepted transport
     * @param location the URI this server is asked to bind to
     * @param serverSocketFactory factory used by {@link #bind()} to create the server socket
     * @throws IOException declared for subclasses and the super constructor
     * @throws URISyntaxException declared for subclasses and the super constructor
     */
    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
        super(location);
        this.transportFactory = transportFactory;
        this.serverSocketFactory = serverSocketFactory;

    }

    /**
     * Opens the server socket on the bind location and publishes the resulting
     * connect URI. A missing or empty host in the bind location is treated as
     * "localhost".
     *
     * @throws IOException if the socket cannot be created/configured or no
     *                 valid connect URI can be built
     */
    public void bind() throws IOException {
        URI bind = getBindLocation();

        String host = bind.getHost();
        host = (host == null || host.length() == 0) ? "localhost" : host;
        InetAddress addr = InetAddress.getByName(host);

        try {

            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
            configureServerSocket(this.serverSocket);

        } catch (IOException e) {
            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
        }
        try {
            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
                .getFragment()));
        } catch (URISyntaxException e) {

            // it could be that the host name contains invalid characters such
            // as _ on unix platforms
            // so lets try use the IP address instead
            try {
                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
            } catch (URISyntaxException e2) {
                throw IOExceptionSupport.create(e2);
            }
        }
    }

    /**
     * Applies the accept timeout and any configured transport options to the
     * server socket.
     *
     * @param socket the freshly created server socket
     * @throws SocketException if the timeout cannot be set
     */
    private void configureServerSocket(ServerSocket socket) throws SocketException {
        // A finite accept timeout lets run() wake up regularly and re-check
        // isStopped(); the resulting SocketTimeoutException is ignored there.
        socket.setSoTimeout(2000);
        if (transportOptions != null) {
            IntrospectionSupport.setProperties(socket, transportOptions);
        }
    }

    /**
     * @return Returns the wireFormatFactory.
     */
    public WireFormatFactory getWireFormatFactory() {
        return wireFormatFactory;
    }

    /**
     * @param wireFormatFactory The wireFormatFactory to set.
     */
    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
        this.wireFormatFactory = wireFormatFactory;
    }

    /**
     * Associates a broker info with the transport server so that the transport
     * can do discovery advertisements of the broker.
     * <p>
     * This implementation ignores the supplied value.
     *
     * @param brokerInfo the broker info (unused)
     */
    public void setBrokerInfo(BrokerInfo brokerInfo) {
    }

    public long getMaxInactivityDuration() {
        return maxInactivityDuration;
    }

    public void setMaxInactivityDuration(long maxInactivityDuration) {
        this.maxInactivityDuration = maxInactivityDuration;
    }

    public long getMaxInactivityDurationInitalDelay() {
        return this.maxInactivityDurationInitalDelay;
    }

    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
    }

    public int getMinmumWireFormatVersion() {
        return minmumWireFormatVersion;
    }

    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
        this.minmumWireFormatVersion = minmumWireFormatVersion;
    }

    public boolean isTrace() {
        return trace;
    }

    public void setTrace(boolean trace) {
        this.trace = trace;
    }

    public String getLogWriterName() {
        return logWriterName;
    }

    public void setLogWriterName(String logFormat) {
        this.logWriterName = logFormat;
    }

    public boolean isDynamicManagement() {
        return dynamicManagement;
    }

    public void setDynamicManagement(boolean useJmx) {
        this.dynamicManagement = useJmx;
    }

    public boolean isStartLogging() {
        return startLogging;
    }


    public void setStartLogging(boolean startLogging) {
        this.startLogging = startLogging;
    }

    /**
     * @return the backlog
     */
    public int getBacklog() {
        return backlog;
    }

    /**
     * @param backlog the backlog to set
     */
    public void setBacklog(int backlog) {
        this.backlog = backlog;
    }

    /**
     * @return the useQueueForAccept
     */
    public boolean isUseQueueForAccept() {
        return useQueueForAccept;
    }

    /**
     * @param useQueueForAccept the useQueueForAccept to set
     */
    public void setUseQueueForAccept(boolean useQueueForAccept) {
        this.useQueueForAccept = useQueueForAccept;
    }


    /**
     * pull Sockets from the ServerSocket
     * <p>
     * Loops until the server is stopped. Each accepted socket is closed
     * immediately when the server is stopped or has no accept listener;
     * otherwise it is queued for the handler thread or handled inline,
     * depending on useQueueForAccept.
     */
    public void run() {
        while (!isStopped()) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                if (socket != null) {
                    if (isStopped() || getAcceptListener() == null) {
                        socket.close();
                    } else {
                        if (useQueueForAccept) {
                            socketQueue.put(socket);
                        }else {
                            handleSocket(socket);
                        }
                    }
                }
            } catch (SocketTimeoutException ste) {
                // expect this to happen
            } catch (Exception e) {
                if (!isStopping()) {
                    onAcceptError(e);
                } else if (!isStopped()) {
                    LOG.warn("run()", e);
                    onAcceptError(e);
                }
            }
        }
    }

    /**
     * Allow derived classes to override the Transport implementation that this
     * transport server creates.
     *
     * @param socket the accepted client socket
     * @param format the wire format the transport should use
     * @return a transport wrapping the socket
     * @throws IOException if the transport cannot be created
     */
    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
        return new TcpTransport(format, socket);
    }

    /**
     * @return pretty print of this
     */
    public String toString() {
        return "" + getBindLocation();
    }

    /**
     * Works out the host name to advertise in the connect URI.
     *
     * @param socket the server socket, possibly not yet bound
     * @param bindAddress the address the server was asked to bind to; used
     *                when the socket is not bound
     * @return real hostName
     * @throws UnknownHostException if the local host name cannot be resolved
     */
    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
        String result = null;
        if (socket.isBound()) {
            if (socket.getInetAddress().isAnyLocalAddress()) {
                // make it more human readable and useful, an alternative to 0.0.0.0
                result = InetAddress.getLocalHost().getHostName();
            } else {
                result = socket.getInetAddress().getCanonicalHostName();
            }
        } else {
            result = bindAddress.getCanonicalHostName();
        }
        return result;
    }

    /**
     * Starts the queue handler thread (when useQueueForAccept is true) and then
     * delegates to the superclass.
     *
     * @throws Exception if the superclass fails to start
     */
    protected void doStart() throws Exception {
        if(useQueueForAccept) {
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        while (!isStopped() && !isStopping()) {
                            // poll with a timeout so the stop flags are re-checked regularly
                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
                            if (sock != null) {
                                handleSocket(sock);
                            }
                        }

                    } catch (InterruptedException e) {
                        LOG.info("socketQueue interuppted - stopping");
                        if (!isStopping()) {
                            onAcceptError(e);
                        }
                    }

                }

            };
            socketHandlerThread = new Thread(null, run,
                                             "ActiveMQ Transport Server Thread Handler: " + toString(),
                                             getStackSize());
            socketHandlerThread.setDaemon(true);
            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
            socketHandlerThread.start();
        }
        super.doStart();

    }

    /**
     * Delegates to the superclass and then closes the server socket, if any.
     *
     * @param stopper passed through to the superclass
     * @throws Exception if the superclass or closing the socket fails
     */
    protected void doStop(ServiceStopper stopper) throws Exception {
        super.doStop(stopper);
        if (serverSocket != null) {
            serverSocket.close();
        }
    }

    /**
     * @return the local address of the server socket; requires that
     *         {@link #bind()} has created the socket
     */
    public InetSocketAddress getSocketAddress() {
        return (InetSocketAddress)serverSocket.getLocalSocketAddress();
    }

    /**
     * @param transportOptions options applied to the server socket and merged
     *                into the options of each accepted transport; may be null
     */
    public void setTransportOption(Map<String, Object> transportOptions) {
        this.transportOptions = transportOptions;
    }

    /**
     * Turns an accepted socket into a configured {@link Transport} and hands it
     * to the accept listener.
     * <p>
     * The connection is rejected with an
     * {@link ExceededMaximumConnectionsException} (reported through
     * onAcceptError) when the current transport count has reached
     * maximumConnections. If the socket is not handed to the accept listener
     * for any reason (rejection or a failure while building the transport) it
     * is closed here so that it does not leak.
     *
     * @param socket the accepted client socket
     */
    protected final void handleSocket(Socket socket) {
        // becomes true once the accept listener is about to take over the connection
        boolean handedOff = false;
        try {
            if (this.currentTransportCount >= this.maximumConnections) {
                throw new ExceededMaximumConnectionsException("Exceeded the maximum " +
                    "number of allowed client connections. See the 'maximumConnections' " +
                    "property on the TCP transport configuration URI in the ActiveMQ " +
                    "configuration file (e.g., activemq.xml)");

            } else {
                HashMap<String, Object> options = new HashMap<String, Object>();
                options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
                options.put("maxInactivityDurationInitalDelay",
                    Long.valueOf(maxInactivityDurationInitalDelay));
                options.put("minmumWireFormatVersion",
                    Integer.valueOf(minmumWireFormatVersion));
                options.put("trace", Boolean.valueOf(trace));
                options.put("soTimeout", Integer.valueOf(soTimeout));
                options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
                options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
                options.put("logWriterName", logWriterName);
                options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
                options.put("startLogging", Boolean.valueOf(startLogging));
                // transportOptions is optional (see configureServerSocket); putAll(null) would throw
                if (transportOptions != null) {
                    options.putAll(transportOptions);
                }

                WireFormat format = wireFormatFactory.createWireFormat();
                Transport transport = createTransport(socket, format);

                if (transport instanceof ServiceSupport) {
                    // lets started()/stopped() keep currentTransportCount up to date
                    ((ServiceSupport) transport).addServiceListener(this);
                }

                Transport configuredTransport =
                    transportFactory.serverConfigure( transport, format, options);

                handedOff = true;
                getAcceptListener().onAccept(configuredTransport);
            }
        } catch (SocketTimeoutException ste) {
            // expect this to happen
        } catch (Exception e) {
            if (!isStopping()) {
                onAcceptError(e);
            } else if (!isStopped()) {
                LOG.warn("run()", e);
                onAcceptError(e);
            }
        } finally {
            if (!handedOff) {
                closeQuietly(socket);
            }
        }

    }

    /**
     * Closes a socket that will not be used, ignoring (but debug-logging) any
     * failure to do so.
     */
    private static void closeQuietly(Socket socket) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException ignored) {
            // nothing useful can be done; the connection is being discarded anyway
            LOG.debug("Failed to close rejected socket: " + socket, ignored);
        }
    }

    public int getSoTimeout() {
        return soTimeout;
    }

    public void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    public int getSocketBufferSize() {
        return socketBufferSize;
    }

    public void setSocketBufferSize(int socketBufferSize) {
        this.socketBufferSize = socketBufferSize;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    /**
     * @return the maximumConnections
     */
    public int getMaximumConnections() {
        return maximumConnections;
    }

    /**
     * @param maximumConnections the maximumConnections to set
     */
    public void setMaximumConnections(int maximumConnections) {
        this.maximumConnections = maximumConnections;
    }


    /**
     * Service listener callback: a transport created by this server started.
     *
     * @param service the service that started (not inspected)
     */
    public void started(Service service) {
        // atomic: callbacks may arrive on a different thread than handleSocket
        TRANSPORT_COUNT_UPDATER.incrementAndGet(this);
    }

    /**
     * Service listener callback: a transport created by this server stopped.
     *
     * @param service the service that stopped (not inspected)
     */
    public void stopped(Service service) {
        TRANSPORT_COUNT_UPDATER.decrementAndGet(this);
    }
}