diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2022-01-25 08:22:37 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2022-01-25 08:22:37 +0100 |
commit | 845cd4468a85e4977f8f9b5b72a4b25458c84848 (patch) | |
tree | 1cab90f22113d7d3ef57c501789d47200e92373d /vespa-feed-client-api/src | |
parent | c9ac4edeec6a905bedbfb6d605acb3dd7034262c (diff) |
Ensure no thread is left alive, and input stream is closed
Diffstat (limited to 'vespa-feed-client-api/src')
-rw-r--r-- | vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java | 8 |
1 files changed, 5 insertions, 3 deletions
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java index 044f3ae1d14..129a77f3c29 100644 --- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -125,6 +125,7 @@ public class JsonFeeder implements Closeable { * </pre> * Note that {@code "id"} is an alias for the document put operation. * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + * The input stream will be closed upon exhaustion, or error. */ public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) { return feedMany(jsonStream, 1 << 26, resultCallback); @@ -143,8 +144,7 @@ public class JsonFeeder implements Closeable { CompletableFuture<Result> result; AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); - try { - RingBufferStream buffer = new RingBufferStream(jsonStream, size); + try (RingBufferStream buffer = new RingBufferStream(jsonStream, size)) { while ((result = buffer.next()) != null) { pending.incrementAndGet(); result.whenCompleteAsync((r, t) -> { @@ -225,7 +225,9 @@ public class JsonFeeder implements Closeable { this.data = new byte[size]; this.size = size; - new Thread(this::fill, "feed-reader").start(); + Thread filler = new Thread(this::fill, "feed-reader"); + filler.setDaemon(true); + filler.start(); this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); } |