Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » tcp » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.tcp;
   18   
   19   import java.io.FilterInputStream;
   20   import java.io.IOException;
   21   import java.io.InputStream;
   22   
   /**
    * An optimized buffered input stream for Tcp.
    * <p>
    * Bytes are pulled from the wrapped stream into {@link #internalBuffer} by
    * {@link #fill()} and handed out from there. Bulk reads that are at least as
    * large as the buffer bypass it when it is empty and go straight to the
    * wrapped stream.
    * <p>
    * Mark/reset is not supported: {@link #markSupported()} returns
    * {@code false}, {@link #mark(int)} does nothing and {@link #reset()} always
    * throws. This class performs no synchronization of its own.
    *
    * @version $Revision: 1.1.1.1 $
    */
   public class TcpBufferedInputStream extends FilterInputStream {
       private static final int DEFAULT_BUFFER_SIZE = 8192;

       /** Backing storage for bytes read ahead from the wrapped stream. */
       protected byte internalBuffer[];

       /** Index one past the last valid byte in {@link #internalBuffer}. */
       protected int count;

       /** Index of the next byte in {@link #internalBuffer} to hand out. */
       protected int position;

       /**
        * Creates a stream with the default buffer size (8192 bytes).
        *
        * @param in the stream to read from
        */
       public TcpBufferedInputStream(InputStream in) {
           this(in, DEFAULT_BUFFER_SIZE);
       }

       /**
        * Creates a stream with the given buffer size.
        *
        * @param in the stream to read from
        * @param size the buffer capacity in bytes
        * @throws IllegalArgumentException if {@code size <= 0}
        */
       public TcpBufferedInputStream(InputStream in, int size) {
           super(in);
           if (size <= 0) {
               throw new IllegalArgumentException("Buffer size <= 0");
           }
           internalBuffer = new byte[size];
       }

       /**
        * Discards whatever is currently buffered and performs a single read from
        * the wrapped stream into the start of the buffer. If that read yields no
        * bytes the buffer is left empty ({@code count == position == 0}).
        *
        * @throws IOException if the wrapped stream fails
        */
       protected void fill() throws IOException {
           byte[] buffer = internalBuffer;
           count = 0;
           position = 0;
           int n = in.read(buffer, 0, buffer.length);
           if (n > 0) {
               count = n;
           }
       }

       /**
        * Reads one byte, refilling the buffer first when it is exhausted.
        *
        * @return the byte as a value in {@code 0..255}, or {@code -1} when the
        *         refill produced no data
        * @throws IOException if the wrapped stream fails
        */
       @Override
       public int read() throws IOException {
           if (position >= count) {
               fill();
               if (position >= count) {
                   return -1;
               }
           }
           // Mask so that bytes >= 0x80 are not returned as negative ints.
           return internalBuffer[position++] & 0xff;
       }

       /**
        * Performs one step of a bulk read: serves from the buffer if it holds
        * data, otherwise either reads directly into {@code b} (request at least
        * as large as the buffer) or refills the buffer and serves from it.
        *
        * @return the number of bytes copied, or the wrapped stream's result /
        *         {@code -1} when nothing could be read
        */
       private int readStream(byte[] b, int off, int len) throws IOException {
           int avail = count - position;
           if (avail <= 0) {
               if (len >= internalBuffer.length) {
                   // Buffering would only add a copy; read straight into the target.
                   return in.read(b, off, len);
               }
               fill();
               avail = count - position;
               if (avail <= 0) {
                   return -1;
               }
           }
           int cnt = (avail < len) ? avail : len;
           System.arraycopy(internalBuffer, position, b, off, cnt);
           position += cnt;
           return cnt;
       }

       /**
        * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
        * Keeps reading until the request is satisfied, a read step yields
        * nothing, or the wrapped stream reports no bytes available.
        *
        * @param b destination array
        * @param off start offset in {@code b}
        * @param len maximum number of bytes to read
        * @return the number of bytes read, {@code 0} if {@code len == 0}, or the
        *         result of the first read step (e.g. {@code -1}) if it produced
        *         no data
        * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
        *         describe a valid range of {@code b}
        * @throws IOException if the wrapped stream fails
        */
       @Override
       public int read(byte b[], int off, int len) throws IOException {
           // Any negative operand (including an overflowed off + len) sets the sign bit.
           if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
               throw new IndexOutOfBoundsException();
           } else if (len == 0) {
               return 0;
           }
           int n = 0;
           for (;;) {
               int nread = readStream(b, off + n, len - n);
               if (nread <= 0) {
                   return (n == 0) ? nread : n;
               }
               n += nread;
               if (n >= len) {
                   return n;
               }
               // if not closed but no bytes available, return
               InputStream input = in;
               if (input != null && input.available() <= 0) {
                   return n;
               }
           }
       }

       /**
        * Skips buffered bytes if there are any; otherwise delegates the whole
        * request to the wrapped stream. A single call never does both.
        *
        * @param n the number of bytes to skip
        * @return the number of bytes actually skipped; {@code 0} if {@code n <= 0}
        * @throws IOException if the wrapped stream fails
        */
       @Override
       public long skip(long n) throws IOException {
           if (n <= 0) {
               return 0;
           }
           long avail = count - position;
           if (avail <= 0) {
               return in.skip(n);
           }
           long skipped = (avail < n) ? avail : n;
           // skipped <= avail, which was derived from two ints, so the cast is lossless.
           position += (int) skipped;
           return skipped;
       }

       /**
        * Returns the wrapped stream's available count plus the number of bytes
        * still buffered, capped at {@link Integer#MAX_VALUE} so that a very large
        * value from the wrapped stream cannot overflow into a negative result.
        *
        * @return the number of bytes available, at most {@code Integer.MAX_VALUE}
        * @throws IOException if the wrapped stream fails
        */
       @Override
       public int available() throws IOException {
           long total = (long) in.available() + (count - position);
           return (total > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) total;
       }

       /**
        * @return always {@code false}
        */
       @Override
       public boolean markSupported() {
           return false;
       }

       /**
        * Does nothing. The inherited implementation would forward the mark to
        * the wrapped stream, whose position runs ahead of this stream's because
        * of the read-ahead buffer.
        *
        * @param readlimit ignored
        */
       @Override
       public void mark(int readlimit) {
           // Intentionally empty: mark/reset is not supported.
       }

       /**
        * Always fails. Forwarding the reset to the wrapped stream would rewind it
        * behind the back of the buffer and cause bytes to be replayed or
        * delivered out of order.
        *
        * @throws IOException always
        */
       @Override
       public void reset() throws IOException {
           throw new IOException("mark/reset not supported");
       }

       /**
        * Closes the wrapped stream, if there is one.
        *
        * @throws IOException if closing the wrapped stream fails
        */
       @Override
       public void close() throws IOException {
           if (in != null) {
               in.close();
           }
       }
   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » tcp » [javadoc | source]