diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-28 15:00:59 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-28 15:00:59 +0200 |
commit | 34a24e7d7deccff5224619a927a88ba90e3c07f9 (patch) | |
tree | 65853257c44f087f9979d3d91cee029ec6b76fa8 /vespa-feed-client | |
parent | 982f6a310ffa4ee2db74781170622f890615fb5c (diff) |
Expand JSON readere buffer when too small for current document
Diffstat (limited to 'vespa-feed-client')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java | 26 | ||||
-rw-r--r-- | vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java | 1 |
2 files changed, 24 insertions, 3 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java index 68e102ab079..653457fdd9f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -201,9 +201,9 @@ public class JsonFeeder implements Closeable { private final byte[] b = new byte[1]; private final InputStream in; - private final byte[] data; - private final int size; private final Object lock = new Object(); + private byte[] data; + private int size; private IOException thrown = null; private long tail = 0; private long pos = 0; @@ -231,6 +231,9 @@ public class JsonFeeder implements Closeable { try { int ready; synchronized (lock) { + if (pos - tail == size) // Buffer exhausted, nothing left to read, nowhere left to write. + expand(); + while ((ready = (int) (head - pos)) == 0 && ! done) lock.wait(); } @@ -256,6 +259,23 @@ public class JsonFeeder implements Closeable { return parserAndExecutor.next(); } + private void expand() { + int newSize = size * 2; + if (newSize <= size) + throw new IllegalStateException("Maximum buffer size exceeded; want to double " + size + ", but that's too much"); + + byte[] newData = new byte[newSize]; + int offset = (int) (tail % size); + int newOffset = (int) (tail % newSize); + int toWrite = size - offset; + System.arraycopy(data, offset, newData, newOffset, toWrite); + if (toWrite < size) + System.arraycopy(data, 0, newData, newOffset + toWrite, size - toWrite); + size = newSize; + data = newData; + lock.notify(); + } + private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); private byte[] copy(long start, long end) { int length = (int) (end - start); @@ -287,7 +307,7 @@ public class JsonFeeder implements Closeable { while (true) { int free; synchronized (lock) { - while ((free = (int) (tail + size - head)) <= 0 && !done) + while ((free = (int) (tail + size - head)) <= 0 && ! done) lock.wait(); } if (done) break; diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java index da0c45c94b5..88a9a71c73e 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -127,6 +127,7 @@ class JsonFeederTest { "}\n"; feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8)), + 3, // Mini-buffer, which needs to expand. new JsonFeeder.ResultCallback() { }) .get(); client.assertRemoveDocumentIds("abc1"); |