diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-06-15 17:26:30 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-06-16 13:08:32 +0200 |
commit | 5696b08dd6e956c4b4cb4c29e63999d3a974dca5 (patch) | |
tree | a577874c765729ee733421116d40bb5d574b1bbb /vespa-feed-client/src/main | |
parent | 335bb432c5befa014f8616ffa07328e46f52aba1 (diff) |
Report exceptional feed failures as FeedException.
Change signature of ResultCallback to use FeedException.
Rename 'JsonParseException' => 'OperationParseException'.
Add exception for response/result parse failure.
Diffstat (limited to 'vespa-feed-client/src/main')
8 files changed, 180 insertions, 109 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java index e5d45a2f211..7ae44fe8e9f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java @@ -63,12 +63,12 @@ class ApacheCluster implements Cluster { endpoint.client.execute(request, new FutureCallback<SimpleHttpResponse>() { @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } - @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } + @Override public void failed(Exception ex) { vessel.completeExceptionally(new FeedException(ex)); } @Override public void cancelled() { vessel.cancel(false); } }); } catch (Throwable thrown) { - vessel.completeExceptionally(thrown); + vessel.completeExceptionally(new FeedException(thrown)); } vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java index 250809a48b9..952edfb5464 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -12,13 +12,21 @@ import java.util.concurrent.CompletableFuture; */ public interface FeedClient extends Closeable { - /** Send a document put with the given parameters, returning a future with the result of the operation. */ + /** + * Send a document put with the given parameters, returning a future with the result of the operation. + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + * */ CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params); - /** Send a document update with the given parameters, returning a future with the result of the operation. */ + /** + * Send a document update with the given parameters, returning a future with the result of the operation. + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + * */ CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params); - /** Send a document remove with the given parameters, returning a future with the result of the operation. */ + /** Send a document remove with the given parameters, returning a future with the result of the operation. + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + * */ CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params); /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */ diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java index e1c6c733e9c..54e11d3a185 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java @@ -1,6 +1,8 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; +import java.util.Optional; + /** * Signals that an error occurred during feeding * @@ -8,10 +10,38 @@ package ai.vespa.feed.client; */ public class FeedException extends RuntimeException { - public FeedException(String message) { super(message); } + private final DocumentId documentId; + + public FeedException(String message) { + super(message); + this.documentId = null; + } + + public FeedException(DocumentId documentId, String message) { + super(message); + this.documentId = documentId; + } + + public FeedException(String message, Throwable cause) { + super(message, cause); + this.documentId = null; + } + + public FeedException(Throwable cause) { + super(cause); + this.documentId = null; + } + + public FeedException(DocumentId documentId, Throwable cause) { + super(cause); + this.documentId = documentId; + } - public FeedException(String message, Throwable cause) { super(message, cause); } + public FeedException(DocumentId documentId, String message, Throwable cause) { + super(message, cause); + this.documentId = documentId; + } - public FeedException(Throwable cause) { super(cause); } + public Optional<DocumentId> documentId() { return Optional.ofNullable(documentId); } } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index b160cced4b9..2269c56cde4 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -6,7 +6,6 @@ import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; import java.io.IOException; -import java.io.UncheckedIOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.HashMap; @@ -102,7 +101,10 @@ class HttpFeedClient implements FeedClient { try { JsonParser parser = factory.createParser(response.body()); if (parser.nextToken() != JsonToken.START_OBJECT) - throw new IllegalArgumentException("Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8)); + throw new ResultParseException( + documentId, + "Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + + new String(response.body(), UTF_8)); String name; while ((name = parser.nextFieldName()) != null) { @@ -114,15 +116,20 @@ class HttpFeedClient implements FeedClient { } if (parser.currentToken() != JsonToken.END_OBJECT) - throw new IllegalArgumentException("Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8)); + throw new ResultParseException( + documentId, + "Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + + new String(response.body(), UTF_8)); } catch (IOException e) { - throw new UncheckedIOException(e); + throw new ResultParseException(documentId, e); } if (type == null) // Not a Vespa response, but a failure in the HTTP layer. - throw new FeedException("Status " + response.code() + " executing '" + request + - "': " + (message == null ? new String(response.body(), UTF_8) : message)); + throw new ResultParseException( + documentId, + "Status " + response.code() + " executing '" + request + "': " + + (message == null ? new String(response.body(), UTF_8) : message)); return new Result(type, documentId, message, trace); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java index b3a7aca1808..0ba373eef18 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -10,7 +10,6 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; -import java.io.UncheckedIOException; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -58,13 +57,13 @@ public class JsonFeeder implements Closeable { * @param result Non-null if operation completed successfully * @param error Non-null if operation failed */ - default void onNextResult(Result result, Throwable error) { } + default void onNextResult(Result result, FeedException error) { } /** * Invoked if an unrecoverable error occurred during feed processing, * after which no other {@link ResultCallback} methods are invoked. */ - default void onError(Throwable error) { } + default void onError(FeedException error) { } /** * Invoked when all feed operations are either completed successfully or failed. @@ -81,6 +80,7 @@ public class JsonFeeder implements Closeable { * "fields": { ... document fields ... } * } * </pre> + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. */ public CompletableFuture<Result> feedSingle(String json) { CompletableFuture<Result> result = new CompletableFuture<>(); @@ -94,7 +94,7 @@ public class JsonFeeder implements Closeable { } }, resultExecutor); } catch (Exception e) { - resultExecutor.execute(() -> result.completeExceptionally(e)); + resultExecutor.execute(() -> result.completeExceptionally(wrapException(e))); } return result; } @@ -123,27 +123,32 @@ public class JsonFeeder implements Closeable { * ] * </pre> * Note that {@code "id"} is an alias for the document put operation. + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. */ public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) { return feedMany(jsonStream, 1 << 26, resultCallback); } + /** + * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance. + * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details. + */ public CompletableFuture<Void> feedMany(InputStream jsonStream) { return feedMany(jsonStream, new ResultCallback() { }); } CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { - RingBufferStream buffer = new RingBufferStream(jsonStream, size); CompletableFuture<Void> overallResult = new CompletableFuture<>(); CompletableFuture<Result> result; AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); try { + RingBufferStream buffer = new RingBufferStream(jsonStream, size); while ((result = buffer.next()) != null) { pending.incrementAndGet(); result.whenCompleteAsync((r, t) -> { if (!finalCallbackInvoked.get()) { - resultCallback.onNextResult(r, t); + resultCallback.onNextResult(r, (FeedException) t); } if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { resultCallback.onComplete(); @@ -160,8 +165,9 @@ public class JsonFeeder implements Closeable { } catch (Exception e) { if (finalCallbackInvoked.compareAndSet(false, true)) { resultExecutor.execute(() -> { - resultCallback.onError(e); - overallResult.completeExceptionally(e); + FeedException wrapped = wrapException(e); + resultCallback.onError(wrapped); + overallResult.completeExceptionally(wrapped); }); } } @@ -182,6 +188,14 @@ public class JsonFeeder implements Closeable { } } + private FeedException wrapException(Exception e) { + if (e instanceof FeedException) return (FeedException) e; + if (e instanceof IOException) { + return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e); + } + return new FeedException(e); + } + private class RingBufferStream extends InputStream { private final byte[] b = new byte[1]; @@ -189,22 +203,21 @@ public class JsonFeeder implements Closeable { private final byte[] data; private final int size; private final Object lock = new Object(); - private Throwable thrown = null; + private IOException thrown = null; private long tail = 0; private long pos = 0; private long head = 0; private boolean done = false; private final OperationParserAndExecutor parserAndExecutor; - RingBufferStream(InputStream in, int size) { + RingBufferStream(InputStream in, int size) throws IOException { this.in = in; this.data = new byte[size]; this.size = size; new Thread(this::fill, "feed-reader").start(); - try { this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); } - catch (IOException e) { throw new UncheckedIOException(e); } + this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); } @Override @@ -220,7 +233,7 @@ public class JsonFeeder implements Closeable { while ((ready = (int) (head - pos)) == 0 && ! done) lock.wait(); } - if (thrown != null) throw new RuntimeException("Error reading input", thrown); + if (thrown != null) throw thrown; if (ready == 0) return -1; ready = min(ready, len); @@ -273,7 +286,7 @@ public class JsonFeeder implements Closeable { while (true) { int free; synchronized (lock) { - while ((free = (int) (tail + size - head)) <= 0 && ! done) + while ((free = (int) (tail + size - head)) <= 0 && !done) lock.wait(); } if (done) break; @@ -288,18 +301,22 @@ public class JsonFeeder implements Closeable { lock.notify(); } } - } - catch (Throwable t) { + } catch (InterruptedException e) { synchronized (lock) { done = true; - thrown = t; + thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage()); + } + } catch (IOException e) { + synchronized (lock) { + done = true; + thrown = e; } } } private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor { - RingBufferBackedOperationParserAndExecutor(JsonParser parser) throws IOException { super(parser, true); } + RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); } @Override String getDocumentJson(long start, long end) { @@ -334,7 +351,7 @@ public class JsonFeeder implements Closeable { private final boolean multipleOperations; private boolean arrayPrefixParsed; - protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) throws IOException { + protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) { this.parser = parser; this.multipleOperations = multipleOperations; } @@ -342,82 +359,78 @@ public class JsonFeeder implements Closeable { abstract String getDocumentJson(long start, long end); CompletableFuture<Result> next() throws IOException { - try { - if (multipleOperations && !arrayPrefixParsed){ - expect(START_ARRAY); - arrayPrefixParsed = true; - } + if (multipleOperations && !arrayPrefixParsed){ + expect(START_ARRAY); + arrayPrefixParsed = true; + } - JsonToken token = parser.nextToken(); - if (token == END_ARRAY && multipleOperations) return null; - else if (token == null && !multipleOperations) return null; - else if (token == START_OBJECT); - else throw new JsonParseException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset()); - long start = 0, end = -1; - OperationType type = null; - DocumentId id = null; - OperationParameters parameters = protoParameters; - loop: while (true) { - switch (parser.nextToken()) { - case FIELD_NAME: - switch (parser.getText()) { - case "id": - case "put": type = PUT; id = readId(); break; - case "update": type = UPDATE; id = readId(); break; - case "remove": type = REMOVE; id = readId(); break; - case "condition": parameters = parameters.testAndSetCondition(readString()); break; - case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; - case "fields": { - expect(START_OBJECT); - start = parser.getTokenLocation().getByteOffset(); - int depth = 1; - while (depth > 0) switch (parser.nextToken()) { - case START_OBJECT: ++depth; break; - case END_OBJECT: --depth; break; - } - end = parser.getTokenLocation().getByteOffset() + 1; - break; + JsonToken token = parser.nextToken(); + if (token == END_ARRAY && multipleOperations) return null; + else if (token == null && !multipleOperations) return null; + else if (token == START_OBJECT); + else throw new OperationParseException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset()); + long start = 0, end = -1; + OperationType type = null; + DocumentId id = null; + OperationParameters parameters = protoParameters; + loop: while (true) { + switch (parser.nextToken()) { + case FIELD_NAME: + switch (parser.getText()) { + case "id": + case "put": type = PUT; id = readId(); break; + case "update": type = UPDATE; id = readId(); break; + case "remove": type = REMOVE; id = readId(); break; + case "condition": parameters = parameters.testAndSetCondition(readString()); break; + case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; + case "fields": { + expect(START_OBJECT); + start = parser.getTokenLocation().getByteOffset(); + int depth = 1; + while (depth > 0) switch (parser.nextToken()) { + case START_OBJECT: ++depth; break; + case END_OBJECT: --depth; break; } - default: throw new JsonParseException("Unexpected field name '" + parser.getText() + "' at offset " + - parser.getTokenLocation().getByteOffset()); + end = parser.getTokenLocation().getByteOffset() + 1; + break; } - break; + default: throw new OperationParseException("Unexpected field name '" + parser.getText() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } + break; - case END_OBJECT: - break loop; + case END_OBJECT: + break loop; - default: - throw new JsonParseException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - } - if (id == null) - throw new JsonParseException("No document id for document at offset " + start); - - if (end < start) - throw new JsonParseException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); - String payload = getDocumentJson(start, end); - switch (type) { - case PUT: return client.put (id, payload, parameters); - case UPDATE: return client.update(id, payload, parameters); - case REMOVE: return client.remove(id, parameters); - default: throw new JsonParseException("Unexpected operation type '" + type + "'"); + default: + throw new OperationParseException("Unexpected token '" + parser.currentToken() + "' at offset " + + parser.getTokenLocation().getByteOffset()); } - } catch (com.fasterxml.jackson.core.JacksonException e) { - throw new JsonParseException("Failed to parse JSON", e); + } + if (id == null) + throw new OperationParseException("No document id for document at offset " + start); + + if (end < start) + throw new OperationParseException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); + String payload = getDocumentJson(start, end); + switch (type) { + case PUT: return client.put (id, payload, parameters); + case UPDATE: return client.update(id, payload, parameters); + case REMOVE: return client.remove(id, parameters); + default: throw new OperationParseException("Unexpected operation type '" + type + "'"); } } private void expect(JsonToken token) throws IOException { if (parser.nextToken() != token) - throw new JsonParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + + throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); } private String readString() throws IOException { String value = parser.nextTextValue(); if (value == null) - throw new JsonParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + + throw new OperationParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; @@ -426,7 +439,7 @@ public class JsonFeeder implements Closeable { private boolean readBoolean() throws IOException { Boolean value = parser.nextBooleanValue(); if (value == null) - throw new JsonParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + + throw new OperationParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; @@ -439,7 +452,6 @@ public class JsonFeeder implements Closeable { } - public static class Builder { final FeedClient client; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java deleted file mode 100644 index 8edf74ec275..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonParseException.java +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -/** - * Signals that supplied JSON is invalid - * - * @author bjorncs - */ -public class JsonParseException extends FeedException { - - public JsonParseException(String message) { super(message); } - - public JsonParseException(String message, Throwable cause) { super(message, cause); } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java new file mode 100644 index 00000000000..15ba024bb4e --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java @@ -0,0 +1,15 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +/** + * Signals that supplied JSON for a document/operation is invalid + * + * @author bjorncs + */ +public class OperationParseException extends FeedException { + + public OperationParseException(String message) { super(message); } + + public OperationParseException(String message, Throwable cause) { super(message, cause); } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java new file mode 100644 index 00000000000..3fd5143e2f4 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java @@ -0,0 +1,14 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +/** + * Signals that the client was unable to parse the result/response from container + * + * @author bjorncs + */ +public class ResultParseException extends FeedException { + + public ResultParseException(DocumentId documentId, String message) { super(documentId, message); } + + public ResultParseException(DocumentId documentId, Throwable cause) { super(documentId, cause); } +} |