Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » nio » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.nio;
   18   
   19   import java.io.DataInputStream;
   20   import java.io.DataOutputStream;
   21   import java.io.EOFException;
   22   import java.io.IOException;
   23   import java.net.Socket;
   24   import java.net.URI;
   25   import java.net.UnknownHostException;
   26   import java.nio.ByteBuffer;
   27   import java.nio.channels.SelectionKey;
   28   import java.nio.channels.SocketChannel;
   29   
   30   import javax.net.SocketFactory;
   31   
   32   import org.apache.activemq.command.Command;
   33   import org.apache.activemq.transport.Transport;
   34   import org.apache.activemq.transport.tcp.TcpTransport;
   35   import org.apache.activemq.util.IOExceptionSupport;
   36   import org.apache.activemq.util.ServiceStopper;
   37   import org.apache.activemq.wireformat.WireFormat;
   38   
   39   /**
   40    * An implementation of the {@link Transport} interface using raw tcp/ip
   41    * 
   42    * @version $Revision$
   43    */
   44   public class NIOTransport extends TcpTransport {
   45   
   46       // private static final Log log = LogFactory.getLog(NIOTransport.class);
   47       private SocketChannel channel;
   48       private SelectorSelection selection;
   49       private ByteBuffer inputBuffer;
   50       private ByteBuffer currentBuffer;
   51       private int nextFrameSize;
   52   
   53       public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
   54           super(wireFormat, socketFactory, remoteLocation, localLocation);
   55       }
   56   
   57       public NIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
   58           super(wireFormat, socket);
   59       }
   60   
   61       protected void initializeStreams() throws IOException {
   62           channel = socket.getChannel();
   63           channel.configureBlocking(false);
   64   
   65           // listen for events telling us when the socket is readable.
   66           selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
   67               public void onSelect(SelectorSelection selection) {
   68                   serviceRead();
   69               }
   70   
   71               public void onError(SelectorSelection selection, Throwable error) {
   72                   if (error instanceof IOException) {
   73                       onException((IOException)error);
   74                   } else {
   75                       onException(IOExceptionSupport.create(error));
   76                   }
   77               }
   78           });
   79   
   80           // Send the data via the channel
   81           // inputBuffer = ByteBuffer.allocateDirect(8*1024);
   82           inputBuffer = ByteBuffer.allocate(8 * 1024);
   83           currentBuffer = inputBuffer;
   84           nextFrameSize = -1;
   85           currentBuffer.limit(4);
   86           this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
   87   
   88       }
   89   
   90       private void serviceRead() {
   91           try {
   92               while (true) {
   93   
   94                   int readSize = channel.read(currentBuffer);
   95                   if (readSize == -1) {
   96                       onException(new EOFException());
   97                       selection.close();
   98                       break;
   99                   }
  100                   if (readSize == 0) {
  101                       break;
  102                   }
  103   
  104                   if (currentBuffer.hasRemaining()) {
  105                       continue;
  106                   }
  107   
  108                   // Are we trying to figure out the size of the next frame?
  109                   if (nextFrameSize == -1) {
  110                       assert inputBuffer == currentBuffer;
  111   
  112                       // If the frame is too big to fit in our direct byte buffer,
  113                       // Then allocate a non direct byte buffer of the right size
  114                       // for it.
  115                       inputBuffer.flip();
  116                       nextFrameSize = inputBuffer.getInt() + 4;
  117                       if (nextFrameSize > inputBuffer.capacity()) {
  118                           currentBuffer = ByteBuffer.allocate(nextFrameSize);
  119                           currentBuffer.putInt(nextFrameSize);
  120                       } else {
  121                           inputBuffer.limit(nextFrameSize);
  122                       }
  123   
  124                   } else {
  125                       currentBuffer.flip();
  126   
  127                       Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
  128                       doConsume((Command)command);
  129   
  130                       nextFrameSize = -1;
  131                       inputBuffer.clear();
  132                       inputBuffer.limit(4);
  133                       currentBuffer = inputBuffer;
  134                   }
  135   
  136               }
  137   
  138           } catch (IOException e) {
  139               onException(e);
  140           } catch (Throwable e) {
  141               onException(IOExceptionSupport.create(e));
  142           }
  143       }
  144   
  145       protected void doStart() throws Exception {
  146           connect();
  147           selection.setInterestOps(SelectionKey.OP_READ);
  148           selection.enable();
  149       }
  150   
  151       protected void doStop(ServiceStopper stopper) throws Exception {
  152           selection.close();
  153           super.doStop(stopper);
  154       }
  155   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » nio » [javadoc | source]