summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client-api
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2022-01-25 08:22:37 +0100
committerJon Marius Venstad <venstad@gmail.com>2022-01-25 08:22:37 +0100
commit845cd4468a85e4977f8f9b5b72a4b25458c84848 (patch)
tree1cab90f22113d7d3ef57c501789d47200e92373d /vespa-feed-client-api
parentc9ac4edeec6a905bedbfb6d605acb3dd7034262c (diff)
Ensure no thread is left alive, and input stream is closed
Diffstat (limited to 'vespa-feed-client-api')
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java8
1 files changed, 5 insertions, 3 deletions
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
index 044f3ae1d14..129a77f3c29 100644
--- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -125,6 +125,7 @@ public class JsonFeeder implements Closeable {
* </pre>
* Note that {@code "id"} is an alias for the document put operation.
* Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ * The input stream will be closed upon exhaustion, or error.
*/
public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) {
return feedMany(jsonStream, 1 << 26, resultCallback);
@@ -143,8 +144,7 @@ public class JsonFeeder implements Closeable {
CompletableFuture<Result> result;
AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation
AtomicBoolean finalCallbackInvoked = new AtomicBoolean();
- try {
- RingBufferStream buffer = new RingBufferStream(jsonStream, size);
+ try (RingBufferStream buffer = new RingBufferStream(jsonStream, size)) {
while ((result = buffer.next()) != null) {
pending.incrementAndGet();
result.whenCompleteAsync((r, t) -> {
@@ -225,7 +225,9 @@ public class JsonFeeder implements Closeable {
this.data = new byte[size];
this.size = size;
- new Thread(this::fill, "feed-reader").start();
+ Thread filler = new Thread(this::fill, "feed-reader");
+ filler.setDaemon(true);
+ filler.start();
this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this));
}