Method from org.apache.activemq.transport.tcp.TcpTransportServer Detail: |
/**
 * Creates and configures the server socket for the bind location and then
 * publishes the connect URI derived from the bound socket.
 *
 * An empty or missing host in the bind location is treated as
 * "localhost". If the resolved host name cannot be used in a URI, the IP
 * address of the bind address is used instead.
 *
 * If anything fails after the server socket has been created, the socket is
 * closed before the exception propagates so that a failed bind does not
 * leave the port occupied.
 *
 * @throws IOException if the socket cannot be created or configured, if the
 *         host cannot be resolved, or if no valid connect URI can be built
 */
public void bind() throws IOException {
    URI bind = getBindLocation();
    String host = bind.getHost();
    host = (host == null || host.length() == 0) ? "localhost" : host;
    InetAddress addr = InetAddress.getByName(host);

    boolean socketCreated = false;
    boolean bound = false;
    try {
        try {
            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
            socketCreated = true;
            configureServerSocket(this.serverSocket);
        } catch (IOException e) {
            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
        }
        try {
            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr),
                    serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
        } catch (URISyntaxException e) {
            // it could be that the host name contains invalid characters such
            // as _ on unix platforms
            // so lets try use the IP address instead
            try {
                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(),
                        serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
            } catch (URISyntaxException e2) {
                throw IOExceptionSupport.create(e2);
            }
        }
        bound = true;
    } finally {
        // Only the socket created by this call is closed; the flag guards
        // against touching a socket left over from an earlier bind.
        if (socketCreated && !bound) {
            try {
                this.serverSocket.close();
            } catch (IOException ignored) {
                // best effort: the original failure is the one worth reporting
            }
        }
    }
}
|
/**
 * Builds the transport that wraps a newly accepted socket. This
 * implementation creates a {@code TcpTransport}.
 *
 * @param socket the accepted client socket
 * @param format the wire format the transport should use
 * @return the transport wrapping {@code socket}
 * @throws IOException declared so that implementations may fail while creating the transport
 */
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
    Transport tcpTransport = new TcpTransport(format, socket);
    return tcpTransport;
}
Allows derived classes to override the Transport implementation that this
transport server creates. |
/**
 * Starts the server. When {@code useQueueForAccept} is set, a daemon thread
 * is started first; it polls {@code socketQueue} once per second and passes
 * each socket it receives to {@link #handleSocket(Socket)} until the server
 * is stopping or stopped. The superclass start logic runs afterwards.
 *
 * @throws Exception if the superclass start logic fails
 */
protected void doStart() throws Exception {
    if (useQueueForAccept) {
        Runnable run = new Runnable() {
            public void run() {
                try {
                    while (!isStopped() && !isStopping()) {
                        // Bounded wait so the stop flags are re-checked regularly.
                        Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
                        if (sock != null) {
                            handleSocket(sock);
                        }
                    }
                } catch (InterruptedException e) {
                    LOG.info("socketQueue interuppted - stopping");
                    if (!isStopping()) {
                        onAcceptError(e);
                    }
                    // Restore the interrupt status that was cleared when the
                    // exception was thrown, so it is not silently lost. This is
                    // done last so the error callback above is unaffected.
                    Thread.currentThread().interrupt();
                }
            }
        };
        socketHandlerThread = new Thread(null, run,
                "ActiveMQ Transport Server Thread Handler: " + toString(),
                getStackSize());
        socketHandlerThread.setDaemon(true);
        socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
        socketHandlerThread.start();
    }
    super.doStart();
}
|
/**
 * Runs the superclass stop logic and then closes the server socket, if one
 * was created.
 *
 * The socket is closed even when the superclass stop logic throws, so a
 * failed stop does not leave the listening port open. In that case a failure
 * to close the socket is ignored so that the original exception is the one
 * that propagates.
 *
 * @param stopper passed through to the superclass stop logic
 * @throws Exception if the superclass stop logic fails, or if closing the
 *         socket fails after an otherwise clean stop
 */
protected void doStop(ServiceStopper stopper) throws Exception {
    boolean stoppedCleanly = false;
    try {
        super.doStop(stopper);
        stoppedCleanly = true;
    } finally {
        if (serverSocket != null) {
            if (stoppedCleanly) {
                serverSocket.close();
            } else {
                try {
                    serverSocket.close();
                } catch (IOException ignored) {
                    // keep the exception from super.doStop as the reported failure
                }
            }
        }
    }
}
|
/**
 * @return the backlog value that {@code bind()} passes to the server socket factory
 */
public int getBacklog() {
    return this.backlog;
}
|
/**
 * @return the value supplied as the {@code connectionTimeout} option when an
 *         accepted transport is configured
 */
public int getConnectionTimeout() {
    return this.connectionTimeout;
}
|
/**
 * @return the value supplied as the {@code logWriterName} option when an
 *         accepted transport is configured
 */
public String getLogWriterName() {
    return this.logWriterName;
}
|
/**
 * @return the value supplied as the {@code maxInactivityDuration} option
 *         when an accepted transport is configured
 */
public long getMaxInactivityDuration() {
    return this.maxInactivityDuration;
}
|
/**
 * @return the value supplied as the {@code maxInactivityDurationInitalDelay}
 *         option when an accepted transport is configured
 */
public long getMaxInactivityDurationInitalDelay() {
    return maxInactivityDurationInitalDelay;
}
|
/**
 * @return the connection limit that {@code handleSocket} compares against
 *         the current transport count
 */
public int getMaximumConnections() {
    return this.maximumConnections;
}
|
/**
 * @return the value supplied as the {@code minmumWireFormatVersion} option
 *         when an accepted transport is configured
 */
public int getMinmumWireFormatVersion() {
    return this.minmumWireFormatVersion;
}
|
/**
 * @return the value supplied as the {@code soTimeout} option when an
 *         accepted transport is configured
 */
public int getSoTimeout() {
    return this.soTimeout;
}
|
/**
 * Returns the local socket address of the server socket, cast to
 * {@code InetSocketAddress}.
 *
 * NOTE(review): dereferences the server socket unconditionally, so this
 * presumably must not be called before {@code bind()} has created it -
 * confirm against callers.
 *
 * @return the local address of the server socket
 */
public InetSocketAddress getSocketAddress() {
    return (InetSocketAddress) this.serverSocket.getLocalSocketAddress();
}
|
/**
 * @return the value supplied as the {@code socketBufferSize} option when an
 *         accepted transport is configured
 */
public int getSocketBufferSize() {
    return this.socketBufferSize;
}
|
/**
 * @return the factory {@code handleSocket} uses to create a wire format for
 *         each accepted socket
 */
public WireFormatFactory getWireFormatFactory() {
    return this.wireFormatFactory;
}
|
/**
 * Turns an accepted socket into a configured transport and hands it to the
 * accept listener.
 *
 * The connection is rejected with an
 * {@code ExceededMaximumConnectionsException} when the current transport
 * count has reached {@code maximumConnections}. Otherwise the server's
 * settings are collected into an option map, overlaid with any
 * {@code transportOptions}, and used to configure the transport created by
 * {@link #createTransport(Socket, WireFormat)}.
 *
 * Failures are not thrown to the caller; they are reported through
 * {@code onAcceptError} unless the server has already stopped. Whenever the
 * transport is not successfully handed to the accept listener, the socket is
 * closed here so that rejected or failed connections do not leak.
 *
 * @param socket the accepted client socket
 */
protected final void handleSocket(Socket socket) {
    boolean handedOff = false;
    try {
        if (this.currentTransportCount >= this.maximumConnections) {
            throw new ExceededMaximumConnectionsException("Exceeded the maximum " +
                    "number of allowed client connections. See the 'maximumConnections' " +
                    "property on the TCP transport configuration URI in the ActiveMQ " +
                    "configuration file (e.g., activemq.xml)");
        } else {
            HashMap< String, Object > options = new HashMap< String, Object >();
            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
            options.put("maxInactivityDurationInitalDelay",
                    Long.valueOf(maxInactivityDurationInitalDelay));
            options.put("minmumWireFormatVersion",
                    Integer.valueOf(minmumWireFormatVersion));
            options.put("trace", Boolean.valueOf(trace));
            options.put("soTimeout", Integer.valueOf(soTimeout));
            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
            options.put("logWriterName", logWriterName);
            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
            options.put("startLogging", Boolean.valueOf(startLogging));
            // Explicit transport options override the defaults above; a server
            // that never had options set simply uses the defaults.
            if (transportOptions != null) {
                options.putAll(transportOptions);
            }
            WireFormat format = wireFormatFactory.createWireFormat();
            Transport transport = createTransport(socket, format);
            if (transport instanceof ServiceSupport) {
                ((ServiceSupport) transport).addServiceListener(this);
            }
            Transport configuredTransport =
                    transportFactory.serverConfigure( transport, format, options);
            getAcceptListener().onAccept(configuredTransport);
            handedOff = true;
        }
    } catch (SocketTimeoutException ste) {
        // expect this to happen
    } catch (Exception e) {
        if (!isStopping()) {
            onAcceptError(e);
        } else if (!isStopped()) {
            LOG.warn("run()", e);
            onAcceptError(e);
        }
    } finally {
        // The accept listener owns the connection only after onAccept returns
        // normally; in every other case nobody else will close the socket.
        if (!handedOff && socket != null) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // best effort: the connection is being discarded anyway
            }
        }
    }
}
|
/**
 * @return the flag supplied as the {@code dynamicManagement} option when an
 *         accepted transport is configured
 */
public boolean isDynamicManagement() {
    return this.dynamicManagement;
}
|
/**
 * @return the flag supplied as the {@code startLogging} option when an
 *         accepted transport is configured
 */
public boolean isStartLogging() {
    return this.startLogging;
}
|
/**
 * @return the flag supplied as the {@code trace} option when an accepted
 *         transport is configured
 */
public boolean isTrace() {
    return this.trace;
}
|
/**
 * @return whether accepted sockets are placed on the socket queue for a
 *         separate handler thread instead of being handled on the accept thread
 */
public boolean isUseQueueForAccept() {
    return this.useQueueForAccept;
}
|
/**
 * Chooses the host name to advertise for the given server socket.
 *
 * For an unbound socket, the canonical host name of {@code bindAddress} is
 * used. For a socket bound to the wildcard address, the local host's name is
 * used. Otherwise, the canonical host name of the socket's bound address is
 * used.
 *
 * @param socket the server socket to inspect
 * @param bindAddress the address to fall back to when the socket is not bound
 * @return the host name to use
 * @throws UnknownHostException if the local host name cannot be resolved
 */
protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
    if (!socket.isBound()) {
        return bindAddress.getCanonicalHostName();
    }
    InetAddress boundAddress = socket.getInetAddress();
    if (boundAddress.isAnyLocalAddress()) {
        // make it more human readable and useful, an alternative to 0.0.0.0
        return InetAddress.getLocalHost().getHostName();
    }
    return boundAddress.getCanonicalHostName();
}
|
/**
 * Accept loop: pulls sockets from the server socket until the server is
 * stopped.
 *
 * Each accepted socket is closed immediately if the server has stopped or no
 * accept listener is registered. Otherwise it is either queued for the
 * handler thread (when {@code useQueueForAccept} is set) or handled inline.
 * Accept timeouts are expected and ignored. Other failures are reported
 * through {@code onAcceptError} unless the server has already stopped.
 *
 * If a failure occurs after a socket was accepted but before it was queued
 * or handled, the socket is closed so it does not leak.
 */
public void run() {
    while (!isStopped()) {
        Socket socket = null;
        boolean handedOff = false;
        try {
            socket = serverSocket.accept();
            if (socket != null) {
                if (isStopped() || getAcceptListener() == null) {
                    socket.close();
                } else {
                    if (useQueueForAccept) {
                        socketQueue.put(socket);
                    } else {
                        handleSocket(socket);
                    }
                    handedOff = true;
                }
            }
        } catch (SocketTimeoutException ste) {
            // expect this to happen
        } catch (Exception e) {
            // Reached with an accepted socket only when closing it failed or
            // queuing it was interrupted; in both cases nothing else owns it.
            if (socket != null && !handedOff) {
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // best effort: the connection is being discarded anyway
                }
            }
            if (!isStopping()) {
                onAcceptError(e);
            } else if (!isStopped()) {
                LOG.warn("run()", e);
                onAcceptError(e);
            }
        }
    }
}
Pulls sockets from the ServerSocket. |
/**
 * @param backlog the backlog value that {@code bind()} passes to the server
 *        socket factory
 */
public void setBacklog(final int backlog) {
    this.backlog = backlog;
}
|
/**
 * Accepts a broker info for this transport server. This implementation
 * ignores the argument.
 *
 * @param brokerInfo the broker info; unused here
 */
public void setBrokerInfo(final BrokerInfo brokerInfo) {
    // intentionally empty
}
Associates a broker info with the transport server so that the transport
can do discovery advertisements of the broker. |
/**
 * @param connectionTimeout the value to supply as the
 *        {@code connectionTimeout} option when an accepted transport is configured
 */
public void setConnectionTimeout(final int connectionTimeout) {
    this.connectionTimeout = connectionTimeout;
}
|
/**
 * @param useJmx the flag to supply as the {@code dynamicManagement} option
 *        when an accepted transport is configured
 */
public void setDynamicManagement(boolean useJmx) {
    dynamicManagement = useJmx;
}
|
/**
 * @param logFormat the value to supply as the {@code logWriterName} option
 *        when an accepted transport is configured
 */
public void setLogWriterName(String logFormat) {
    logWriterName = logFormat;
}
|
/**
 * @param maxInactivityDuration the value to supply as the
 *        {@code maxInactivityDuration} option when an accepted transport is configured
 */
public void setMaxInactivityDuration(final long maxInactivityDuration) {
    this.maxInactivityDuration = maxInactivityDuration;
}
|
/**
 * @param maxInactivityDurationInitalDelay the value to supply as the
 *        {@code maxInactivityDurationInitalDelay} option when an accepted
 *        transport is configured
 */
public void setMaxInactivityDurationInitalDelay(final long maxInactivityDurationInitalDelay) {
    this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
}
|
/**
 * @param maximumConnections the connection limit that {@code handleSocket}
 *        compares against the current transport count
 */
public void setMaximumConnections(final int maximumConnections) {
    this.maximumConnections = maximumConnections;
}
|
/**
 * @param minmumWireFormatVersion the value to supply as the
 *        {@code minmumWireFormatVersion} option when an accepted transport is configured
 */
public void setMinmumWireFormatVersion(final int minmumWireFormatVersion) {
    this.minmumWireFormatVersion = minmumWireFormatVersion;
}
|
/**
 * @param soTimeout the value to supply as the {@code soTimeout} option when
 *        an accepted transport is configured
 */
public void setSoTimeout(final int soTimeout) {
    this.soTimeout = soTimeout;
}
|
/**
 * @param socketBufferSize the value to supply as the
 *        {@code socketBufferSize} option when an accepted transport is configured
 */
public void setSocketBufferSize(final int socketBufferSize) {
    this.socketBufferSize = socketBufferSize;
}
|
/**
 * @param startLogging the flag to supply as the {@code startLogging} option
 *        when an accepted transport is configured
 */
public void setStartLogging(final boolean startLogging) {
    this.startLogging = startLogging;
}
|
/**
 * @param trace the flag to supply as the {@code trace} option when an
 *        accepted transport is configured
 */
public void setTrace(final boolean trace) {
    this.trace = trace;
}
|
/**
 * Stores the given map by reference; no copy is made.
 *
 * @param transportOptions extra options that {@code handleSocket} overlays
 *        on the server's own settings when configuring an accepted transport
 */
public void setTransportOption(final Map<String, Object> transportOptions) {
    this.transportOptions = transportOptions;
}
|
/**
 * @param useQueueForAccept whether accepted sockets should be placed on the
 *        socket queue for a separate handler thread instead of being handled
 *        on the accept thread
 */
public void setUseQueueForAccept(final boolean useQueueForAccept) {
    this.useQueueForAccept = useQueueForAccept;
}
|
/**
 * @param wireFormatFactory the factory {@code handleSocket} uses to create a
 *        wire format for each accepted socket
 */
public void setWireFormatFactory(final WireFormatFactory wireFormatFactory) {
    this.wireFormatFactory = wireFormatFactory;
}
|
/**
 * Service-listener callback: counts one more running transport.
 *
 * NOTE(review): the update is a plain read-modify-write on the counter that
 * {@code handleSocket} reads; confirm that callers are serialized or that
 * the field's declaration makes this safe.
 *
 * @param service the service that started; not inspected
 */
public void started(Service service) {
    currentTransportCount += 1;
}
|
/**
 * Service-listener callback: counts one fewer running transport.
 *
 * NOTE(review): the update is a plain read-modify-write on the counter that
 * {@code handleSocket} reads; confirm that callers are serialized or that
 * the field's declaration makes this safe.
 *
 * @param service the service that stopped; not inspected
 */
public void stopped(Service service) {
    currentTransportCount -= 1;
}
|
/**
 * @return the string form of the bind location, or {@code "null"} when no
 *         bind location is set
 */
public String toString() {
    return String.valueOf(getBindLocation());
}
|