| Method from org.apache.activemq.transport.tcp.TcpTransport Detail: |
protected void closeStreams() throws IOException {
if (dataOut != null) {
dataOut.close();
}
if (dataIn != null) {
dataIn.close();
}
}
|
protected void connect() throws Exception {
if (socket == null && socketFactory == null) {
throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
}
InetSocketAddress localAddress = null;
InetSocketAddress remoteAddress = null;
if (localLocation != null) {
localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
localLocation.getPort());
}
if (remoteLocation != null) {
String host = resolveHostName(remoteLocation.getHost());
remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
}
if (socket != null) {
if (localAddress != null) {
socket.bind(localAddress);
}
// If it's a server accepted socket.. we don't need to connect it
// to a remote address.
if (remoteAddress != null) {
if (connectionTimeout >= 0) {
socket.connect(remoteAddress, connectionTimeout);
} else {
socket.connect(remoteAddress);
}
}
} else {
// For SSL sockets.. you can't create an unconnected socket :(
// This means the timout option are not supported either.
if (localAddress != null) {
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
localAddress.getAddress(), localAddress.getPort());
} else {
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
}
}
initialiseSocket(socket);
initializeStreams();
}
|
protected void doRun() throws IOException {
try {
Object command = readCommand();
doConsume(command);
} catch (SocketTimeoutException e) {
} catch (InterruptedIOException e) {
}
}
|
protected void doStart() throws Exception {
connect();
stoppedLatch.set(new CountDownLatch(1));
super.doStart();
}
|
protected void doStop(ServiceStopper stopper) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping transport " + this);
}
// Closing the streams flush the sockets before closing.. if the socket
// is hung.. then this hangs the close.
// closeStreams();
if (socket != null) {
//closing the socket can hang also
final CountDownLatch latch = new CountDownLatch(1);
SOCKET_CLOSE.execute(new Runnable() {
public void run() {
try {
socket.close();
} catch (IOException e) {
LOG.debug("Caught exception closing socket",e);
}finally {
latch.countDown();
}
}
});
latch.await(1,TimeUnit.SECONDS);
}
}
|
public int getConnectionTimeout() {
return connectionTimeout;
}
|
public int getIoBufferSize() {
return this.ioBufferSize;
}
|
public int getJmxPort() {
return jmxPort;
}
|
public Boolean getKeepAlive() {
return keepAlive;
}
|
public String getLogWriterName() {
return logWriterName;
}
|
public int getMinmumWireFormatVersion() {
return minmumWireFormatVersion;
}
|
public String getRemoteAddress() {
if (socket != null) {
return "" + socket.getRemoteSocketAddress();
}
return null;
}
|
public int getSoTimeout() {
return soTimeout;
}
|
public int getSocketBufferSize() {
return socketBufferSize;
}
|
public Boolean getTcpNoDelay() {
return tcpNoDelay;
}
|
protected void initialiseSocket(Socket sock) throws SocketException {
if (socketOptions != null) {
IntrospectionSupport.setProperties(socket, socketOptions);
}
try {
sock.setReceiveBufferSize(socketBufferSize);
sock.setSendBufferSize(socketBufferSize);
} catch (SocketException se) {
LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
LOG.debug("Cannot set socket buffer size. Reason: " + se, se);
}
sock.setSoTimeout(soTimeout);
if (keepAlive != null) {
sock.setKeepAlive(keepAlive.booleanValue());
}
if (tcpNoDelay != null) {
sock.setTcpNoDelay(tcpNoDelay.booleanValue());
}
}
Configures the socket for use |
protected void initializeStreams() throws Exception {
TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
this.dataIn = new DataInputStream(buffIn);
TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
this.dataOut = new DataOutputStream(buffOut);
}
|
public boolean isDynamicManagement() {
return dynamicManagement;
}
|
public boolean isStartLogging() {
return startLogging;
}
|
public boolean isTrace() {
return trace;
}
|
public boolean isUseLocalHost() {
return useLocalHost;
}
|
public T narrow(Class target) {
if (target == Socket.class) {
return target.cast(socket);
}
return super.narrow(target);
}
|
public void oneway(Object command) throws IOException {
checkStarted();
wireFormat.marshal(command, dataOut);
dataOut.flush();
}
A one way asynchronous send |
protected Object readCommand() throws IOException {
return wireFormat.unmarshal(dataIn);
}
|
protected String resolveHostName(String host) throws UnknownHostException {
String localName = InetAddress.getLocalHost().getHostName();
if (localName != null && isUseLocalHost()) {
if (localName.equals(host)) {
return "localhost";
}
}
return host;
}
|
public void run() {
LOG.trace("TCP consumer thread starting");
this.runnerThread=Thread.currentThread();
try {
while (!isStopped()) {
doRun();
}
} catch (IOException e) {
stoppedLatch.get().countDown();
onException(e);
} finally {
stoppedLatch.get().countDown();
}
}
reads packets from a Socket |
public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
Sets the timeout used to connect to the socket |
public void setDynamicManagement(boolean useJmx) {
this.dynamicManagement = useJmx;
}
|
public void setIoBufferSize(int ioBufferSize) {
this.ioBufferSize = ioBufferSize;
}
|
public void setJmxPort(int jmxPort) {
this.jmxPort = jmxPort;
}
|
public void setKeepAlive(Boolean keepAlive) {
this.keepAlive = keepAlive;
}
Enable/disable TCP KEEP_ALIVE mode |
public void setLogWriterName(String logFormat) {
this.logWriterName = logFormat;
}
|
public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
this.minmumWireFormatVersion = minmumWireFormatVersion;
}
|
public void setSoTimeout(int soTimeout) {
this.soTimeout = soTimeout;
}
|
public void setSocketBufferSize(int socketBufferSize) {
this.socketBufferSize = socketBufferSize;
}
Sets the buffer size to use on the socket |
public void setSocketOptions(Map socketOptions) {
this.socketOptions = new HashMap< String, Object >(socketOptions);
}
|
public void setStartLogging(boolean startLogging) {
this.startLogging = startLogging;
}
|
public void setTcpNoDelay(Boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}
Enable/disable the TCP_NODELAY option on the socket |
public void setTrace(boolean trace) {
this.trace = trace;
}
|
public void setUseLocalHost(boolean useLocalHost) {
this.useLocalHost = useLocalHost;
}
Sets whether 'localhost' or the actual local host name should be used to
make local connections. On some operating systems such as Macs its not
possible to connect as the local host name so localhost is better. |
public void stop() throws Exception {
super.stop();
CountDownLatch countDownLatch = stoppedLatch.get();
if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
countDownLatch.await(1,TimeUnit.SECONDS);
}
}
Override so that stop() blocks until the run thread is no longer running. |
public String toString() {
return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
}
|