diff options
author | HÃ¥vard Pettersen <havardpe@gmail.com> | 2017-05-30 14:22:36 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-05-30 14:22:36 +0200 |
commit | 3c3a3a646d24f8594256986fe6a2fa71d915c289 (patch) | |
tree | 64340821f215fc617967053365a5b8cb19d69883 /vespajlib | |
parent | a9772aff3ee6ce7b1f76322b0617890545289592 (diff) | |
parent | 14953e4b25d7dfd92e38a5340b74c24c6a3f9cd7 (diff) |
Merge pull request #2562 from yahoo/arnej/add-TeeInputStream
add TeeInputStream
Diffstat (limited to 'vespajlib')
3 files changed, 219 insertions, 11 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java b/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java new file mode 100644 index 00000000000..f9e803df08f --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/io/TeeInputStream.java @@ -0,0 +1,97 @@ +package com.yahoo.io; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; + +/** + * Forwards input from a source InputStream while making a copy of it into an outputstream. + * Note that it also does read-ahead and copies up to 64K of data more than was used. + */ +class TeeInputStream extends InputStream { + final InputStream src; + final OutputStream dst; + + static final int CAPACITY = 65536; + + byte[] buf = new byte[CAPACITY]; + int readPos = 0; + int writePos = 0; + + private int inBuf() { return writePos - readPos; } + + private void fillBuf() throws IOException { + if (readPos == writePos) { + readPos = 0; + writePos = 0; + } + if (readPos * 3 > CAPACITY) { + int had = inBuf(); + System.arraycopy(buf, readPos, buf, 0, had); + readPos = 0; + writePos = had; + } + int wantToRead = CAPACITY - writePos; + if (inBuf() > 0) { + // if we have data already, do not block, read only what is available + wantToRead = Math.min(wantToRead, src.available()); + } + if (wantToRead > 0) { + int got = src.read(buf, writePos, wantToRead); + if (got > 0) { + dst.write(buf, writePos, got); + writePos += got; + } + } + } + + /** Construct a Tee */ + public TeeInputStream(InputStream from, OutputStream to) { + super(); + this.src = from; + this.dst = to; + } + + @Override + public int available() throws IOException { + return inBuf() + src.available(); + } + + @Override + public void close() throws IOException { + src.close(); + dst.close(); + } + + @Override + public int read() throws IOException { + fillBuf(); + if (inBuf() > 0) { + int r = buf[readPos++]; + return r & 0xff; + } + return -1; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (len <= 0) { + return 0; + } + fillBuf(); + int had = inBuf(); + if (had > 0) { + len = Math.min(len, had); + System.arraycopy(buf, readPos, b, off, len); + readPos += len; + return len; + } + return -1; + } + +} diff --git a/vespajlib/src/main/java/com/yahoo/io/reader/NamedReader.java b/vespajlib/src/main/java/com/yahoo/io/reader/NamedReader.java index 65201a02bae..7b93429acdb 100644 --- a/vespajlib/src/main/java/com/yahoo/io/reader/NamedReader.java +++ b/vespajlib/src/main/java/com/yahoo/io/reader/NamedReader.java @@ -28,21 +28,32 @@ public class NamedReader extends Reader { public Reader getReader() { return reader; } /** Returns the name */ - public @Override String toString() { + @Override + public String toString() { return name; } // The rest is reader method implementations which delegates to the wrapped reader - public @Override int read(java.nio.CharBuffer charBuffer) throws java.io.IOException { return reader.read(charBuffer); } - public @Override int read() throws java.io.IOException { return reader.read(); } - public @Override int read(char[] chars) throws java.io.IOException { return reader.read(chars); } - public @Override int read(char[] chars, int i, int i1) throws java.io.IOException { return reader.read(chars,i,i1); } - public @Override long skip(long l) throws java.io.IOException { return reader.skip(l); } - public @Override boolean ready() throws java.io.IOException { return reader.ready(); } - public @Override boolean markSupported() { return reader.markSupported(); } - public @Override void mark(int i) throws java.io.IOException { reader.mark(i); } - public @Override void reset() throws java.io.IOException { reader.reset(); } - public @Override void close() throws java.io.IOException { reader.close(); } + @Override + public int read(java.nio.CharBuffer charBuffer) throws java.io.IOException { return reader.read(charBuffer); } + @Override + public int read() throws java.io.IOException { return reader.read(); } + @Override + public int read(char[] chars) throws java.io.IOException { return reader.read(chars); } + @Override + public int read(char[] chars, int i, int i1) throws java.io.IOException { return reader.read(chars,i,i1); } + @Override + public long skip(long l) throws java.io.IOException { return reader.skip(l); } + @Override + public boolean ready() throws java.io.IOException { return reader.ready(); } + @Override + public boolean markSupported() { return reader.markSupported(); } + @Override + public void mark(int i) throws java.io.IOException { reader.mark(i); } + @Override + public void reset() throws java.io.IOException { reader.reset(); } + @Override + public void close() throws java.io.IOException { reader.close(); } /** Convenience method for closing a list of readers. Does nothing if the given reader list is null. */ public static void closeAll(List<NamedReader> readers) { diff --git a/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java b/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java new file mode 100644 index 00000000000..ac438d8645b --- /dev/null +++ b/vespajlib/src/test/java/com/yahoo/io/TeeInputStreamTest.java @@ -0,0 +1,100 @@ +package com.yahoo.io; + +import java.io.*; +import java.nio.charset.StandardCharsets; + +import org.junit.Test; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.*; + +/** + * @author arnej + */ +public class TeeInputStreamTest { + + @Test + public void testSimpleInput() throws IOException { + byte[] input = "very simple input".getBytes(StandardCharsets.UTF_8); + ByteArrayInputStream in = new ByteArrayInputStream(input); + ByteArrayOutputStream gotten = new ByteArrayOutputStream(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + TeeInputStream tee = new TeeInputStream(in, gotten); + int b = tee.read(); + assertThat(b, is((int)'v')); + output.write(b); + assertThat(gotten.toString(), is("very simple input")); + for (int i = 0; i < 16; i++) { + b = tee.read(); + // System.out.println("got["+i+"]: "+(char)b); + assertThat(b, is(greaterThan(0))); + output.write(b); + } + assertThat(tee.read(), is(-1)); + assertThat(gotten.toString(), is("very simple input")); + assertThat(output.toString(), is("very simple input")); + } + + private class Generator implements Runnable { + private OutputStream dst; + public Generator(OutputStream dst) { this.dst = dst; } + @Override + public void run() { + for (int i = 0; i < 123456789; i++) { + int b = i & 0x7f; + if (b < 32) continue; + if (b > 126) b = '\n'; + try { + dst.write(b); + } catch (IOException e) { + return; + } + } + } + } + + @Test + public void testPipedInput() throws IOException { + PipedOutputStream input = new PipedOutputStream(); + PipedInputStream in = new PipedInputStream(input); + ByteArrayOutputStream gotten = new ByteArrayOutputStream(); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + TeeInputStream tee = new TeeInputStream(in, gotten); + input.write("first input".getBytes(StandardCharsets.UTF_8)); + int b = tee.read(); + assertThat(b, is((int)'f')); + output.write(b); + assertThat(gotten.toString(), is("first input")); + input.write(" second input".getBytes(StandardCharsets.UTF_8)); + b = tee.read(); + assertThat(b, is((int)'i')); + output.write(b); + assertThat(gotten.toString(), is("first input second input")); + new Thread(new Generator(input)).start(); + b = tee.read(); + assertThat(b, is((int)'r')); + output.write(b); + byte[] ba = new byte[9]; + for (int i = 0; i < 12345; i++) { + b = tee.read(); + // System.out.println("got["+i+"]: "+(char)b); + assertThat(b, is(greaterThan(0))); + output.write(b); + int l = tee.read(ba); + assertThat(l, is(greaterThan(0))); + output.write(ba, 0, l); + l = tee.read(ba, 3, 3); + assertThat(l, is(greaterThan(0))); + output.write(ba, 3, l); + } + tee.close(); + String got = gotten.toString(); + // System.out.println("got length: "+got.length()); + // System.out.println("output length: "+output.toString().length()); + // System.out.println("got: "+got); + assertThat(got.length(), is(greaterThan(34567))); + assertTrue(got.startsWith(output.toString())); + } + +} |