summaryrefslogtreecommitdiffstats
path: root/vespajlib
diff options
context:
space:
mode:
authorHÃ¥vard Pettersen <havardpe@gmail.com>2017-05-30 14:22:36 +0200
committerGitHub <noreply@github.com>2017-05-30 14:22:36 +0200
commit3c3a3a646d24f8594256986fe6a2fa71d915c289 (patch)
tree64340821f215fc617967053365a5b8cb19d69883 /vespajlib
parenta9772aff3ee6ce7b1f76322b0617890545289592 (diff)
parent14953e4b25d7dfd92e38a5340b74c24c6a3f9cd7 (diff)
Merge pull request #2562 from yahoo/arnej/add-TeeInputStream
add TeeInputStream
Diffstat (limited to 'vespajlib')
-rw-r--r--vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java97
-rw-r--r--vespajlib/src/main/java/com/yahoo/io/reader/NamedReader.java33
-rw-r--r--vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java100
3 files changed, 219 insertions, 11 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java b/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java
new file mode 100644
index 00000000000..f9e803df08f
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java
@@ -0,0 +1,97 @@
+package com.yahoo.io;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+
+/**
+ * Forwards input from a source InputStream while making a copy of it into an outputstream.
+ * Note that it also does read-ahead and copies up to 64K of data more than was used.
+ */
+class TeeInputStream extends InputStream {
+ final InputStream src;
+ final OutputStream dst;
+
+ static final int CAPACITY = 65536;
+
+ byte[] buf = new byte[CAPACITY];
+ int readPos = 0;
+ int writePos = 0;
+
+ private int inBuf() { return writePos - readPos; }
+
+ private void fillBuf() throws IOException {
+ if (readPos == writePos) {
+ readPos = 0;
+ writePos = 0;
+ }
+ if (readPos * 3 > CAPACITY) {
+ int had = inBuf();
+ System.arraycopy(buf, readPos, buf, 0, had);
+ readPos = 0;
+ writePos = had;
+ }
+ int wantToRead = CAPACITY - writePos;
+ if (inBuf() > 0) {
+ // if we have data already, do not block, read only what is available
+ wantToRead = Math.min(wantToRead, src.available());
+ }
+ if (wantToRead > 0) {
+ int got = src.read(buf, writePos, wantToRead);
+ if (got > 0) {
+ dst.write(buf, writePos, got);
+ writePos += got;
+ }
+ }
+ }
+
+ /** Construct a Tee */
+ public TeeInputStream(InputStream from, OutputStream to) {
+ super();
+ this.src = from;
+ this.dst = to;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return inBuf() + src.available();
+ }
+
+ @Override
+ public void close() throws IOException {
+ src.close();
+ dst.close();
+ }
+
+ @Override
+ public int read() throws IOException {
+ fillBuf();
+ if (inBuf() > 0) {
+ int r = buf[readPos++];
+ return r & 0xff;
+ }
+ return -1;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (len <= 0) {
+ return 0;
+ }
+ fillBuf();
+ int had = inBuf();
+ if (had > 0) {
+ len = Math.min(len, had);
+ System.arraycopy(buf, readPos, b, off, len);
+ readPos += len;
+ return len;
+ }
+ return -1;
+ }
+
+}
diff --git a/vespajlib/src/main/java/com/yahoo/io/reader/NamedReader.java b/vespajlib/src/main/java/com/yahoo/io/reader/NamedReader.java
index 65201a02bae..7b93429acdb 100644
--- a/vespajlib/src/main/java/com/yahoo/io/reader/NamedReader.java
+++ b/vespajlib/src/main/java/com/yahoo/io/reader/NamedReader.java
@@ -28,21 +28,32 @@ public class NamedReader extends Reader {
public Reader getReader() { return reader; }
/** Returns the name */
- public @Override String toString() {
+ @Override
+ public String toString() {
return name;
}
// The rest is reader method implementations which delegates to the wrapped reader
- public @Override int read(java.nio.CharBuffer charBuffer) throws java.io.IOException { return reader.read(charBuffer); }
- public @Override int read() throws java.io.IOException { return reader.read(); }
- public @Override int read(char[] chars) throws java.io.IOException { return reader.read(chars); }
- public @Override int read(char[] chars, int i, int i1) throws java.io.IOException { return reader.read(chars,i,i1); }
- public @Override long skip(long l) throws java.io.IOException { return reader.skip(l); }
- public @Override boolean ready() throws java.io.IOException { return reader.ready(); }
- public @Override boolean markSupported() { return reader.markSupported(); }
- public @Override void mark(int i) throws java.io.IOException { reader.mark(i); }
- public @Override void reset() throws java.io.IOException { reader.reset(); }
- public @Override void close() throws java.io.IOException { reader.close(); }
+ @Override
+ public int read(java.nio.CharBuffer charBuffer) throws java.io.IOException { return reader.read(charBuffer); }
+ @Override
+ public int read() throws java.io.IOException { return reader.read(); }
+ @Override
+ public int read(char[] chars) throws java.io.IOException { return reader.read(chars); }
+ @Override
+ public int read(char[] chars, int i, int i1) throws java.io.IOException { return reader.read(chars,i,i1); }
+ @Override
+ public long skip(long l) throws java.io.IOException { return reader.skip(l); }
+ @Override
+ public boolean ready() throws java.io.IOException { return reader.ready(); }
+ @Override
+ public boolean markSupported() { return reader.markSupported(); }
+ @Override
+ public void mark(int i) throws java.io.IOException { reader.mark(i); }
+ @Override
+ public void reset() throws java.io.IOException { reader.reset(); }
+ @Override
+ public void close() throws java.io.IOException { reader.close(); }
/** Convenience method for closing a list of readers. Does nothing if the given reader list is null. */
public static void closeAll(List<NamedReader> readers) {
diff --git a/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java b/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java
new file mode 100644
index 00000000000..ac438d8645b
--- /dev/null
+++ b/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java
@@ -0,0 +1,100 @@
+package com.yahoo.io;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Test;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+/**
+ * @author arnej
+ */
+public class TeeInputStreamTest {
+
+ @Test
+ public void testSimpleInput() throws IOException {
+ byte[] input = "very simple input".getBytes(StandardCharsets.UTF_8);
+ ByteArrayInputStream in = new ByteArrayInputStream(input);
+ ByteArrayOutputStream gotten = new ByteArrayOutputStream();
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+ TeeInputStream tee = new TeeInputStream(in, gotten);
+ int b = tee.read();
+ assertThat(b, is((int)'v'));
+ output.write(b);
+ assertThat(gotten.toString(), is("very simple input"));
+ for (int i = 0; i < 16; i++) {
+ b = tee.read();
+ // System.out.println("got["+i+"]: "+(char)b);
+ assertThat(b, is(greaterThan(0)));
+ output.write(b);
+ }
+ assertThat(tee.read(), is(-1));
+ assertThat(gotten.toString(), is("very simple input"));
+ assertThat(output.toString(), is("very simple input"));
+ }
+
+ private class Generator implements Runnable {
+ private OutputStream dst;
+ public Generator(OutputStream dst) { this.dst = dst; }
+ @Override
+ public void run() {
+ for (int i = 0; i < 123456789; i++) {
+ int b = i & 0x7f;
+ if (b < 32) continue;
+ if (b > 126) b = '\n';
+ try {
+ dst.write(b);
+ } catch (IOException e) {
+ return;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testPipedInput() throws IOException {
+ PipedOutputStream input = new PipedOutputStream();
+ PipedInputStream in = new PipedInputStream(input);
+ ByteArrayOutputStream gotten = new ByteArrayOutputStream();
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ TeeInputStream tee = new TeeInputStream(in, gotten);
+ input.write("first input".getBytes(StandardCharsets.UTF_8));
+ int b = tee.read();
+ assertThat(b, is((int)'f'));
+ output.write(b);
+ assertThat(gotten.toString(), is("first input"));
+ input.write(" second input".getBytes(StandardCharsets.UTF_8));
+ b = tee.read();
+ assertThat(b, is((int)'i'));
+ output.write(b);
+ assertThat(gotten.toString(), is("first input second input"));
+ new Thread(new Generator(input)).start();
+ b = tee.read();
+ assertThat(b, is((int)'r'));
+ output.write(b);
+ byte[] ba = new byte[9];
+ for (int i = 0; i < 12345; i++) {
+ b = tee.read();
+ // System.out.println("got["+i+"]: "+(char)b);
+ assertThat(b, is(greaterThan(0)));
+ output.write(b);
+ int l = tee.read(ba);
+ assertThat(l, is(greaterThan(0)));
+ output.write(ba, 0, l);
+ l = tee.read(ba, 3, 3);
+ assertThat(l, is(greaterThan(0)));
+ output.write(ba, 3, l);
+ }
+ tee.close();
+ String got = gotten.toString();
+ // System.out.println("got length: "+got.length());
+ // System.out.println("output length: "+output.toString().length());
+ // System.out.println("got: "+got);
+ assertThat(got.length(), is(greaterThan(34567)));
+ assertTrue(got.startsWith(output.toString()));
+ }
+
+}