Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » tcp » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   
   18   package org.apache.activemq.transport.tcp;
   19   
   20   import java.io.FilterOutputStream;
   21   import java.io.IOException;
   22   import java.io.OutputStream;
   23   
   24   /**
   25    * An optimized buffered outputstream for Tcp
   26    * 
   27    * @version $Revision: 1.1.1.1 $
   28    */
   29   
   30   public class TcpBufferedOutputStream extends FilterOutputStream {
   31       private static final int BUFFER_SIZE = 8192;
   32       private byte[] buffer;
   33       private int bufferlen;
   34       private int count;
   35       private volatile long writeTimestamp = -1;//concurrent reads of this value
   36       
   37   
   38       /**
   39        * Constructor
   40        * 
   41        * @param out
   42        */
   43       public TcpBufferedOutputStream(OutputStream out) {
   44           this(out, BUFFER_SIZE);
   45       }
   46   
   47       /**
   48        * Creates a new buffered output stream to write data to the specified
   49        * underlying output stream with the specified buffer size.
   50        * 
   51        * @param out the underlying output stream.
   52        * @param size the buffer size.
   53        * @throws IllegalArgumentException if size <= 0.
   54        */
   55       public TcpBufferedOutputStream(OutputStream out, int size) {
   56           super(out);
   57           if (size <= 0) {
   58               throw new IllegalArgumentException("Buffer size <= 0");
   59           }
   60           buffer = new byte[size];
   61           bufferlen = size;
   62       }
   63   
   64       /**
   65        * write a byte on to the stream
   66        * 
   67        * @param b - byte to write
   68        * @throws IOException
   69        */
   70       public void write(int b) throws IOException {
   71           if ((bufferlen - count) < 1) {
   72               flush();
   73           }
   74           buffer[count++] = (byte)b;
   75       }
   76   
   77       /**
   78        * write a byte array to the stream
   79        * 
   80        * @param b the byte buffer
   81        * @param off the offset into the buffer
   82        * @param len the length of data to write
   83        * @throws IOException
   84        */
   85       public void write(byte b[], int off, int len) throws IOException {
   86           if (b != null) {
   87               if ((bufferlen - count) < len) {
   88                   flush();
   89               }
   90               if (buffer.length >= len) {
   91                   System.arraycopy(b, off, buffer, count, len);
   92                   count += len;
   93               } else {
   94                   try {
   95                       writeTimestamp = System.currentTimeMillis();
   96                       out.write(b, off, len);
   97                   } finally {
   98                       writeTimestamp = -1;
   99                   }
  100               }
  101           }
  102       }
  103   
  104       /**
  105        * flush the data to the output stream This doesn't call flush on the
  106        * underlying outputstream, because Tcp is particularly efficent at doing
  107        * this itself ....
  108        * 
  109        * @throws IOException
  110        */
  111       public void flush() throws IOException {
  112           if (count > 0 && out != null) {
  113               try {
  114                   writeTimestamp = System.currentTimeMillis();
  115                   out.write(buffer, 0, count);
  116               } finally {
  117               	writeTimestamp = -1;
  118               }
  119               count = 0;
  120           }
  121       }
  122   
  123       /**
  124        * close this stream
  125        * 
  126        * @throws IOException
  127        */
  128       public void close() throws IOException {
  129           super.close();
  130       }
  131   
  132       public boolean isWriting() {
  133           return writeTimestamp > 0;
  134       }
  135       
  136       public long getWriteTimestamp() {
  137       	return writeTimestamp;
  138       }
  139   
  140   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » tcp » [javadoc | source]