Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » nio » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.nio;
   18   
   19   import java.io.EOFException;
   20   import java.io.IOException;
   21   import java.io.InterruptedIOException;
   22   import java.io.OutputStream;
   23   import java.nio.ByteBuffer;
   24   import java.nio.channels.WritableByteChannel;
   25   
   26   /**
   27    * An optimized buffered outputstream for Tcp
   28    * 
   29    * @version $Revision: 1.1.1.1 $
   30    */
   31   
   32   public class NIOOutputStream extends OutputStream {
   33   
   34       private static final int BUFFER_SIZE = 8192;
   35   
   36       private final WritableByteChannel out;
   37       private final byte[] buffer;
   38       private final ByteBuffer byteBuffer;
   39   
   40       private int count;
   41       private boolean closed;
   42   
   43       /**
   44        * Constructor
   45        * 
   46        * @param out
   47        */
   48       public NIOOutputStream(WritableByteChannel out) {
   49           this(out, BUFFER_SIZE);
   50       }
   51   
   52       /**
   53        * Creates a new buffered output stream to write data to the specified
   54        * underlying output stream with the specified buffer size.
   55        * 
   56        * @param out the underlying output stream.
   57        * @param size the buffer size.
   58        * @throws IllegalArgumentException if size <= 0.
   59        */
   60       public NIOOutputStream(WritableByteChannel out, int size) {
   61           this.out = out;
   62           if (size <= 0) {
   63               throw new IllegalArgumentException("Buffer size <= 0");
   64           }
   65           buffer = new byte[size];
   66           byteBuffer = ByteBuffer.wrap(buffer);
   67       }
   68   
   69       /**
   70        * write a byte on to the stream
   71        * 
   72        * @param b - byte to write
   73        * @throws IOException
   74        */
   75       public void write(int b) throws IOException {
   76           checkClosed();
   77           if (availableBufferToWrite() < 1) {
   78               flush();
   79           }
   80           buffer[count++] = (byte)b;
   81       }
   82   
   83       /**
   84        * write a byte array to the stream
   85        * 
   86        * @param b the byte buffer
   87        * @param off the offset into the buffer
   88        * @param len the length of data to write
   89        * @throws IOException
   90        */
   91       public void write(byte b[], int off, int len) throws IOException {
   92           checkClosed();
   93           if (availableBufferToWrite() < len) {
   94               flush();
   95           }
   96           if (buffer.length >= len) {
   97               System.arraycopy(b, off, buffer, count, len);
   98               count += len;
   99           } else {
  100               write(ByteBuffer.wrap(b, off, len));
  101           }
  102       }
  103   
  104       /**
  105        * flush the data to the output stream This doesn't call flush on the
  106        * underlying outputstream, because Tcp is particularly efficent at doing
  107        * this itself ....
  108        * 
  109        * @throws IOException
  110        */
  111       public void flush() throws IOException {
  112           if (count > 0 && out != null) {
  113               byteBuffer.position(0);
  114               byteBuffer.limit(count);
  115               write(byteBuffer);
  116               count = 0;
  117           }
  118       }
  119   
  120       /**
  121        * close this stream
  122        * 
  123        * @throws IOException
  124        */
  125       public void close() throws IOException {
  126           super.close();
  127           closed = true;
  128       }
  129   
  130       /**
  131        * Checks that the stream has not been closed
  132        * 
  133        * @throws IOException
  134        */
  135       protected void checkClosed() throws IOException {
  136           if (closed) {
  137               throw new EOFException("Cannot write to the stream any more it has already been closed");
  138           }
  139       }
  140   
  141       /**
  142        * @return the amount free space in the buffer
  143        */
  144       private int availableBufferToWrite() {
  145           return buffer.length - count;
  146       }
  147   
  148       protected void write(ByteBuffer data) throws IOException {
  149           int remaining = data.remaining();
  150           int lastRemaining = remaining - 1;
  151           long delay = 1;
  152           while (remaining > 0) {
  153   
  154               // We may need to do a little bit of sleeping to avoid a busy loop.
  155               // Slow down if no data was written out..
  156               if (remaining == lastRemaining) {
  157                   try {
  158                       // Use exponential rollback to increase sleep time.
  159                       Thread.sleep(delay);
  160                       delay *= 2;
  161                       if (delay > 1000) {
  162                           delay = 1000;
  163                       }
  164                   } catch (InterruptedException e) {
  165                       throw new InterruptedIOException();
  166                   }
  167               } else {
  168                   delay = 1;
  169               }
  170               lastRemaining = remaining;
  171   
  172               // Since the write is non-blocking, all the data may not have been
  173               // written.
  174               out.write(data);
  175               remaining = data.remaining();
  176           }
  177       }
  178   
  179   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » nio » [javadoc | source]