diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2022-01-05 14:28:23 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2022-01-05 14:28:23 +0100 |
commit | a5128bc58bb7da3912604c52a4aafbd3535e98eb (patch) | |
tree | 1c64e0d603f8a24a050858b8a4df8c464b0b0988 /vespa-feed-client-api | |
parent | e23031b46bbcbf312ff57905b32a180e0fc2ae21 (diff) |
Handle exceptions from result callback gracefully
Diffstat (limited to 'vespa-feed-client-api')
-rw-r--r-- | vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java | 27 |
1 files changed, 18 insertions, 9 deletions
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java index 41b432449df..044f3ae1d14 100644 --- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -18,16 +18,14 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.logging.Level; +import java.util.logging.Logger; import static ai.vespa.feed.client.FeedClient.OperationType.PUT; import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; -import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; -import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; -import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; -import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; import static java.lang.Math.min; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; @@ -38,6 +36,8 @@ import static java.util.Objects.requireNonNull; */ public class JsonFeeder implements Closeable { + private static final Logger log = Logger.getLogger(JsonFeeder.class.getName()); + private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { Thread t = new Thread(r, "json-feeder-result-executor"); t.setDaemon(true); @@ -149,17 +149,17 @@ public class JsonFeeder implements Closeable { pending.incrementAndGet(); result.whenCompleteAsync((r, t) -> { if (!finalCallbackInvoked.get()) { - resultCallback.onNextResult(r, (FeedException) t); + invokeCallback(resultCallback, c -> c.onNextResult(r, (FeedException) t)); } if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultCallback.onComplete(); + invokeCallback(resultCallback, ResultCallback::onComplete); overallResult.complete(null); } }, resultExecutor); } if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { resultExecutor.execute(() -> { - resultCallback.onComplete(); + invokeCallback(resultCallback, ResultCallback::onComplete); overallResult.complete(null); }); } @@ -167,7 +167,7 @@ public class JsonFeeder implements Closeable { if (finalCallbackInvoked.compareAndSet(false, true)) { resultExecutor.execute(() -> { FeedException wrapped = wrapException(e); - resultCallback.onError(wrapped); + invokeCallback(resultCallback, c -> c.onError(wrapped)); overallResult.completeExceptionally(wrapped); }); } @@ -175,6 +175,15 @@ public class JsonFeeder implements Closeable { return overallResult; } + private static void invokeCallback(ResultCallback callback, Consumer<ResultCallback> invocation) { + try { + invocation.accept(callback); + } catch (Throwable t) { + // Just log the exception/error and keep result executor alive (don't rethrow) + log.log(Level.WARNING, "Got exception during invocation on ResultCallback: " + t, t); + } + } + private static final JsonFactory factory = new JsonFactory(); @Override public void close() throws IOException { |