aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-28 15:00:59 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-28 15:00:59 +0200
commit34a24e7d7deccff5224619a927a88ba90e3c07f9 (patch)
tree65853257c44f087f9979d3d91cee029ec6b76fa8 /vespa-feed-client
parent982f6a310ffa4ee2db74781170622f890615fb5c (diff)
Expand JSON readere buffer when too small for current document
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java26
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java1
2 files changed, 24 insertions, 3 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
index 68e102ab079..653457fdd9f 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -201,9 +201,9 @@ public class JsonFeeder implements Closeable {
private final byte[] b = new byte[1];
private final InputStream in;
- private final byte[] data;
- private final int size;
private final Object lock = new Object();
+ private byte[] data;
+ private int size;
private IOException thrown = null;
private long tail = 0;
private long pos = 0;
@@ -231,6 +231,9 @@ public class JsonFeeder implements Closeable {
try {
int ready;
synchronized (lock) {
+ if (pos - tail == size) // Buffer exhausted, nothing left to read, nowhere left to write.
+ expand();
+
while ((ready = (int) (head - pos)) == 0 && ! done)
lock.wait();
}
@@ -256,6 +259,23 @@ public class JsonFeeder implements Closeable {
return parserAndExecutor.next();
}
+ private void expand() {
+ int newSize = size * 2;
+ if (newSize <= size)
+ throw new IllegalStateException("Maximum buffer size exceeded; want to double " + size + ", but that's too much");
+
+ byte[] newData = new byte[newSize];
+ int offset = (int) (tail % size);
+ int newOffset = (int) (tail % newSize);
+ int toWrite = size - offset;
+ System.arraycopy(data, offset, newData, newOffset, toWrite);
+ if (toWrite < size)
+ System.arraycopy(data, 0, newData, newOffset + toWrite, size - toWrite);
+ size = newSize;
+ data = newData;
+ lock.notify();
+ }
+
private final byte[] prefix = "{\"fields\":".getBytes(UTF_8);
private byte[] copy(long start, long end) {
int length = (int) (end - start);
@@ -287,7 +307,7 @@ public class JsonFeeder implements Closeable {
while (true) {
int free;
synchronized (lock) {
- while ((free = (int) (tail + size - head)) <= 0 && !done)
+ while ((free = (int) (tail + size - head)) <= 0 && ! done)
lock.wait();
}
if (done) break;
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
index da0c45c94b5..88a9a71c73e 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
@@ -127,6 +127,7 @@ class JsonFeederTest {
"}\n";
feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8)),
+ 3, // Mini-buffer, which needs to expand.
new JsonFeeder.ResultCallback() { })
.get();
client.assertRemoveDocumentIds("abc1");