diff options
author | Bjørn Christian Seime <bjorncs@oath.com> | 2018-04-03 17:37:17 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@oath.com> | 2018-04-03 17:37:17 +0200 |
commit | 2f45e213261e98a272a171b6fb5d07cf77496f33 (patch) | |
tree | 98f12ffd9157f6e1519c23fcc74da22b4d1267c9 /vespa-http-client | |
parent | 81b0d89f8e9b788da56c9cea3b5cabb3d00c42dc (diff) |
Add reporting of endpoint errors to FeedClient.ResultCallback
Extend the ResultCallback interface with the method onEndpointException
which users may implement to observe endpoint failures.
Diffstat (limited to 'vespa-http-client')
8 files changed, 193 insertions, 3 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java index 299541a5a2d..fc6605eeba0 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java @@ -43,13 +43,26 @@ public interface FeedClient extends AutoCloseable { /** - * This callback is executed when new results are arriving. Don't do any heavy lifting in this thread (no IO, disk, - * or heavy CPU usage). This call back will run in a different thread than your main program so use e.g. + * This callback is executed when new results are arriving or an error occur. + * Don't do any heavy lifting in this thread (no IO, disk, or heavy CPU usage). + * This call back will run in a different thread than your main program so use e.g. * AtomicInteger for counters and follow general guides for thread-safe programming. * There is an example implementation in class SimpleLoggerResultCallback. */ interface ResultCallback { void onCompletion(String docId, Result documentResult); + + /** + * Called with an exception whenever an endpoint specific error occurs during feeding. + * The error may or may not be transient - the operation will in both cases be retried until it's successful. + * This callback is intended for application level monitoring (logging, metrics, altering etc). + * Document specific errors will be reported back through {@link #onCompletion(String, Result)}. + * + * @see FeedEndpointException + * @param exception An exception specifying endpoint and cause. See {@link FeedEndpointException} for details. + */ + // TODO Vespa 7: Remove empty default implementation + default void onEndpointException(FeedEndpointException exception) {} } /** diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java new file mode 100644 index 00000000000..7ef585e814f --- /dev/null +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java @@ -0,0 +1,23 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.client; + +import com.yahoo.vespa.http.client.config.Endpoint; + +/** + * An exception thrown when the client is unable to connect to a feed endpoint. + * + * @author bjorncs + */ +public class FeedConnectException extends FeedEndpointException { + + public FeedConnectException(Throwable cause, Endpoint endpoint) { + super(createMessage(cause, endpoint), cause, endpoint); + } + + private static String createMessage(Throwable cause, Endpoint endpoint) { + return String.format("Handshake to endpoint '%s:%d' failed: %s", + endpoint.getHostname(), + endpoint.getPort(), + cause.getMessage()); + } +} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java new file mode 100644 index 00000000000..d3e4b4ed762 --- /dev/null +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java @@ -0,0 +1,25 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.client; + +import com.yahoo.vespa.http.client.config.Endpoint; + +/** + * An exception type for endpoint specific errors. + * + * @see FeedConnectException + * @see FeedProtocolException + * @author bjorncs + */ +public abstract class FeedEndpointException extends RuntimeException { + private final Endpoint endpoint; + + protected FeedEndpointException(String message, Throwable cause, Endpoint endpoint) { + super(message, cause); + this.endpoint = endpoint; + } + + public Endpoint getEndpoint() { + return endpoint; + } + +} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java new file mode 100644 index 00000000000..dd5aa902c1a --- /dev/null +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java @@ -0,0 +1,42 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.client; + +import com.yahoo.vespa.http.client.config.Endpoint; + +/** + * An exception thrown when a feed endpoint returns an error during feeding. + * + * @author bjorncs + */ +public class FeedProtocolException extends FeedEndpointException { + + private final int httpStatusCode; + private final String httpResponseMessage; + + public FeedProtocolException(int httpStatusCode, + String httpResponseMessage, + Throwable cause, + Endpoint endpoint) { + super(createMessage(httpStatusCode, httpResponseMessage, endpoint), cause, endpoint); + this.httpStatusCode = httpStatusCode; + this.httpResponseMessage = httpResponseMessage; + } + + private static String createMessage(int httpStatusCode, + String httpResponseMessage, + Endpoint endpoint) { + return String.format("Endpoint '%s:%d' returned an error on handshake: %d - %s", + endpoint.getHostname(), + endpoint.getPort(), + httpStatusCode, + httpResponseMessage); + } + + public int getHttpStatusCode() { + return httpStatusCode; + } + + public String getHttpResponseMessage() { + return httpResponseMessage; + } +} diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java index 98dd067b7c5..c7731f5ab48 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.http.client.core.communication; import com.google.common.annotations.Beta; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; +import com.yahoo.vespa.http.client.FeedEndpointException; import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import com.yahoo.vespa.http.client.core.EndpointResult; @@ -58,6 +59,10 @@ class EndpointResultQueue { resultReceived(result, clusterId, true); } + void onEndpointError(FeedEndpointException e) { + operationProcessor.onEndpointError(e); + } + private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) { operationProcessor.resultReceived(result, clusterId); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index bff9d2186e6..26dc1d1bd73 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -2,6 +2,8 @@ package com.yahoo.vespa.http.client.core.communication; import com.google.common.annotations.Beta; +import com.yahoo.vespa.http.client.FeedConnectException; +import com.yahoo.vespa.http.client.FeedProtocolException; import com.yahoo.vespa.http.client.Result; import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.core.Document; @@ -321,9 +323,11 @@ class IOThread implements Runnable, AutoCloseable { executeProblemsCounter.incrementAndGet(); log.info("Handshake did not work out " + endpoint + ": " + Exceptions.toMessageString(ser)); drainFirstDocumentsInQueueIfOld(); + resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, endpoint)); return ThreadState.CONNECTED; } catch (Throwable throwable) { // This cover IOException as well executeProblemsCounter.incrementAndGet(); + resultQueue.onEndpointError(new FeedConnectException(throwable, endpoint)); log.info("Problem with Handshake " + endpoint + ": " + Exceptions.toMessageString(throwable)); drainFirstDocumentsInQueueIfOld(); client.close(); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index 5907694f55a..4b95f367485 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.http.client.core.operationProcessor; import com.google.common.annotations.Beta; import com.google.common.collect.ArrayListMultimap; import com.yahoo.vespa.http.client.FeedClient; +import com.yahoo.vespa.http.client.FeedEndpointException; import com.yahoo.vespa.http.client.Result; import com.yahoo.vespa.http.client.config.Cluster; import com.yahoo.vespa.http.client.config.SessionParams; @@ -202,6 +203,11 @@ public class OperationProcessor { } } } + + public void onEndpointError(FeedEndpointException e) { + resultCallback.onEndpointException(e); + } + public List<Exception> closeClusters() { List<Exception> exceptions = new ArrayList<>(); // first, close cluster sessions and allow connections to drain normally @@ -297,5 +303,4 @@ public class OperationProcessor { throw new RuntimeException("Did not manage to shut down retry threads. Please report problem."); } } - } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index a894aa5af1d..f2a63895fd4 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -1,27 +1,41 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.http.client.core.communication; +import com.yahoo.vespa.http.client.FeedConnectException; +import com.yahoo.vespa.http.client.FeedEndpointException; +import com.yahoo.vespa.http.client.FeedProtocolException; import com.yahoo.vespa.http.client.Result; import com.yahoo.vespa.http.client.V3HttpAPITest; +import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.core.Document; import com.yahoo.vespa.http.client.core.EndpointResult; +import com.yahoo.vespa.http.client.core.ServerResponseException; import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.Is.is; import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; public class IOThreadTest { + + private static final Endpoint ENDPOINT = Endpoint.create("myhost"); + final EndpointResultQueue endpointResultQueue = mock(EndpointResultQueue.class); final ApacheGatewayConnection apacheGatewayConnection = mock(ApacheGatewayConnection.class); final String exceptionMessage = "SOME EXCEPTION FOO"; @@ -34,6 +48,10 @@ public class IOThreadTest { V3HttpAPITest.documents.get(1).getContents(), null /* context */); DocumentQueue documentQueue = new DocumentQueue(4); + public IOThreadTest() { + when(apacheGatewayConnection.getEndpoint()).thenReturn(ENDPOINT); + } + /** * Set up mock so that it can handle both failDocument() and resultReceived(). * @param expectedDocIdFail on failure, this has to be the doc id, or the mock will fail. @@ -124,4 +142,59 @@ public class IOThreadTest { assert (latch.await(120, TimeUnit.SECONDS)); } } + + @Test + public void requireThatEndpointProtocolExceptionsArePropagated() + throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { + when(apacheGatewayConnection.connect()).thenReturn(true); + int errorCode = 403; + String errorMessage = "Not authorized"; + doThrow(new ServerResponseException(errorCode, errorMessage)).when(apacheGatewayConnection).handshake(); + Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue); + + try (IOThread ioThread = new IOThread( + endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) { + ioThread.post(doc1); + FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS); + assertThat(reportedException, instanceOf(FeedProtocolException.class)); + FeedProtocolException actualException = (FeedProtocolException) reportedException; + assertThat(actualException.getHttpStatusCode(), equalTo(errorCode)); + assertThat(actualException.getHttpResponseMessage(), equalTo(errorMessage)); + assertThat(actualException.getEndpoint(), equalTo(ENDPOINT)); + assertThat(actualException.getMessage(), equalTo("Endpoint 'myhost:4080' returned an error on handshake: 403 - Not authorized")); + } + } + + @Test + public void requireThatEndpointConnectExceptionsArePropagated() + throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { + when(apacheGatewayConnection.connect()).thenReturn(true); + String errorMessage = "generic error message"; + IOException cause = new IOException(errorMessage); + doThrow(cause).when(apacheGatewayConnection).handshake(); + Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue); + + try (IOThread ioThread = new IOThread( + endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) { + ioThread.post(doc1); + FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS); + assertThat(reportedException, instanceOf(FeedConnectException.class)); + FeedConnectException actualException = (FeedConnectException) reportedException; + assertThat(actualException.getCause(), equalTo(cause)); + assertThat(actualException.getEndpoint(), equalTo(ENDPOINT)); + assertThat(actualException.getMessage(), equalTo("Handshake to endpoint 'myhost:4080' failed: generic error message")); + } + } + + private static Future<FeedEndpointException> endpointErrorCapturer(EndpointResultQueue endpointResultQueue) { + CompletableFuture<FeedEndpointException> futureResult = new CompletableFuture<>(); + doAnswer(invocation -> { + if (futureResult.isDone()) return null; + FeedEndpointException reportedException = (FeedEndpointException) invocation.getArguments()[0]; + futureResult.complete(reportedException); + return null; + }).when(endpointResultQueue).onEndpointError(any()); + return futureResult; + } + } |