1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 package org.apache.synapse.transport.nhttp.util; 21 22 import org.apache.http.nio.util.ExpandableBuffer; 23 import org.apache.http.nio.util.ContentInputBuffer; 24 import org.apache.http.nio.util.ByteBufferAllocator; 25 import org.apache.http.nio.IOControl; 26 import org.apache.http.nio.ContentDecoder; 27 28 import java.io.IOException; 29 import java.io.InterruptedIOException; 30 31 /** 32 * A copy of the SharedInputBuffer implementation of Apache HttpComponents - HttpCore/NIO 33 * found at http://svn.apache.org/repos/asf/httpcomponents/httpcore/trunk/module-nio/ 34 * src/main/java/org/apache/http/nio/util/SharedInputBuffer.java 35 * 36 * To include the fix described here : http://svn.apache.org/viewvc/httpcomponents/httpcore/ 37 * trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java 38 * ?view=diff&r1=659956&r2=659957&pathrev=659957 39 * with the HttpCore version 4.0-beta1 40 * 41 * TODO : This class to be removed as soon as we update the HttpCore dependency from 4.0-beta1 42 */ 43 public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer { 44 45 private final IOControl ioctrl; 46 private final Object mutex; 47 48 private volatile boolean shutdown = false; 49 private volatile boolean endOfStream = false; 50 51 public SharedInputBuffer(int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) { 52 super(buffersize, allocator); 53 if (ioctrl == null) { 54 throw new IllegalArgumentException("I/O content control may not be null"); 55 } 56 this.ioctrl = ioctrl; 57 this.mutex = new Object(); 58 } 59 60 public void reset() { 61 if (this.shutdown) { 62 return; 63 } 64 synchronized (this.mutex) { 65 clear(); 66 this.endOfStream = false; 67 } 68 } 69 70 public int consumeContent(final ContentDecoder decoder) throws IOException { 71 if (this.shutdown) { 72 return -1; 73 } 74 synchronized (this.mutex) { 75 setInputMode(); 76 int totalRead = 0; 77 int bytesRead; 78 while ((bytesRead = decoder.read(this.buffer)) > 0) { 79 totalRead += bytesRead; 80 } 81 if (bytesRead == -1 || decoder.isCompleted()) { 82 this.endOfStream = true; 83 } 84 if (!this.buffer.hasRemaining()) { 85 this.ioctrl.suspendInput(); 86 } 87 this.mutex.notifyAll(); 88 89 if (totalRead > 0) { 90 return totalRead; 91 } else { 92 if (this.endOfStream) { 93 return -1; 94 } else { 95 return 0; 96 } 97 } 98 } 99 } 100 101 protected void waitForData() throws IOException { 102 synchronized (this.mutex) { 103 try { 104 while (!hasData() && !this.endOfStream) { 105 if (this.shutdown) { 106 throw new InterruptedIOException("Input operation aborted"); 107 } 108 this.ioctrl.requestInput(); 109 this.mutex.wait(); 110 } 111 } catch (InterruptedException ex) { 112 throw new IOException("Interrupted while waiting for more data"); 113 } 114 } 115 } 116 117 public void shutdown() { 118 if (this.shutdown) { 119 return; 120 } 121 this.shutdown = true; 122 synchronized (this.mutex) { 123 this.mutex.notifyAll(); 124 } 125 } 126 127 protected boolean isShutdown() { 128 return this.shutdown; 129 } 130 131 protected boolean isEndOfStream() { 132 return this.shutdown || (!hasData() && this.endOfStream); 133 } 134 135 public int read() throws IOException { 136 if (this.shutdown) { 137 return -1; 138 } 139 synchronized (this.mutex) { 140 if (!hasData()) { 141 waitForData(); 142 } 143 if (isEndOfStream()) { 144 return -1; 145 } 146 return this.buffer.get() & 0xff; 147 } 148 } 149 150 public int read(final byte[] b, int off, int len) throws IOException { 151 if (this.shutdown) { 152 return -1; 153 } 154 if (b == null) { 155 return 0; 156 } 157 synchronized (this.mutex) { 158 if (!hasData()) { 159 waitForData(); 160 } 161 if (isEndOfStream()) { 162 return -1; 163 } 164 setOutputMode(); 165 int chunk = len; 166 if (chunk > this.buffer.remaining()) { 167 chunk = this.buffer.remaining(); 168 } 169 this.buffer.get(b, off, chunk); 170 return chunk; 171 } 172 } 173 174 public int read(final byte[] b) throws IOException { 175 if (this.shutdown) { 176 return -1; 177 } 178 if (b == null) { 179 return 0; 180 } 181 return read(b, 0, b.length); 182 } 183 184 }