summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client-api
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2022-01-05 14:28:23 +0100
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2022-01-05 14:28:23 +0100
commita5128bc58bb7da3912604c52a4aafbd3535e98eb (patch)
tree1c64e0d603f8a24a050858b8a4df8c464b0b0988 /vespa-feed-client-api
parente23031b46bbcbf312ff57905b32a180e0fc2ae21 (diff)
Handle exceptions from result callback gracefully
Diffstat (limited to 'vespa-feed-client-api')
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java27
1 files changed, 18 insertions, 9 deletions
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
index 41b432449df..044f3ae1d14 100644
--- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -18,16 +18,14 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import static ai.vespa.feed.client.FeedClient.OperationType.PUT;
import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE;
import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE;
-import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
-import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE;
import static java.lang.Math.min;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
@@ -38,6 +36,8 @@ import static java.util.Objects.requireNonNull;
*/
public class JsonFeeder implements Closeable {
+ private static final Logger log = Logger.getLogger(JsonFeeder.class.getName());
+
private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "json-feeder-result-executor");
t.setDaemon(true);
@@ -149,17 +149,17 @@ public class JsonFeeder implements Closeable {
pending.incrementAndGet();
result.whenCompleteAsync((r, t) -> {
if (!finalCallbackInvoked.get()) {
- resultCallback.onNextResult(r, (FeedException) t);
+ invokeCallback(resultCallback, c -> c.onNextResult(r, (FeedException) t));
}
if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
- resultCallback.onComplete();
+ invokeCallback(resultCallback, ResultCallback::onComplete);
overallResult.complete(null);
}
}, resultExecutor);
}
if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
resultExecutor.execute(() -> {
- resultCallback.onComplete();
+ invokeCallback(resultCallback, ResultCallback::onComplete);
overallResult.complete(null);
});
}
@@ -167,7 +167,7 @@ public class JsonFeeder implements Closeable {
if (finalCallbackInvoked.compareAndSet(false, true)) {
resultExecutor.execute(() -> {
FeedException wrapped = wrapException(e);
- resultCallback.onError(wrapped);
+ invokeCallback(resultCallback, c -> c.onError(wrapped));
overallResult.completeExceptionally(wrapped);
});
}
@@ -175,6 +175,15 @@ public class JsonFeeder implements Closeable {
return overallResult;
}
+ private static void invokeCallback(ResultCallback callback, Consumer<ResultCallback> invocation) {
+ try {
+ invocation.accept(callback);
+ } catch (Throwable t) {
+ // Just log the exception/error and keep result executor alive (don't rethrow)
+ log.log(Level.WARNING, "Got exception during invocation on ResultCallback: " + t, t);
+ }
+ }
+
private static final JsonFactory factory = new JsonFactory();
@Override public void close() throws IOException {