diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-06-18 11:22:25 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-06-18 11:22:25 +0000 |
commit | c6afcd087b79dabb47db611881c4d0f96a980d15 (patch) | |
tree | 23747c556a89a28c147abd77325411e516866da9 | |
parent | b04f090f8c55eb2906e890ca8260d77e27e6ccb3 (diff) |
Add suppport for mark/reset.
5 files changed, 160 insertions, 61 deletions
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/UnsafeContentInputStream.java b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/UnsafeContentInputStream.java index 748c2951a6a..1662ed5b46a 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/UnsafeContentInputStream.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/UnsafeContentInputStream.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; +import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Objects; @@ -19,6 +20,8 @@ public class UnsafeContentInputStream extends InputStream { private final ReadableContentChannel content; private ByteBuffer buf = ByteBuffer.allocate(0); + private byte [] marked; + private int readSinceMarked; /** * <p>Constructs a new ContentInputStream that reads from the given {@link ReadableContentChannel}.</p> @@ -37,7 +40,15 @@ public class UnsafeContentInputStream extends InputStream { if (buf == null) { return -1; } - return ((int)buf.get()) & 0xFF; + byte b = buf.get(); + if (marked != null) { + if (readSinceMarked < marked.length) { + marked[readSinceMarked++] = b; + } else { + marked = null; + } + } + return ((int)b) & 0xFF; } @Override @@ -79,4 +90,28 @@ public class UnsafeContentInputStream extends InputStream { } } + + @Override + public synchronized void mark(int readlimit) { + marked = new byte[readlimit]; + readSinceMarked = 0; + } + + @Override + public synchronized void reset() throws IOException { + if (marked == null) { + throw new IOException("mark has not been called, or too much has been read since marked."); + } + ByteBuffer newBuf = ByteBuffer.allocate(readSinceMarked + buf.remaining()); + newBuf.put(marked, 0, readSinceMarked); + newBuf.put(buf); + newBuf.flip(); + buf = newBuf; + marked = null; + } + + @Override + public boolean markSupported() { + return true; + } } diff --git a/jdisc_core/src/test/java/com/yahoo/jdisc/handler/UnsafeContentInputStreamTestCase.java b/jdisc_core/src/test/java/com/yahoo/jdisc/handler/UnsafeContentInputStreamTestCase.java index c00fab6cb56..c96450c1bd2 100644 --- a/jdisc_core/src/test/java/com/yahoo/jdisc/handler/UnsafeContentInputStreamTestCase.java +++ b/jdisc_core/src/test/java/com/yahoo/jdisc/handler/UnsafeContentInputStreamTestCase.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; +import com.yahoo.text.Utf8; import org.junit.Test; import java.io.BufferedReader; @@ -8,11 +9,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.ByteBuffer; -import java.util.concurrent.Future; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; /** * @author Simon Thoresen Hult @@ -32,7 +33,37 @@ public class UnsafeContentInputStreamTestCase { assertNull(reader.readLine()); } - @SuppressWarnings("deprecation") + @Test + public void testMark() throws IOException { + BufferedContentChannel channel = new BufferedContentChannel(); + FastContentWriter writer = new FastContentWriter(channel); + writer.write("Hello "); + writer.write("World!"); + writer.close(); + + InputStream stream = asInputStream(channel); + assertTrue(stream.markSupported()); + int first = stream.read(); + assertEquals('H', first); + stream.mark(10); + byte [] buf = new byte[8]; + stream.read(buf); + assertEquals("ello Wor", Utf8.toString(buf)); + stream.reset(); + stream.mark(5); + buf = new byte [9]; + stream.read(buf); + assertEquals("ello Worl", Utf8.toString(buf)); + try { + stream.reset(); + fail("UnsafeContentInputStream.reset expected to fail when your read past readLimit."); + } catch (IOException e) { + assertEquals("mark has not been called, or too much has been read since marked.", e.getMessage()); + } catch (Throwable t) { + fail("Did not expect " + t); + } + } + @Test public void requireThatCompletionsAreCalledWithDeprecatedContentWriter() throws IOException { BufferedContentChannel channel = new BufferedContentChannel(); @@ -63,7 +94,6 @@ public class UnsafeContentInputStreamTestCase { assertTrue(writer.isDone()); } - @SuppressWarnings("deprecation") @Test public void requireThatCloseDrainsStreamWithDeprecatedContentWriter() { BufferedContentChannel channel = new BufferedContentChannel(); diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java index 419e61fa1c0..749f569d718 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java @@ -16,6 +16,7 @@ import java.io.InputStream; * @author dybis */ public class FeedReaderFactory { + private static final int MARK_READLIMIT = 200; /** * Creates FeedReader @@ -30,17 +31,17 @@ public class FeedReaderFactory { FeedParams.DataFormat dataFormat) { switch (dataFormat) { case XML_UTF8: + byte [] peek = new byte[MARK_READLIMIT]; + int bytesPeeked = 0; try { + if (inputStream.markSupported()) { + inputStream.mark(MARK_READLIMIT); + bytesPeeked = inputStream.read(peek); + inputStream.reset(); + } return new VespaXMLFeedReader(inputStream, docTypeManager); } catch (Exception e) { - byte [] peek; - try { - peek = new byte[Math.min(200, inputStream.available())]; - inputStream.read(peek); - } catch (IOException io) { - peek = new byte [0]; - } - throw new RuntimeException("Could not create VespaXMLFeedReader. First characters are: " + Utf8.toString(peek), e); + throw new RuntimeException("Could not create VespaXMLFeedReader. First characters are: '" + Utf8.toString(peek, 0, bytesPeeked) + "'", e); } case JSON_UTF8: return new JsonFeedReader(inputStream, docTypeManager); diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java index c9f255f026e..c8ae79deebd 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java @@ -13,6 +13,7 @@ public class ByteLimitedInputStream extends InputStream { private final InputStream wrappedStream; private int remaining; + private int remainingWhenMarked; public ByteLimitedInputStream(InputStream wrappedStream, int limit) { this.wrappedStream = wrappedStream; @@ -78,4 +79,21 @@ public class ByteLimitedInputStream extends InputStream { } } + @Override + public synchronized void mark(int readlimit) { + wrappedStream.mark(readlimit); + remainingWhenMarked = remaining; + } + + @Override + public synchronized void reset() throws IOException { + wrappedStream.reset(); + remaining = remainingWhenMarked; + } + + @Override + public boolean markSupported() { + return wrappedStream.markSupported(); + } + } diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java index 3aa3cdcb3a8..3dd8145ec73 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java @@ -8,8 +8,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> @@ -29,63 +29,78 @@ public class ByteLimitedInputStreamTestCase { public void requireThatBasicsWork() throws IOException { ByteLimitedInputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9); - assertThat(stream.available(), is(9)); - assertThat(stream.read(), is(97)); - assertThat(stream.available(), is(8)); - assertThat(stream.read(), is(98)); - assertThat(stream.available(), is(7)); - assertThat(stream.read(), is(99)); - assertThat(stream.available(), is(6)); - assertThat(stream.read(), is(100)); - assertThat(stream.available(), is(5)); - assertThat(stream.read(), is(101)); - assertThat(stream.available(), is(4)); - assertThat(stream.read(), is(102)); - assertThat(stream.available(), is(3)); - assertThat(stream.read(), is(103)); - assertThat(stream.available(), is(2)); - assertThat(stream.read(), is(104)); - assertThat(stream.available(), is(1)); - assertThat(stream.read(), is(105)); - assertThat(stream.available(), is(0)); - assertThat(stream.read(), is(-1)); - assertThat(stream.available(), is(0)); - assertThat(stream.read(), is(-1)); - assertThat(stream.available(), is(0)); - assertThat(stream.read(), is(-1)); - assertThat(stream.available(), is(0)); - assertThat(stream.read(), is(-1)); - assertThat(stream.available(), is(0)); - assertThat(stream.read(), is(-1)); - assertThat(stream.available(), is(0)); + assertEquals(9, stream.available()); + assertEquals(97, stream.read()); + assertEquals(8, stream.available()); + assertEquals(98, stream.read()); + assertEquals(7, stream.available()); + assertEquals(99, stream.read()); + assertEquals(6, stream.available()); + assertEquals(100, stream.read()); + assertEquals(5, stream.available()); + assertEquals(101, stream.read()); + assertEquals(4, stream.available()); + assertEquals(102, stream.read()); + assertEquals(3, stream.available()); + assertEquals(103, stream.read()); + assertEquals(2, stream.available()); + assertEquals(104, stream.read()); + assertEquals(1, stream.available()); + assertEquals(105, stream.read()); + assertEquals(0, stream.available()); + assertEquals(-1, stream.read()); + assertEquals(0, stream.available()); + assertEquals(-1, stream.read()); + assertEquals(0, stream.available()); + assertEquals(-1, stream.read()); + assertEquals(0, stream.available()); + assertEquals(-1, stream.read()); + assertEquals(0, stream.available()); + assertEquals(-1, stream.read()); + assertEquals(0, stream.available()); } @Test public void requireThatChunkedReadWorks() throws IOException { ByteLimitedInputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9); - assertThat(stream.available(), is(9)); + assertEquals(9, stream.available()); byte[] toBuf = new byte[4]; - assertThat(stream.read(toBuf), is(4)); - assertThat(toBuf[0], is((byte) 97)); - assertThat(toBuf[1], is((byte) 98)); - assertThat(toBuf[2], is((byte) 99)); - assertThat(toBuf[3], is((byte) 100)); - assertThat(stream.available(), is(5)); + assertEquals(4, stream.read(toBuf)); + assertEquals(97, toBuf[0]); + assertEquals(98, toBuf[1]); + assertEquals(99, toBuf[2]); + assertEquals(100, toBuf[3]); + assertEquals(5, stream.available()); - assertThat(stream.read(toBuf), is(4)); - assertThat(toBuf[0], is((byte) 101)); - assertThat(toBuf[1], is((byte) 102)); - assertThat(toBuf[2], is((byte) 103)); - assertThat(toBuf[3], is((byte) 104)); - assertThat(stream.available(), is(1)); + assertEquals(4, stream.read(toBuf)); + assertEquals(101, toBuf[0]); + assertEquals(102, toBuf[1]); + assertEquals(103, toBuf[2]); + assertEquals(104, toBuf[3]); + assertEquals(1, stream.available()); - assertThat(stream.read(toBuf), is(1)); - assertThat(toBuf[0], is((byte) 105)); - assertThat(stream.available(), is(0)); + assertEquals(1, stream.read(toBuf)); + assertEquals(105, toBuf[0]); + assertEquals(0, stream.available()); - assertThat(stream.read(toBuf), is(-1)); - assertThat(stream.available(), is(0)); + assertEquals(-1, stream.read(toBuf)); + assertEquals(0, stream.available()); + } + + @Test + public void requireMarkWorks() throws IOException { + InputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9); + assertEquals(97, stream.read()); + assertTrue(stream.markSupported()); + stream.mark(5); + assertEquals(98, stream.read()); + assertEquals(99, stream.read()); + stream.reset(); + assertEquals(98, stream.read()); + assertEquals(99, stream.read()); + assertEquals(100, stream.read()); + assertEquals(101, stream.read()); } } |