/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.tcp;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.SocketFactory;

import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * An implementation of the {@link Transport} interface using raw tcp/ip.
 * <p>
 * Commands are written to and read from the socket through the configured
 * {@link WireFormat}. Incoming commands are read in {@link #run()}.
 *
 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
 * @version $Revision$
 */
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
    private static final Log LOG = LogFactory.getLog(TcpTransport.class);

    /** Executor used by {@link #doStop(ServiceStopper)} to close sockets off the calling thread. */
    private static final ThreadPoolExecutor SOCKET_CLOSE;

    protected final URI remoteLocation;
    protected final URI localLocation;
    protected final WireFormat wireFormat;

    /** Connect timeout in milliseconds; a negative value selects the connect call without a timeout. */
    protected int connectionTimeout = 30000;
    protected int soTimeout;
    protected int socketBufferSize = 64 * 1024;
    protected int ioBufferSize = 8 * 1024;
    protected boolean closeAsync = true;
    protected Socket socket;
    protected DataOutputStream dataOut;
    protected DataInputStream dataIn;
    protected TcpBufferedOutputStream buffOut = null;

    /**
     * trace=true -> the Transport stack where this TcpTransport
     * object will be, will have a TransportLogger layer
     * trace=false -> the Transport stack where this TcpTransport
     * object will be, will NOT have a TransportLogger layer, and therefore
     * will never be able to print logging messages.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected boolean trace = false;

    /**
     * Name of the LogWriter implementation to use.
     * Names are mapped to classes in the
     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;

    /**
     * Specifies if the TransportLogger will be manageable by JMX or not.
     * Also, as long as there is at least 1 TransportLogger which is manageable,
     * a TransportLoggerControl MBean will be created.
     */
    protected boolean dynamicManagement = false;

    /**
     * startLogging=true -> the TransportLogger object of the Transport stack
     * will initially write messages to the log.
     * startLogging=false -> the TransportLogger object of the Transport stack
     * will initially NOT write messages to the log.
     * This parameter only has an effect if trace == true.
     * This parameter is most probably set in Connection or TransportConnector URIs.
     */
    protected boolean startLogging = true;

    /**
     * Specifies the port that will be used by the JMX server to manage
     * the TransportLoggers.
     * This should only be set in an URI by a client (producer or consumer) since
     * a broker will already create a JMX server.
     * It is useful for people who test a broker and clients in the same machine
     * and want to control both via JMX; a different port will be needed.
     */
    protected int jmxPort = 1099;
    protected boolean useLocalHost = true;
    protected int minmumWireFormatVersion;
    protected SocketFactory socketFactory;

    /** Latch released by {@link #run()} when it exits; {@link #stop()} waits on it. */
    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();

    private Map<String, Object> socketOptions;
    private Boolean keepAlive;
    private Boolean tcpNoDelay;
    private Thread runnerThread;
    private volatile int receiveCounter;

    static {
        SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), new ThreadFactory() {
                    public Thread newThread(Runnable runnable) {
                        Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
                        thread.setPriority(Thread.MAX_PRIORITY);
                        thread.setDaemon(true);
                        return thread;
                    }
                });
    }

    /**
     * Connect to a remote Node - e.g. a Broker
     * <p>
     * An unconnected socket is requested from the factory. If the factory
     * throws a {@link SocketException}, {@link #socket} is left {@code null}
     * and {@link #connect()} later asks the factory for an already connected
     * socket instead.
     *
     * @param wireFormat the wire format used to marshal and unmarshal commands
     * @param socketFactory the factory used to create the socket
     * @param remoteLocation the location to connect to
     * @param localLocation - e.g. local InetAddress and local port
     * @throws IOException
     * @throws UnknownHostException
     */
    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
                        URI localLocation) throws UnknownHostException, IOException {
        this.wireFormat = wireFormat;
        this.socketFactory = socketFactory;
        try {
            this.socket = socketFactory.createSocket();
        } catch (SocketException e) {
            // The factory cannot hand out unconnected sockets; connect() will
            // fall back to creating a connected one.
            this.socket = null;
        }
        this.remoteLocation = remoteLocation;
        this.localLocation = localLocation;
        setDaemon(false);
    }

    /**
     * Initialize from a server Socket
     * <p>
     * No remote or local location is recorded, so {@link #connect()} neither
     * binds nor connects the given socket.
     *
     * @param wireFormat the wire format used to marshal and unmarshal commands
     * @param socket the already accepted socket
     * @throws IOException
     */
    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
        this.wireFormat = wireFormat;
        this.socket = socket;
        this.remoteLocation = null;
        this.localLocation = null;
        setDaemon(true);
    }

    /**
     * A one way asynchronous send: marshals the command onto the output
     * stream and flushes it.
     *
     * @param command the command to send
     * @throws IOException if marshalling or flushing fails
     */
    public void oneway(Object command) throws IOException {
        checkStarted();
        wireFormat.marshal(command, dataOut);
        dataOut.flush();
    }

    /**
     * @return pretty print of 'this'. Safe to call while no socket exists
     *         yet, in which case the remote location (if any) is used.
     */
    @Override
    public String toString() {
        Socket current = socket;
        if (current == null) {
            // The client constructor may leave the socket null until connect().
            if (remoteLocation != null) {
                return "tcp://" + remoteLocation.getHost() + ":" + remoteLocation.getPort();
            }
            return "tcp://unconnected";
        }
        return "tcp://" + current.getInetAddress() + ":" + current.getPort();
    }

    /**
     * reads packets from a Socket
     * <p>
     * Loops over {@link #doRun()} until {@code isStopped()} returns true or
     * an error occurs. Errors are reported through {@code onException};
     * anything that is not an {@link IOException} is wrapped in one first.
     */
    public void run() {
        LOG.trace("TCP consumer thread for " + this + " starting");
        this.runnerThread = Thread.currentThread();
        try {
            while (!isStopped()) {
                doRun();
            }
        } catch (IOException e) {
            // Released before notifying the listener - presumably so that a
            // stop() triggered by the notification does not wait on this thread.
            releaseStoppedLatch();
            onException(e);
        } catch (Throwable e) {
            releaseStoppedLatch();
            IOException ioe = new IOException("Unexpected error occured");
            ioe.initCause(e);
            onException(ioe);
        } finally {
            releaseStoppedLatch();
        }
    }

    /**
     * Counts down the current stopped latch, if one has been installed.
     * Tolerates a missing latch so that error reporting in {@link #run()}
     * is never masked by a NullPointerException.
     */
    private void releaseStoppedLatch() {
        CountDownLatch latch = stoppedLatch.get();
        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * Reads a single command and hands it to {@code doConsume}.
     *
     * @throws IOException if reading fails for a reason other than a
     *         timeout or an interrupted read
     */
    protected void doRun() throws IOException {
        try {
            Object command = readCommand();
            doConsume(command);
        } catch (SocketTimeoutException e) {
            // Intentionally ignored: the loop in run() re-checks isStopped()
            // and reads again.
        } catch (InterruptedIOException e) {
            // Intentionally ignored for the same reason.
        }
    }

    /**
     * @return the next command unmarshalled from the input stream
     * @throws IOException if unmarshalling fails
     */
    protected Object readCommand() throws IOException {
        return wireFormat.unmarshal(dataIn);
    }

    // Properties
    // -------------------------------------------------------------------------

    /**
     * @return the trace flag
     */
    public boolean isTrace() {
        return trace;
    }

    /**
     * @param trace the trace flag to set
     */
    public void setTrace(boolean trace) {
        this.trace = trace;
    }

    /**
     * @return the name of the LogWriter implementation to use
     */
    public String getLogWriterName() {
        return logWriterName;
    }

    /**
     * @param logFormat the name of the LogWriter implementation to use
     */
    public void setLogWriterName(String logFormat) {
        this.logWriterName = logFormat;
    }

    /**
     * @return the dynamicManagement flag
     */
    public boolean isDynamicManagement() {
        return dynamicManagement;
    }

    /**
     * @param useJmx the dynamicManagement flag to set
     */
    public void setDynamicManagement(boolean useJmx) {
        this.dynamicManagement = useJmx;
    }

    /**
     * @return the startLogging flag
     */
    public boolean isStartLogging() {
        return startLogging;
    }

    /**
     * @param startLogging the startLogging flag to set
     */
    public void setStartLogging(boolean startLogging) {
        this.startLogging = startLogging;
    }

    /**
     * @return the jmxPort
     */
    public int getJmxPort() {
        return jmxPort;
    }

    /**
     * @param jmxPort the jmxPort to set
     */
    public void setJmxPort(int jmxPort) {
        this.jmxPort = jmxPort;
    }

    /**
     * @return the minimum wire format version
     */
    public int getMinmumWireFormatVersion() {
        return minmumWireFormatVersion;
    }

    /**
     * @param minmumWireFormatVersion the minimum wire format version to set
     */
    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
        this.minmumWireFormatVersion = minmumWireFormatVersion;
    }

    /**
     * @return the useLocalHost flag
     */
    public boolean isUseLocalHost() {
        return useLocalHost;
    }

    /**
     * Sets whether 'localhost' or the actual local host name should be used to
     * make local connections. On some operating systems such as Macs it's not
     * possible to connect as the local host name so localhost is better.
     */
    public void setUseLocalHost(boolean useLocalHost) {
        this.useLocalHost = useLocalHost;
    }

    /**
     * @return the socketBufferSize
     */
    public int getSocketBufferSize() {
        return socketBufferSize;
    }

    /**
     * Sets the buffer size to use on the socket
     */
    public void setSocketBufferSize(int socketBufferSize) {
        this.socketBufferSize = socketBufferSize;
    }

    /**
     * @return the socket timeout
     */
    public int getSoTimeout() {
        return soTimeout;
    }

    /**
     * Sets the socket timeout
     */
    public void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    /**
     * @return the timeout used to connect to the socket
     */
    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * Sets the timeout used to connect to the socket
     */
    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    /**
     * @return the keepAlive setting, or {@code null} if it was never set
     */
    public Boolean getKeepAlive() {
        return keepAlive;
    }

    /**
     * Enable/disable TCP KEEP_ALIVE mode
     */
    public void setKeepAlive(Boolean keepAlive) {
        this.keepAlive = keepAlive;
    }

    /**
     * @return the tcpNoDelay setting, or {@code null} if it was never set
     */
    public Boolean getTcpNoDelay() {
        return tcpNoDelay;
    }

    /**
     * Enable/disable the TCP_NODELAY option on the socket
     */
    public void setTcpNoDelay(Boolean tcpNoDelay) {
        this.tcpNoDelay = tcpNoDelay;
    }

    /**
     * @return the ioBufferSize
     */
    public int getIoBufferSize() {
        return this.ioBufferSize;
    }

    /**
     * @param ioBufferSize the ioBufferSize to set
     */
    public void setIoBufferSize(int ioBufferSize) {
        this.ioBufferSize = ioBufferSize;
    }

    /**
     * @return the closeAsync
     */
    public boolean isCloseAsync() {
        return closeAsync;
    }

    /**
     * @param closeAsync the closeAsync to set
     */
    public void setCloseAsync(boolean closeAsync) {
        this.closeAsync = closeAsync;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Maps the given host to "localhost" when {@link #isUseLocalHost()} is
     * enabled and the host equals the local host name.
     * <p>
     * The local host name is only looked up when the mapping is enabled, so
     * a failing lookup cannot break connections that never needed it.
     *
     * @param host the host name to resolve
     * @return "localhost" or the unchanged host
     * @throws UnknownHostException if the local host name cannot be determined
     */
    protected String resolveHostName(String host) throws UnknownHostException {
        if (isUseLocalHost()) {
            String localName = InetAddress.getLocalHost().getHostName();
            if (localName != null && localName.equals(host)) {
                return "localhost";
            }
        }
        return host;
    }

    /**
     * Configures the socket for use
     * <p>
     * Applies the configured socket options, buffer sizes, socket timeout
     * and, when they were set, the keep-alive and TCP_NODELAY flags. A
     * failure to set the buffer sizes is logged and otherwise ignored.
     *
     * @param sock the socket to configure
     * @throws SocketException if the timeout, keep-alive or TCP_NODELAY
     *         setting cannot be applied
     */
    protected void initialiseSocket(Socket sock) throws SocketException {
        if (socketOptions != null) {
            // Configure the socket that was passed in, not the field.
            IntrospectionSupport.setProperties(sock, socketOptions);
        }

        try {
            sock.setReceiveBufferSize(socketBufferSize);
            sock.setSendBufferSize(socketBufferSize);
        } catch (SocketException se) {
            LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
            LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
        }
        sock.setSoTimeout(soTimeout);

        if (keepAlive != null) {
            sock.setKeepAlive(keepAlive.booleanValue());
        }
        if (tcpNoDelay != null) {
            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
        }
    }

    /**
     * Connects the socket and installs a fresh stopped latch before
     * delegating to the superclass.
     */
    protected void doStart() throws Exception {
        connect();
        stoppedLatch.set(new CountDownLatch(1));
        super.doStart();
    }

    /**
     * Binds and connects the socket as required, then configures it and
     * sets up the data streams.
     *
     * @throws IllegalStateException if neither a socket nor a socket factory
     *         is available, or if a connected socket has to be created but
     *         no remote location is known
     * @throws UnknownHostException if a connected socket has to be created
     *         but the remote host cannot be resolved
     * @throws Exception on any other connection failure
     */
    protected void connect() throws Exception {

        if (socket == null && socketFactory == null) {
            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
        }

        InetSocketAddress localAddress = null;
        InetSocketAddress remoteAddress = null;

        if (localLocation != null) {
            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
                                                 localLocation.getPort());
        }

        if (remoteLocation != null) {
            String host = resolveHostName(remoteLocation.getHost());
            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
        }

        if (socket != null) {

            if (localAddress != null) {
                socket.bind(localAddress);
            }

            // If it's a server accepted socket.. we don't need to connect it
            // to a remote address.
            if (remoteAddress != null) {
                if (connectionTimeout >= 0) {
                    socket.connect(remoteAddress, connectionTimeout);
                } else {
                    socket.connect(remoteAddress);
                }
            }

        } else {
            // For SSL sockets.. you can't create an unconnected socket :(
            // This means the timeout options are not supported either.
            if (remoteAddress == null) {
                throw new IllegalStateException("Cannot create a connected socket without a remote location");
            }
            if (remoteAddress.isUnresolved()) {
                // getAddress() would be null here; never hand a null address
                // to the factory.
                throw new UnknownHostException(remoteAddress.getHostName());
            }
            if (localAddress != null) {
                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
                                                    localAddress.getAddress(), localAddress.getPort());
            } else {
                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
            }
        }

        initialiseSocket(socket);
        initializeStreams();
    }

    /**
     * Closes the socket, either on the {@link #SOCKET_CLOSE} executor
     * (waiting at most one second for it to finish) or directly on the
     * calling thread, depending on {@link #closeAsync}.
     *
     * @param stopper not used by this implementation
     */
    protected void doStop(ServiceStopper stopper) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Stopping transport " + this);
        }

        // Closing the streams flush the sockets before closing.. if the socket
        // is hung.. then this hangs the close.
        // closeStreams();
        final Socket closing = socket;
        if (closing == null) {
            return;
        }

        if (closeAsync) {
            //closing the socket can hang also
            final CountDownLatch latch = new CountDownLatch(1);

            SOCKET_CLOSE.execute(new Runnable() {

                public void run() {
                    try {
                        closing.close();
                    } catch (IOException e) {
                        LOG.debug("Caught exception closing socket",e);
                    } finally {
                        latch.countDown();
                    }
                }

            });
            latch.await(1,TimeUnit.SECONDS);
        } else {
            try {
                closing.close();
            } catch (IOException e) {
                LOG.debug("Caught exception closing socket",e);
            }
        }
    }

    /**
     * Override so that stop() blocks until the run thread is no longer running.
     * <p>
     * The wait is limited to one second and is skipped when stop() is
     * called from the run thread itself.
     */
    @Override
    public void stop() throws Exception {
        super.stop();
        CountDownLatch countDownLatch = stoppedLatch.get();
        if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
            countDownLatch.await(1,TimeUnit.SECONDS);
        }
    }

    /**
     * Wraps the socket streams in buffered data streams. Every read, skip
     * or fill on the input side increments the receive counter.
     */
    protected void initializeStreams() throws Exception {
        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
            @Override
            public int read() throws IOException {
                receiveCounter++;
                return super.read();
            }
            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                receiveCounter++;
                return super.read(b, off, len);
            }
            @Override
            public long skip(long n) throws IOException {
                receiveCounter++;
                return super.skip(n);
            }
            @Override
            protected void fill() throws IOException {
                receiveCounter++;
                super.fill();
            }
        };
        this.dataIn = new DataInputStream(buffIn);
        buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
        this.dataOut = new DataOutputStream(buffOut);
    }

    /**
     * Closes the output stream and then the input stream. The input stream
     * is closed even if closing the output stream fails.
     *
     * @throws IOException if closing either stream fails
     */
    protected void closeStreams() throws IOException {
        try {
            if (dataOut != null) {
                dataOut.close();
            }
        } finally {
            if (dataIn != null) {
                dataIn.close();
            }
        }
    }

    /**
     * Stores a copy of the given options; they are applied to the socket in
     * {@link #initialiseSocket(Socket)}.
     *
     * @param socketOptions the options to apply, or {@code null} to clear
     *        any previously configured options
     */
    public void setSocketOptions(Map<String, Object> socketOptions) {
        if (socketOptions == null) {
            this.socketOptions = null;
        } else {
            this.socketOptions = new HashMap<String, Object>(socketOptions);
        }
    }

    /**
     * @return the string form of the socket's remote address, or
     *         {@code null} if there is no socket
     */
    public String getRemoteAddress() {
        if (socket != null) {
            return "" + socket.getRemoteSocketAddress();
        }
        return null;
    }

    /**
     * Exposes the underlying {@link Socket} or {@link TcpBufferedOutputStream}
     * when asked for exactly those types; everything else is delegated to
     * the superclass.
     */
    @Override
    public <T> T narrow(Class<T> target) {
        if (target == Socket.class) {
            return target.cast(socket);
        } else if (target == TcpBufferedOutputStream.class) {
            return target.cast(buffOut);
        }
        return super.narrow(target);
    }

    /**
     * @return the number of read, skip and fill calls counted on the input
     *         stream so far
     */
    public int getReceiveCounter() {
        return receiveCounter;
    }
}