summaryrefslogtreecommitdiffstats
path: root/vespajlib/src/main/java/com/yahoo/io/GrowableBufferOutputStream.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespajlib/src/main/java/com/yahoo/io/GrowableBufferOutputStream.java')
-rw-r--r--vespajlib/src/main/java/com/yahoo/io/GrowableBufferOutputStream.java186
1 files changed, 186 insertions, 0 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/io/GrowableBufferOutputStream.java b/vespajlib/src/main/java/com/yahoo/io/GrowableBufferOutputStream.java
new file mode 100644
index 00000000000..85b249432d4
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/io/GrowableBufferOutputStream.java
@@ -0,0 +1,186 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.io;
+
+import java.nio.channels.WritableByteChannel;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Stack;
+import java.util.LinkedList;
+import java.util.Iterator;
+import java.nio.ByteBuffer;
+
+
+/**
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class GrowableBufferOutputStream extends OutputStream {
+// private static final int MINIMUM_BUFFERSIZE = (64 * 1024);
+ private ByteBuffer lastBuffer;
+ private ByteBuffer directBuffer;
+ private LinkedList<ByteBuffer> bufferList = new LinkedList<>();
+ private Stack<ByteBuffer> recycledBuffers = new Stack<>();
+
+ private int bufferSize;
+ private int maxBuffers;
+
+ public GrowableBufferOutputStream(int bufferSize, int maxBuffers) {
+ this.bufferSize = bufferSize;
+ this.maxBuffers = maxBuffers;
+ lastBuffer = ByteBuffer.allocate(bufferSize);
+ directBuffer = ByteBuffer.allocateDirect(bufferSize);
+ }
+
+ @Override
+ public void write(byte[] cbuf, int off, int len) throws IOException {
+ if (lastBuffer.remaining() >= len) {
+ lastBuffer.put(cbuf, off, len);
+ return;
+ }
+
+ int residue = len;
+
+ while (residue > 0) {
+ int newOffset = len - residue;
+ int toWrite = Math.min(lastBuffer.remaining(), residue);
+
+ lastBuffer.put(cbuf, newOffset, toWrite);
+ residue -= toWrite;
+ if (residue != 0) {
+ extend();
+ }
+ }
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b,0,b.length);
+ }
+
+ @Override
+ public String toString() {
+ return "GrowableBufferOutputStream, writable size " + writableSize()
+ + " bytes, " + numWritableBuffers() + " buffers, last buffer"
+ + " position " + lastBuffer.position() + ", last buffer limit "
+ + lastBuffer.limit();
+ }
+
+ public void write(int b) {
+ if (lastBuffer.remaining() == 0) {
+ extend();
+ }
+ lastBuffer.put((byte) b);
+ }
+
+ @Override
+ public void flush() {
+ // if the last buffer is untouched we do not need to do anything; if
+ // it has been touched we call extend(), which enqueues the buffer
+ // and allocates or recycles a buffer for us
+ if (lastBuffer.position() > 0) {
+ extend();
+ }
+ }
+
+ @Override
+ public void close() {
+ flush();
+ }
+
+ public int channelWrite(WritableByteChannel channel) throws IOException {
+ ByteBuffer buffer;
+ int totalWritten = 0;
+
+ while (!bufferList.isEmpty()) {
+ buffer = bufferList.getFirst();
+ int written = 0;
+
+ synchronized (directBuffer) {
+ directBuffer.clear();
+ directBuffer.put(buffer);
+ directBuffer.flip();
+ written = channel.write(directBuffer);
+ int left = directBuffer.remaining();
+
+ if (left > 0) {
+ int oldpos = buffer.position();
+
+ buffer.position(oldpos - left);
+ }
+ totalWritten += written;
+ }
+
+ // if we've completed writing this buffer we can dispose of it
+ if (buffer.remaining() == 0) {
+ bufferList.removeFirst();
+ recycleBuffer(buffer);
+ }
+
+ // if we didn't write any bytes we terminate
+ if (written == 0) {
+ break;
+ }
+ }
+
+ return totalWritten;
+ }
+
+ public int numWritableBuffers() {
+ return bufferList.size();
+ }
+
+ public void clear() {
+ flush();
+ bufferList.clear();
+ }
+
+ public void clearCache() {
+ recycledBuffers.clear();
+ }
+
+ public void clearAll() {
+ clear();
+ clearCache();
+ }
+
+ public int writableSize() {
+ Iterator<ByteBuffer> it = bufferList.iterator();
+ int size = 0;
+
+ while (it.hasNext()) {
+ size += (it.next()).remaining();
+ }
+
+ return size;
+ }
+
+ public ByteBuffer[] getWritableBuffers() {
+ flush();
+ ByteBuffer[] result = new ByteBuffer[numWritableBuffers()];
+ return bufferList.toArray(result);
+ }
+
+ private void extend() {
+ enqueueBuffer(lastBuffer);
+
+ if (recycledBuffers.empty()) {
+ lastBuffer = ByteBuffer.allocate(bufferSize);
+ } else {
+ lastBuffer = recycledBuffers.pop();
+ lastBuffer.clear();
+ }
+ }
+
+ private void enqueueBuffer(ByteBuffer buffer) {
+ buffer.flip();
+ bufferList.addLast(buffer);
+ }
+
+ private void recycleBuffer(ByteBuffer buffer) {
+ if (recycledBuffers.size() >= maxBuffers) {
+ return;
+ }
+ recycledBuffers.push(buffer);
+ }
+
+}