diff options
8 files changed, 216 insertions, 55 deletions
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/UnsafeContentInputStream.java b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/UnsafeContentInputStream.java index 748c2951a6a..1662ed5b46a 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/handler/UnsafeContentInputStream.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/handler/UnsafeContentInputStream.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; +import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Objects; @@ -19,6 +20,8 @@ public class UnsafeContentInputStream extends InputStream { private final ReadableContentChannel content; private ByteBuffer buf = ByteBuffer.allocate(0); + private byte [] marked; + private int readSinceMarked; /** * <p>Constructs a new ContentInputStream that reads from the given {@link ReadableContentChannel}.</p> @@ -37,7 +40,15 @@ public class UnsafeContentInputStream extends InputStream { if (buf == null) { return -1; } - return ((int)buf.get()) & 0xFF; + byte b = buf.get(); + if (marked != null) { + if (readSinceMarked < marked.length) { + marked[readSinceMarked++] = b; + } else { + marked = null; + } + } + return ((int)b) & 0xFF; } @Override @@ -79,4 +90,28 @@ public class UnsafeContentInputStream extends InputStream { } } + + @Override + public synchronized void mark(int readlimit) { + marked = new byte[readlimit]; + readSinceMarked = 0; + } + + @Override + public synchronized void reset() throws IOException { + if (marked == null) { + throw new IOException("mark has not been called, or too much has been read since marked."); + } + ByteBuffer newBuf = ByteBuffer.allocate(readSinceMarked + buf.remaining()); + newBuf.put(marked, 0, readSinceMarked); + newBuf.put(buf); + newBuf.flip(); + buf = newBuf; + marked = null; + } + + @Override + public boolean markSupported() { + return true; + } } diff --git a/jdisc_core/src/test/java/com/yahoo/jdisc/handler/UnsafeContentInputStreamTestCase.java b/jdisc_core/src/test/java/com/yahoo/jdisc/handler/UnsafeContentInputStreamTestCase.java index c00fab6cb56..c96450c1bd2 100644 --- a/jdisc_core/src/test/java/com/yahoo/jdisc/handler/UnsafeContentInputStreamTestCase.java +++ b/jdisc_core/src/test/java/com/yahoo/jdisc/handler/UnsafeContentInputStreamTestCase.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.handler; +import com.yahoo.text.Utf8; import org.junit.Test; import java.io.BufferedReader; @@ -8,11 +9,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.ByteBuffer; -import java.util.concurrent.Future; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; /** * @author Simon Thoresen Hult @@ -32,7 +33,37 @@ public class UnsafeContentInputStreamTestCase { assertNull(reader.readLine()); } - @SuppressWarnings("deprecation") + @Test + public void testMark() throws IOException { + BufferedContentChannel channel = new BufferedContentChannel(); + FastContentWriter writer = new FastContentWriter(channel); + writer.write("Hello "); + writer.write("World!"); + writer.close(); + + InputStream stream = asInputStream(channel); + assertTrue(stream.markSupported()); + int first = stream.read(); + assertEquals('H', first); + stream.mark(10); + byte [] buf = new byte[8]; + stream.read(buf); + assertEquals("ello Wor", Utf8.toString(buf)); + stream.reset(); + stream.mark(5); + buf = new byte [9]; + stream.read(buf); + assertEquals("ello Worl", Utf8.toString(buf)); + try { + stream.reset(); + fail("UnsafeContentInputStream.reset expected to fail when your read past readLimit."); + } catch (IOException e) { + assertEquals("mark has not been called, or too much has been read since marked.", e.getMessage()); + } catch (Throwable t) { + fail("Did not expect " + t); + } + } + @Test public void requireThatCompletionsAreCalledWithDeprecatedContentWriter() throws IOException { BufferedContentChannel channel = new BufferedContentChannel(); @@ -63,7 +94,6 @@ public class UnsafeContentInputStreamTestCase { assertTrue(writer.isDone()); } - @SuppressWarnings("deprecation") @Test public void requireThatCloseDrainsStreamWithDeprecatedContentWriter() { BufferedContentChannel channel = new BufferedContentChannel(); diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java index 03916949cae..a932ca935e0 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java @@ -84,7 +84,7 @@ public class FeedHandlerV3 extends LoggingRequestHandler { SourceSessionParams sourceSessionParams = sourceSessionParams(request); clientFeederByClientId.put(clientId, new ClientFeederV3(retainSource(sessionCache, sourceSessionParams), - new FeedReaderFactory(), + new FeedReaderFactory(true), //TODO make error debugging configurable docTypeManager, clientId, metric, diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java index 6a3229e86b7..81b08d5fb25 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedReaderFactory.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.http.server; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.json.JsonFeedReader; +import com.yahoo.text.Utf8; import com.yahoo.vespa.http.client.config.FeedParams; import com.yahoo.vespaxmlparser.FeedReader; import com.yahoo.vespaxmlparser.VespaXMLFeedReader; @@ -14,6 +15,12 @@ import java.io.InputStream; * @author dybis */ public class FeedReaderFactory { + private static final int MARK_READLIMIT = 200; + + private final boolean debug; + public FeedReaderFactory(boolean debug) { + this.debug = debug; + } /** * Creates FeedReader @@ -28,10 +35,22 @@ public class FeedReaderFactory { FeedParams.DataFormat dataFormat) { switch (dataFormat) { case XML_UTF8: + byte [] peek = null; + int bytesPeeked = 0; try { + if (debug && inputStream.markSupported()) { + peek = new byte[MARK_READLIMIT]; + inputStream.mark(MARK_READLIMIT); + bytesPeeked = inputStream.read(peek); + inputStream.reset(); + } return new VespaXMLFeedReader(inputStream, docTypeManager); } catch (Exception e) { - throw new RuntimeException("Could not create VespaXMLFeedReader", e); + if (bytesPeeked > 0) { + throw new RuntimeException("Could not create VespaXMLFeedReader. First characters are: '" + Utf8.toString(peek, 0, bytesPeeked) + "'", e); + } else { + throw new RuntimeException("Could not create VespaXMLFeedReader.", e); + } } case JSON_UTF8: return new JsonFeedReader(inputStream, docTypeManager); diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java index c9f255f026e..c8ae79deebd 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStream.java @@ -13,6 +13,7 @@ public class ByteLimitedInputStream extends InputStream { private final InputStream wrappedStream; private int remaining; + private int remainingWhenMarked; public ByteLimitedInputStream(InputStream wrappedStream, int limit) { this.wrappedStream = wrappedStream; @@ -78,4 +79,21 @@ public class ByteLimitedInputStream extends InputStream { } } + @Override + public synchronized void mark(int readlimit) { + wrappedStream.mark(readlimit); + remainingWhenMarked = remaining; + } + + @Override + public synchronized void reset() throws IOException { + wrappedStream.reset(); + remaining = remainingWhenMarked; + } + + @Override + public boolean markSupported() { + return wrappedStream.markSupported(); + } + } diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedReaderFactoryTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedReaderFactoryTestCase.java new file mode 100644 index 00000000000..47f057013b7 --- /dev/null +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedReaderFactoryTestCase.java @@ -0,0 +1,40 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.server; + +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.text.Utf8; +import com.yahoo.vespa.http.client.config.FeedParams; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class FeedReaderFactoryTestCase { + DocumentTypeManager manager = new DocumentTypeManager(); + + private InputStream createStream(String s) { + return new ByteArrayInputStream(Utf8.toBytes(s)); + } + + @Test + public void testXmlExceptionWithDebug() { + try { + new FeedReaderFactory(true).createReader(createStream("Some malformed xml"), manager, FeedParams.DataFormat.XML_UTF8); + fail(); + } catch (RuntimeException e) { + assertEquals("Could not create VespaXMLFeedReader. First characters are: 'Some malformed xml'", e.getMessage()); + } + } + @Test + public void testXmlException() { + try { + new FeedReaderFactory(false).createReader(createStream("Some malformed xml"), manager, FeedParams.DataFormat.XML_UTF8); + fail(); + } catch (RuntimeException e) { + assertEquals("Could not create VespaXMLFeedReader.", e.getMessage()); + } + } +} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java index 3aa3cdcb3a8..3dd8145ec73 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/util/ByteLimitedInputStreamTestCase.java @@ -8,8 +8,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> @@ -29,63 +29,78 @@ public class ByteLimitedInputStreamTestCase { public void requireThatBasicsWork() throws IOException { ByteLimitedInputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9); - assertThat(stream.available(), is(9)); - assertThat(stream.read(), is(97)); - assertThat(stream.available(), is(8)); - assertThat(stream.read(), is(98)); - assertThat(stream.available(), is(7)); - assertThat(stream.read(), is(99)); - assertThat(stream.available(), is(6)); - assertThat(stream.read(), is(100)); - assertThat(stream.available(), is(5)); - assertThat(stream.read(), is(101)); - assertThat(stream.available(), is(4)); - assertThat(stream.read(), is(102)); - assertThat(stream.available(), is(3)); - assertThat(stream.read(), is(103)); - assertThat(stream.available(), is(2)); - assertThat(stream.read(), is(104)); - assertThat(stream.available(), is(1)); - assertThat(stream.read(), is(105)); - assertThat(stream.available(), is(0)); - assertThat(stream.read(), is(-1)); - assertThat(stream.available(), is(0)); - assertThat(stream.read(), is(-1)); - assertThat(stream.available(), is(0)); - assertThat(stream.read(), is(-1)); - assertThat(stream.available(), is(0)); - assertThat(stream.read(), is(-1)); - assertThat(stream.available(), is(0)); - assertThat(stream.read(), is(-1)); - assertThat(stream.available(), is(0)); + assertEquals(9, stream.available()); + assertEquals(97, stream.read()); + assertEquals(8, stream.available()); + assertEquals(98, stream.read()); + assertEquals(7, stream.available()); + assertEquals(99, stream.read()); + assertEquals(6, stream.available()); + assertEquals(100, stream.read()); + assertEquals(5, stream.available()); + assertEquals(101, stream.read()); + assertEquals(4, stream.available()); + assertEquals(102, stream.read()); + assertEquals(3, stream.available()); + assertEquals(103, stream.read()); + assertEquals(2, stream.available()); + assertEquals(104, stream.read()); + assertEquals(1, stream.available()); + assertEquals(105, stream.read()); + assertEquals(0, stream.available()); + assertEquals(-1, stream.read()); + assertEquals(0, stream.available()); + assertEquals(-1, stream.read()); + assertEquals(0, stream.available()); + assertEquals(-1, stream.read()); + assertEquals(0, stream.available()); + assertEquals(-1, stream.read()); + assertEquals(0, stream.available()); + assertEquals(-1, stream.read()); + assertEquals(0, stream.available()); } @Test public void requireThatChunkedReadWorks() throws IOException { ByteLimitedInputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9); - assertThat(stream.available(), is(9)); + assertEquals(9, stream.available()); byte[] toBuf = new byte[4]; - assertThat(stream.read(toBuf), is(4)); - assertThat(toBuf[0], is((byte) 97)); - assertThat(toBuf[1], is((byte) 98)); - assertThat(toBuf[2], is((byte) 99)); - assertThat(toBuf[3], is((byte) 100)); - assertThat(stream.available(), is(5)); + assertEquals(4, stream.read(toBuf)); + assertEquals(97, toBuf[0]); + assertEquals(98, toBuf[1]); + assertEquals(99, toBuf[2]); + assertEquals(100, toBuf[3]); + assertEquals(5, stream.available()); - assertThat(stream.read(toBuf), is(4)); - assertThat(toBuf[0], is((byte) 101)); - assertThat(toBuf[1], is((byte) 102)); - assertThat(toBuf[2], is((byte) 103)); - assertThat(toBuf[3], is((byte) 104)); - assertThat(stream.available(), is(1)); + assertEquals(4, stream.read(toBuf)); + assertEquals(101, toBuf[0]); + assertEquals(102, toBuf[1]); + assertEquals(103, toBuf[2]); + assertEquals(104, toBuf[3]); + assertEquals(1, stream.available()); - assertThat(stream.read(toBuf), is(1)); - assertThat(toBuf[0], is((byte) 105)); - assertThat(stream.available(), is(0)); + assertEquals(1, stream.read(toBuf)); + assertEquals(105, toBuf[0]); + assertEquals(0, stream.available()); - assertThat(stream.read(toBuf), is(-1)); - assertThat(stream.available(), is(0)); + assertEquals(-1, stream.read(toBuf)); + assertEquals(0, stream.available()); + } + + @Test + public void requireMarkWorks() throws IOException { + InputStream stream = create("abcdefghijklmnopqr".getBytes(StandardCharsets.US_ASCII), 9); + assertEquals(97, stream.read()); + assertTrue(stream.markSupported()); + stream.mark(5); + assertEquals(98, stream.read()); + assertEquals(99, stream.read()); + stream.reset(); + assertEquals(98, stream.read()); + assertEquals(99, stream.read()); + assertEquals(100, stream.read()); + assertEquals(101, stream.read()); } } diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockFeedReaderFactory.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockFeedReaderFactory.java index 9a61af7266f..df1d5505632 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockFeedReaderFactory.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespaxmlparser/MockFeedReaderFactory.java @@ -13,6 +13,10 @@ import java.io.InputStream; */ public class MockFeedReaderFactory extends FeedReaderFactory { + public MockFeedReaderFactory() { + super(true); + } + @Override public FeedReader createReader( InputStream inputStream, |