summaryrefslogtreecommitdiffstats
path: root/vespajlib
diff options
context:
space:
mode:
authorArne H Juul <arnej@yahoo-inc.com>2017-05-29 14:36:47 +0200
committerArne H Juul <arnej@yahoo-inc.com>2017-05-29 15:27:31 +0200
commit4f63b1db7661e06343841ab1283e58568240afd7 (patch)
tree97cc9466650826d14032c6e1a10b83eaaeb23b08 /vespajlib
parent133e0c0984d2ed165e34090125737da76c0b20f7 (diff)
add TeeInputStream
Diffstat (limited to 'vespajlib')
-rw-r--r--vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java96
-rw-r--r--vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java82
2 files changed, 178 insertions, 0 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java b/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java
new file mode 100644
index 00000000000..7ecd6f0ccea
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java
@@ -0,0 +1,96 @@
+package com.yahoo.io;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+
+/**
+ * Forwards input from a source InputStream while making a copy of it into an outputstream.
+ * Note that it also does read-ahead and copies up to 64K of data more than was used.
+ */
+class TeeInputStream extends InputStream {
+ final InputStream src;
+ final OutputStream dst;
+
+ static final int CAPACITY = 65536;
+
+ byte[] buf = new byte[CAPACITY];
+ int readPos = 0;
+ int writePos = 0;
+
+ private int inBuf() { return writePos - readPos; }
+
+ private void fillBuf(boolean block) throws IOException {
+ if (readPos == writePos) {
+ readPos = 0;
+ writePos = 0;
+ }
+ if (readPos * 3 > CAPACITY) {
+ int had = inBuf();
+ System.arraycopy(buf, readPos, buf, 0, had);
+ readPos = 0;
+ writePos = had;
+ }
+ int wantToRead = CAPACITY - writePos;
+ if (inBuf() > 0 || !block) {
+ wantToRead = Math.min(wantToRead, src.available());
+ }
+ if (wantToRead > 0) {
+ int got = src.read(buf, writePos, wantToRead);
+ if (got > 0) {
+ dst.write(buf, writePos, got);
+ writePos += got;
+ }
+ }
+ }
+
+ /** Construct a Tee */
+ public TeeInputStream(InputStream from, OutputStream to) {
+ super();
+ this.src = from;
+ this.dst = to;
+ }
+
+ public @Override int available() throws IOException {
+ return inBuf() + src.available();
+ }
+
+ public @Override void close() throws IOException {
+ fillBuf(false);
+ src.close();
+ dst.close();
+ }
+
+ public @Override void mark(int readlimit) {}
+ public @Override boolean markSupported() { return false; }
+ public @Override void reset() {}
+
+ public @Override int read() throws IOException {
+ fillBuf(true);
+ if (inBuf() > 0) {
+ int r = buf[readPos++];
+ return r & 0xff;
+ }
+ return -1;
+ }
+
+ public @Override int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ public @Override int read(byte[] b, int off, int len) throws IOException {
+ if (len <= 0) {
+ return 0;
+ }
+ fillBuf(true);
+ int had = inBuf();
+ if (had > 0) {
+ len = Math.min(len, had);
+ System.arraycopy(buf, readPos, b, off, len);
+ readPos += len;
+ return len;
+ }
+ return -1;
+ }
+
+}
diff --git a/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java b/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java
new file mode 100644
index 00000000000..315e8d5f8d4
--- /dev/null
+++ b/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java
@@ -0,0 +1,82 @@
+package com.yahoo.io;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Test;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+/**
+ * @author arnej
+ */
+public class TeeInputStreamTest {
+
+ @Test
+ public void testSimpleInput() throws IOException {
+ byte[] input = "very simple input".getBytes(StandardCharsets.UTF_8);
+ ByteArrayInputStream in = new ByteArrayInputStream(input);
+ ByteArrayOutputStream gotten = new ByteArrayOutputStream();
+
+ TeeInputStream tee = new TeeInputStream(in, gotten);
+ int b = tee.read();
+ assertThat(b, is((int)'v'));
+ assertThat(gotten.toString(), is("very simple input"));
+ for (int i = 0; i < 16; i++) {
+ b = tee.read();
+ // System.out.println("got["+i+"]: "+(char)b);
+ assertThat(b, is(greaterThan(0)));
+ }
+ assertThat(tee.read(), is(-1));
+ }
+
+ private class Generator implements Runnable {
+ private OutputStream dst;
+ public Generator(OutputStream dst) { this.dst = dst; }
+ public @Override void run() {
+ for (int i = 0; i < 123456789; i++) {
+ int b = i & 0x7f;
+ if (b < 32) continue;
+ if (b > 126) b = '\n';
+ try {
+ dst.write(b);
+ } catch (IOException e) {
+ return;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testPipedInput() throws IOException {
+ PipedOutputStream input = new PipedOutputStream();
+ PipedInputStream in = new PipedInputStream(input);
+ ByteArrayOutputStream gotten = new ByteArrayOutputStream();
+ TeeInputStream tee = new TeeInputStream(in, gotten);
+ input.write("first input".getBytes(StandardCharsets.UTF_8));
+ int b = tee.read();
+ assertThat(b, is((int)'f'));
+ assertThat(gotten.toString(), is("first input"));
+ input.write(" second input".getBytes(StandardCharsets.UTF_8));
+ b = tee.read();
+ assertThat(b, is((int)'i'));
+ assertThat(gotten.toString(), is("first input second input"));
+ new Thread(new Generator(input)).start();
+ b = tee.read();
+ assertThat(b, is((int)'r'));
+ byte[] ba = new byte[9];
+ for (int i = 0; i < 12345; i++) {
+ b = tee.read();
+ // System.out.println("got["+i+"]: "+(char)b);
+ assertThat(b, is(greaterThan(0)));
+ assertThat(tee.read(ba), is(greaterThan(0)));
+ }
+ tee.close();
+ String got = gotten.toString();
+ // System.out.println("got length: "+got.length());
+ // System.out.println("got: "+got);
+ assertThat(got.length(), is(greaterThan(34567)));
+ }
+
+}