summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client-api
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2022-02-04 15:43:10 +0100
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2022-02-04 15:43:10 +0100
commit01de5e0e7d9836cf460ce13d41303be745ef49f6 (patch)
treec121f460bdb5f450279196b2b1b63b4075f1d979 /vespa-feed-client-api
parent1c30e549f27a2e84ecf0336dc2bc6d980295e5af (diff)
Await inflight operations before returning from close
Diffstat (limited to 'vespa-feed-client-api')
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java27
1 files changed, 23 insertions, 4 deletions
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
index 129a77f3c29..fdbfae53321 100644
--- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -45,6 +45,8 @@ public class JsonFeeder implements Closeable {
});
private final FeedClient client;
private final OperationParameters protoParameters;
+ private final AtomicInteger globalInflightOperations = new AtomicInteger(0);
+ private volatile boolean closed = false;
private JsonFeeder(FeedClient client, OperationParameters protoParameters) {
this.client = client;
@@ -84,6 +86,8 @@ public class JsonFeeder implements Closeable {
* Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
*/
public CompletableFuture<Result> feedSingle(String json) {
+ if (closed) throw new IllegalStateException("Already closed");
+ globalInflightOperations.incrementAndGet();
CompletableFuture<Result> result = new CompletableFuture<>();
try {
SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8));
@@ -93,9 +97,11 @@ public class JsonFeeder implements Closeable {
} else {
result.complete(operationResult);
}
+ globalInflightOperations.decrementAndGet();
}, resultExecutor);
} catch (Exception e) {
resultExecutor.execute(() -> result.completeExceptionally(wrapException(e)));
+ globalInflightOperations.decrementAndGet();
}
return result;
}
@@ -140,24 +146,27 @@ public class JsonFeeder implements Closeable {
}
CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) {
+ if (closed) throw new IllegalStateException("Already closed");
CompletableFuture<Void> overallResult = new CompletableFuture<>();
CompletableFuture<Result> result;
- AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation
+ AtomicInteger localInflightOperations = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation
AtomicBoolean finalCallbackInvoked = new AtomicBoolean();
try (RingBufferStream buffer = new RingBufferStream(jsonStream, size)) {
while ((result = buffer.next()) != null) {
- pending.incrementAndGet();
+ localInflightOperations.incrementAndGet();
+ globalInflightOperations.incrementAndGet();
result.whenCompleteAsync((r, t) -> {
if (!finalCallbackInvoked.get()) {
invokeCallback(resultCallback, c -> c.onNextResult(r, (FeedException) t));
}
- if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ if (localInflightOperations.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
invokeCallback(resultCallback, ResultCallback::onComplete);
overallResult.complete(null);
}
+ globalInflightOperations.decrementAndGet();
}, resultExecutor);
}
- if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ if (localInflightOperations.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
resultExecutor.execute(() -> {
invokeCallback(resultCallback, ResultCallback::onComplete);
overallResult.complete(null);
@@ -187,6 +196,8 @@ public class JsonFeeder implements Closeable {
private static final JsonFactory factory = new JsonFactory();
@Override public void close() throws IOException {
+ closed = true;
+ awaitInflightOperations();
client.close();
resultExecutor.shutdown();
try {
@@ -198,6 +209,14 @@ public class JsonFeeder implements Closeable {
}
}
+ private void awaitInflightOperations() {
+ try {
+ while (globalInflightOperations.get() > 0) Thread.sleep(10);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
private FeedException wrapException(Exception e) {
if (e instanceof FeedException) return (FeedException) e;
if (e instanceof IOException) {