diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2022-02-04 15:43:10 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2022-02-04 15:43:10 +0100 |
commit | 01de5e0e7d9836cf460ce13d41303be745ef49f6 (patch) | |
tree | c121f460bdb5f450279196b2b1b63b4075f1d979 /vespa-feed-client-api | |
parent | 1c30e549f27a2e84ecf0336dc2bc6d980295e5af (diff) |
Await inflight operations before returning from close
Diffstat (limited to 'vespa-feed-client-api')
-rw-r--r-- | vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java | 27 |
1 files changed, 23 insertions, 4 deletions
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java index 129a77f3c29..fdbfae53321 100644 --- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -45,6 +45,8 @@ public class JsonFeeder implements Closeable { }); private final FeedClient client; private final OperationParameters protoParameters; + private final AtomicInteger globalInflightOperations = new AtomicInteger(0); + private volatile boolean closed = false; private JsonFeeder(FeedClient client, OperationParameters protoParameters) { this.client = client; @@ -84,6 +86,8 @@ public class JsonFeeder implements Closeable { * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. */ public CompletableFuture<Result> feedSingle(String json) { + if (closed) throw new IllegalStateException("Already closed"); + globalInflightOperations.incrementAndGet(); CompletableFuture<Result> result = new CompletableFuture<>(); try { SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8)); @@ -93,9 +97,11 @@ public class JsonFeeder implements Closeable { } else { result.complete(operationResult); } + globalInflightOperations.decrementAndGet(); }, resultExecutor); } catch (Exception e) { resultExecutor.execute(() -> result.completeExceptionally(wrapException(e))); + globalInflightOperations.decrementAndGet(); } return result; } @@ -140,24 +146,27 @@ public class JsonFeeder implements Closeable { } CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { + if (closed) throw new IllegalStateException("Already closed"); CompletableFuture<Void> overallResult = new CompletableFuture<>(); CompletableFuture<Result> result; - AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation + AtomicInteger localInflightOperations = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); try (RingBufferStream buffer = new RingBufferStream(jsonStream, size)) { while ((result = buffer.next()) != null) { - pending.incrementAndGet(); + localInflightOperations.incrementAndGet(); + globalInflightOperations.incrementAndGet(); result.whenCompleteAsync((r, t) -> { if (!finalCallbackInvoked.get()) { invokeCallback(resultCallback, c -> c.onNextResult(r, (FeedException) t)); } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + if (localInflightOperations.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { invokeCallback(resultCallback, ResultCallback::onComplete); overallResult.complete(null); } + globalInflightOperations.decrementAndGet(); }, resultExecutor); } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + if (localInflightOperations.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { resultExecutor.execute(() -> { invokeCallback(resultCallback, ResultCallback::onComplete); overallResult.complete(null); @@ -187,6 +196,8 @@ public class JsonFeeder implements Closeable { private static final JsonFactory factory = new JsonFactory(); @Override public void close() throws IOException { + closed = true; + awaitInflightOperations(); client.close(); resultExecutor.shutdown(); try { @@ -198,6 +209,14 @@ public class JsonFeeder implements Closeable { } } + private void awaitInflightOperations() { + try { + while (globalInflightOperations.get() > 0) Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + private FeedException wrapException(Exception e) { if (e instanceof FeedException) return (FeedException) e; if (e instanceof IOException) { |