1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq.transport.nio; 18 19 import java.io.EOFException; 20 import java.io.IOException; 21 import java.io.InterruptedIOException; 22 import java.io.OutputStream; 23 import java.nio.ByteBuffer; 24 import java.nio.channels.WritableByteChannel; 25 26 /** 27 * An optimized buffered outputstream for Tcp 28 * 29 * @version $Revision: 1.1.1.1 $ 30 */ 31 32 public class NIOOutputStream extends OutputStream { 33 34 private static final int BUFFER_SIZE = 8192; 35 36 private final WritableByteChannel out; 37 private final byte[] buffer; 38 private final ByteBuffer byteBuffer; 39 40 private int count; 41 private boolean closed; 42 43 /** 44 * Constructor 45 * 46 * @param out 47 */ 48 public NIOOutputStream(WritableByteChannel out) { 49 this(out, BUFFER_SIZE); 50 } 51 52 /** 53 * Creates a new buffered output stream to write data to the specified 54 * underlying output stream with the specified buffer size. 55 * 56 * @param out the underlying output stream. 57 * @param size the buffer size. 58 * @throws IllegalArgumentException if size <= 0. 59 */ 60 public NIOOutputStream(WritableByteChannel out, int size) { 61 this.out = out; 62 if (size <= 0) { 63 throw new IllegalArgumentException("Buffer size <= 0"); 64 } 65 buffer = new byte[size]; 66 byteBuffer = ByteBuffer.wrap(buffer); 67 } 68 69 /** 70 * write a byte on to the stream 71 * 72 * @param b - byte to write 73 * @throws IOException 74 */ 75 public void write(int b) throws IOException { 76 checkClosed(); 77 if (availableBufferToWrite() < 1) { 78 flush(); 79 } 80 buffer[count++] = (byte)b; 81 } 82 83 /** 84 * write a byte array to the stream 85 * 86 * @param b the byte buffer 87 * @param off the offset into the buffer 88 * @param len the length of data to write 89 * @throws IOException 90 */ 91 public void write(byte b[], int off, int len) throws IOException { 92 checkClosed(); 93 if (availableBufferToWrite() < len) { 94 flush(); 95 } 96 if (buffer.length >= len) { 97 System.arraycopy(b, off, buffer, count, len); 98 count += len; 99 } else { 100 write(ByteBuffer.wrap(b, off, len)); 101 } 102 } 103 104 /** 105 * flush the data to the output stream This doesn't call flush on the 106 * underlying outputstream, because Tcp is particularly efficent at doing 107 * this itself .... 108 * 109 * @throws IOException 110 */ 111 public void flush() throws IOException { 112 if (count > 0 && out != null) { 113 byteBuffer.position(0); 114 byteBuffer.limit(count); 115 write(byteBuffer); 116 count = 0; 117 } 118 } 119 120 /** 121 * close this stream 122 * 123 * @throws IOException 124 */ 125 public void close() throws IOException { 126 super.close(); 127 closed = true; 128 } 129 130 /** 131 * Checks that the stream has not been closed 132 * 133 * @throws IOException 134 */ 135 protected void checkClosed() throws IOException { 136 if (closed) { 137 throw new EOFException("Cannot write to the stream any more it has already been closed"); 138 } 139 } 140 141 /** 142 * @return the amount free space in the buffer 143 */ 144 private int availableBufferToWrite() { 145 return buffer.length - count; 146 } 147 148 protected void write(ByteBuffer data) throws IOException { 149 int remaining = data.remaining(); 150 int lastRemaining = remaining - 1; 151 long delay = 1; 152 while (remaining > 0) { 153 154 // We may need to do a little bit of sleeping to avoid a busy loop. 155 // Slow down if no data was written out.. 156 if (remaining == lastRemaining) { 157 try { 158 // Use exponential rollback to increase sleep time. 159 Thread.sleep(delay); 160 delay *= 2; 161 if (delay > 1000) { 162 delay = 1000; 163 } 164 } catch (InterruptedException e) { 165 throw new InterruptedIOException(); 166 } 167 } else { 168 delay = 1; 169 } 170 lastRemaining = remaining; 171 172 // Since the write is non-blocking, all the data may not have been 173 // written. 174 out.write(data); 175 remaining = data.remaining(); 176 } 177 } 178 179 }