diff options
author | Arne H Juul <arnej@yahoo-inc.com> | 2017-05-29 14:36:47 +0200 |
---|---|---|
committer | Arne H Juul <arnej@yahoo-inc.com> | 2017-05-29 15:27:31 +0200 |
commit | 4f63b1db7661e06343841ab1283e58568240afd7 (patch) | |
tree | 97cc9466650826d14032c6e1a10b83eaaeb23b08 /vespajlib | |
parent | 133e0c0984d2ed165e34090125737da76c0b20f7 (diff) |
add TeeInputStream
Diffstat (limited to 'vespajlib')
-rw-r--r-- | vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java | 96 | ||||
-rw-r--r-- | vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java | 82 |
2 files changed, 178 insertions, 0 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java b/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java new file mode 100644 index 00000000000..7ecd6f0ccea --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java @@ -0,0 +1,96 @@ +package com.yahoo.io; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; + +/** + * Forwards input from a source InputStream while making a copy of it into an outputstream. + * Note that it also does read-ahead and copies up to 64K of data more than was used. + */ +class TeeInputStream extends InputStream { + final InputStream src; + final OutputStream dst; + + static final int CAPACITY = 65536; + + byte[] buf = new byte[CAPACITY]; + int readPos = 0; + int writePos = 0; + + private int inBuf() { return writePos - readPos; } + + private void fillBuf(boolean block) throws IOException { + if (readPos == writePos) { + readPos = 0; + writePos = 0; + } + if (readPos * 3 > CAPACITY) { + int had = inBuf(); + System.arraycopy(buf, readPos, buf, 0, had); + readPos = 0; + writePos = had; + } + int wantToRead = CAPACITY - writePos; + if (inBuf() > 0 || !block) { + wantToRead = Math.min(wantToRead, src.available()); + } + if (wantToRead > 0) { + int got = src.read(buf, writePos, wantToRead); + if (got > 0) { + dst.write(buf, writePos, got); + writePos += got; + } + } + } + + /** Construct a Tee */ + public TeeInputStream(InputStream from, OutputStream to) { + super(); + this.src = from; + this.dst = to; + } + + public @Override int available() throws IOException { + return inBuf() + src.available(); + } + + public @Override void close() throws IOException { + fillBuf(false); + src.close(); + dst.close(); + } + + public @Override void mark(int readlimit) {} + public @Override boolean markSupported() { return false; } + public @Override void reset() {} + + public @Override int read() throws IOException { + fillBuf(true); + if (inBuf() > 0) { + int r = buf[readPos++]; + return r & 0xff; + } + return -1; + } + + public @Override int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + public @Override int read(byte[] b, int off, int len) throws IOException { + if (len <= 0) { + return 0; + } + fillBuf(true); + int had = inBuf(); + if (had > 0) { + len = Math.min(len, had); + System.arraycopy(buf, readPos, b, off, len); + readPos += len; + return len; + } + return -1; + } + +} diff --git a/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java b/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java new file mode 100644 index 00000000000..315e8d5f8d4 --- /dev/null +++ b/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java @@ -0,0 +1,82 @@ +package com.yahoo.io; + +import java.io.*; +import java.nio.charset.StandardCharsets; + +import org.junit.Test; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.*; + +/** + * @author arnej + */ +public class TeeInputStreamTest { + + @Test + public void testSimpleInput() throws IOException { + byte[] input = "very simple input".getBytes(StandardCharsets.UTF_8); + ByteArrayInputStream in = new ByteArrayInputStream(input); + ByteArrayOutputStream gotten = new ByteArrayOutputStream(); + + TeeInputStream tee = new TeeInputStream(in, gotten); + int b = tee.read(); + assertThat(b, is((int)'v')); + assertThat(gotten.toString(), is("very simple input")); + for (int i = 0; i < 16; i++) { + b = tee.read(); + // System.out.println("got["+i+"]: "+(char)b); + assertThat(b, is(greaterThan(0))); + } + assertThat(tee.read(), is(-1)); + } + + private class Generator implements Runnable { + private OutputStream dst; + public Generator(OutputStream dst) { this.dst = dst; } + public @Override void run() { + for (int i = 0; i < 123456789; i++) { + int b = i & 0x7f; + if (b < 32) continue; + if (b > 126) b = '\n'; + try { + dst.write(b); + } catch (IOException e) { + return; + } + } + } + } + + @Test + public void testPipedInput() throws IOException { + PipedOutputStream input = new PipedOutputStream(); + PipedInputStream in = new PipedInputStream(input); + ByteArrayOutputStream gotten = new ByteArrayOutputStream(); + TeeInputStream tee = new TeeInputStream(in, gotten); + input.write("first input".getBytes(StandardCharsets.UTF_8)); + int b = tee.read(); + assertThat(b, is((int)'f')); + assertThat(gotten.toString(), is("first input")); + input.write(" second input".getBytes(StandardCharsets.UTF_8)); + b = tee.read(); + assertThat(b, is((int)'i')); + assertThat(gotten.toString(), is("first input second input")); + new Thread(new Generator(input)).start(); + b = tee.read(); + assertThat(b, is((int)'r')); + byte[] ba = new byte[9]; + for (int i = 0; i < 12345; i++) { + b = tee.read(); + // System.out.println("got["+i+"]: "+(char)b); + assertThat(b, is(greaterThan(0))); + assertThat(tee.read(ba), is(greaterThan(0))); + } + tee.close(); + String got = gotten.toString(); + // System.out.println("got length: "+got.length()); + // System.out.println("got: "+got); + assertThat(got.length(), is(greaterThan(34567))); + } + +} |