Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » nio » [javadoc | source]

    1   /**
    2    *
    3    * Licensed to the Apache Software Foundation (ASF) under one or more
    4    * contributor license agreements.  See the NOTICE file distributed with
    5    * this work for additional information regarding copyright ownership.
    6    * The ASF licenses this file to You under the Apache License, Version 2.0
    7    * (the "License"); you may not use this file except in compliance with
    8    * the License.  You may obtain a copy of the License at
    9    *
   10    * http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing, software
   13    * distributed under the License is distributed on an "AS IS" BASIS,
   14    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    * See the License for the specific language governing permissions and
   16    * limitations under the License.
   17    */
   18   package org.apache.activemq.transport.nio;
   19   
   20   import java.io.IOException;
   21   import java.io.InputStream;
   22   import java.nio.ByteBuffer;
   23   import java.nio.channels.Channel;
   24   import java.nio.channels.ClosedChannelException;
   25   import java.nio.channels.ReadableByteChannel;
   26   import java.nio.channels.SelectionKey;
   27   import java.nio.channels.Selector;
   28   import java.nio.channels.SocketChannel;
   29   
   /**
    * An {@link InputStream} backed by a non-blocking {@link SocketChannel}, a
    * direct {@link ByteBuffer} and a private {@link Selector}.
    * <p>
    * Reads are served from the buffer. When the buffer is empty the stream
    * reads from the channel and, if nothing is available yet, waits on the
    * selector until the channel becomes readable. When the channel reports end
    * of stream, the stream closes itself (including the underlying socket) and
    * the read in progress returns {@code -1}; any later call fails with an
    * {@link IOException} because the stream is closed.
    * <p>
    * This class performs no synchronization of its own.
    */
   public class NIOBufferedInputStream extends InputStream {

       /** Buffer capacity used by the single-argument constructor. */
       private static final int BUFFER_SIZE = 8192;

       /** Channel the data is read from; set to {@code null} by {@link #close()}. */
       private SocketChannel sc;

       /**
        * Read buffer. Outside of {@link #fill(int)} it is always in "read mode"
        * (position..limit is unread data). Set to {@code null} by {@link #close()}.
        */
       private ByteBuffer bb;

       /** Selector used to wait for readability; its open state doubles as the "stream open" flag. */
       private final Selector rs;

       /**
        * Creates a stream over the given channel with a direct buffer of
        * {@code size} bytes. The channel is switched to non-blocking mode and
        * registered with a newly opened selector for {@code OP_READ}.
        *
        * @param channel the channel to read from; it is cast to
        *                {@link SocketChannel}, so any other implementation
        *                fails with a {@link ClassCastException}
        * @param size    buffer capacity in bytes, must be positive
        * @throws IllegalArgumentException if {@code size <= 0}
        * @throws ClosedChannelException   if the channel is already closed
        * @throws IOException              if the channel or selector cannot be set up
        */
       public NIOBufferedInputStream(ReadableByteChannel channel, int size)
               throws ClosedChannelException, IOException {

           if (size <= 0) {
               throw new IllegalArgumentException("Buffer size <= 0");
           }

           this.bb = ByteBuffer.allocateDirect(size);
           this.sc = (SocketChannel) channel;

           this.sc.configureBlocking(false);

           this.rs = Selector.open();

           // Do not leak the selector when registration fails (e.g. closed channel).
           boolean registered = false;
           try {
               sc.register(rs, SelectionKey.OP_READ);
               registered = true;
           } finally {
               if (!registered) {
                   try {
                       rs.close();
                   } catch (IOException ignored) {
                       // The registration failure is the error worth reporting.
                   }
               }
           }

           // Start in read mode with nothing buffered.
           bb.position(0);
           bb.limit(0);
       }

       /**
        * Creates a stream over the given channel with the default buffer size.
        *
        * @param channel the channel to read from (must be a {@link SocketChannel})
        * @throws ClosedChannelException if the channel is already closed
        * @throws IOException            if the channel or selector cannot be set up
        */
       public NIOBufferedInputStream(ReadableByteChannel channel)
               throws ClosedChannelException, IOException {
           this(channel, BUFFER_SIZE);
       }

       /**
        * Returns the number of bytes currently held in the buffer, i.e. the
        * number that can be read without touching the channel.
        *
        * @throws IOException if the stream is closed
        */
       @Override
       public int available() throws IOException {
           ensureOpen();
           return bb.remaining();
       }

       /**
        * Closes the selector and, if still open, shuts down and closes the
        * underlying socket. Calling it again has no effect.
        *
        * @throws IOException if closing the selector or the socket fails; the
        *                     socket is still closed and the references are
        *                     still released in that case
        */
       @Override
       public void close() throws IOException {
           if (!rs.isOpen()) {
               return;
           }

           try {
               rs.close();
           } finally {
               // Release the references first so a failure below cannot leave
               // the stream half closed.
               SocketChannel channel = sc;
               bb = null;
               sc = null;

               if (channel != null && channel.isOpen()) {
                   try {
                       channel.socket().shutdownInput();
                   } finally {
                       // Always close the socket, even if the shutdown failed.
                       channel.socket().close();
                   }
               }
           }
       }

       /**
        * Reads one byte, waiting for data if the buffer is empty.
        *
        * @return the byte as a value in {@code 0..255}, or {@code -1} at end of
        *         stream (the stream is closed as a side effect)
        * @throws IOException if the stream is closed or an I/O error occurs
        */
       @Override
       public int read() throws IOException {
           ensureOpen();

           if (!bb.hasRemaining()) {
               try {
                   fill(1);
               } catch (ClosedChannelException e) {
                   close();
                   return -1;
               }
           }

           return (bb.get() & 0xFF);
       }

       /**
        * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
        * Waits only while no data at all is buffered; once at least one byte is
        * available it returns whatever the buffer holds, up to {@code len}.
        *
        * @return the number of bytes copied, {@code 0} if {@code len} is zero,
        *         or {@code -1} at end of stream (the stream is closed as a
        *         side effect)
        * @throws NullPointerException      if {@code b} is {@code null}
        * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
        *                                   describe a range inside {@code b}
        * @throws IOException               if the stream is closed or an I/O
        *                                   error occurs
        */
       @Override
       public int read(byte[] b, int off, int len) throws IOException {
           ensureOpen();

           if (b == null) {
               throw new NullPointerException("b");
           }
           if (off < 0 || len < 0 || len > b.length - off) {
               throw new IndexOutOfBoundsException(
                       "off=" + off + ", len=" + len + ", b.length=" + b.length);
           }
           if (len == 0) {
               // A zero-length read must never wait for data.
               return 0;
           }

           if (!bb.hasRemaining()) {
               try {
                   fill(1);
               } catch (ClosedChannelException e) {
                   close();
                   return -1;
               }
           }

           int count = Math.min(len, bb.remaining());
           bb.get(b, off, count);
           return count;
       }

       /**
        * Skips {@code n} bytes, waiting for more data as needed until either
        * all of them have been skipped or the end of the stream is reached.
        *
        * @param n number of bytes to skip; values {@code <= 0} skip nothing
        * @return the number of bytes actually skipped; less than {@code n} only
        *         when the end of the stream was reached (the stream is closed
        *         as a side effect)
        * @throws IOException if the stream is closed or an I/O error occurs
        */
       @Override
       public long skip(long n) throws IOException {
           ensureOpen();

           long skipped = 0;

           while (n > 0) {
               if (!bb.hasRemaining()) {
                   try {
                       // Ask for a single byte: requesting (int) n would
                       // truncate large values to zero or a negative number.
                       fill(1);
                   } catch (ClosedChannelException e) {
                       close();
                       return skipped;
                   }
               }

               int step = (int) Math.min(n, (long) bb.remaining());
               bb.position(bb.position() + step);
               skipped += step;
               n -= step;
           }

           return skipped;
       }

       /**
        * Fails with the standard "closed" error once {@link #close()} has run.
        */
       private void ensureOpen() throws IOException {
           if (!rs.isOpen()) {
               throw new IOException("Input Stream Closed");
           }
       }

       /**
        * Reads at least {@code n} further bytes from the channel into the
        * buffer (capped at the free space), waiting on the selector while the
        * channel has nothing to offer. Does nothing when {@code n <= 0} or the
        * buffer already holds {@code n} unread bytes.
        *
        * @throws ClosedChannelException if the channel reports end of stream
        * @throws IOException            if reading or selecting fails
        */
       private void fill(int n) throws IOException, ClosedChannelException {
           if ((n <= 0) || (n <= bb.remaining())) {
               return;
           }

           // Switch to write mode, keeping any unread bytes at the front.
           bb.compact();

           try {
               int wanted = Math.min(n, bb.remaining());

               for (;;) {
                   int bytesRead = sc.read(bb);

                   if (bytesRead == -1) {
                       throw new ClosedChannelException();
                   }

                   wanted -= bytesRead;

                   if (wanted <= 0) {
                       break;
                   }

                   // Nothing (or not enough) available yet: block until readable.
                   rs.select(0);
                   rs.selectedKeys().clear();
               }
           } finally {
               // Always return to read mode so the buffer state stays
               // consistent even when the read fails part-way.
               bb.flip();
           }
       }
   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » nio » [javadoc | source]