summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@oath.com>2018-04-03 17:37:17 +0200
committerBjørn Christian Seime <bjorncs@oath.com>2018-04-03 17:37:17 +0200
commit2f45e213261e98a272a171b6fb5d07cf77496f33 (patch)
tree98f12ffd9157f6e1519c23fcc74da22b4d1267c9 /vespa-http-client
parent81b0d89f8e9b788da56c9cea3b5cabb3d00c42dc (diff)
Add reporting of endpoint errors to FeedClient.ResultCallback
Extend the ResultCallback interface with the method onEndpointException which users may implement to observe endpoint failures.
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java17
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java23
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java25
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java42
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java5
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java4
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java7
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java73
8 files changed, 193 insertions, 3 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java
index 299541a5a2d..fc6605eeba0 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClient.java
@@ -43,13 +43,26 @@ public interface FeedClient extends AutoCloseable {
/**
- * This callback is executed when new results are arriving. Don't do any heavy lifting in this thread (no IO, disk,
- * or heavy CPU usage). This call back will run in a different thread than your main program so use e.g.
+ * This callback is executed when new results are arriving or an error occur.
+ * Don't do any heavy lifting in this thread (no IO, disk, or heavy CPU usage).
+ * This call back will run in a different thread than your main program so use e.g.
* AtomicInteger for counters and follow general guides for thread-safe programming.
* There is an example implementation in class SimpleLoggerResultCallback.
*/
interface ResultCallback {
void onCompletion(String docId, Result documentResult);
+
+ /**
+ * Called with an exception whenever an endpoint specific error occurs during feeding.
+ * The error may or may not be transient - the operation will in both cases be retried until it's successful.
+ * This callback is intended for application level monitoring (logging, metrics, altering etc).
+ * Document specific errors will be reported back through {@link #onCompletion(String, Result)}.
+ *
+ * @see FeedEndpointException
+ * @param exception An exception specifying endpoint and cause. See {@link FeedEndpointException} for details.
+ */
+ // TODO Vespa 7: Remove empty default implementation
+ default void onEndpointException(FeedEndpointException exception) {}
}
/**
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java
new file mode 100644
index 00000000000..7ef585e814f
--- /dev/null
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedConnectException.java
@@ -0,0 +1,23 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import com.yahoo.vespa.http.client.config.Endpoint;
+
+/**
+ * An exception thrown when the client is unable to connect to a feed endpoint.
+ *
+ * @author bjorncs
+ */
+public class FeedConnectException extends FeedEndpointException {
+
+ public FeedConnectException(Throwable cause, Endpoint endpoint) {
+ super(createMessage(cause, endpoint), cause, endpoint);
+ }
+
+ private static String createMessage(Throwable cause, Endpoint endpoint) {
+ return String.format("Handshake to endpoint '%s:%d' failed: %s",
+ endpoint.getHostname(),
+ endpoint.getPort(),
+ cause.getMessage());
+ }
+}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
new file mode 100644
index 00000000000..d3e4b4ed762
--- /dev/null
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedEndpointException.java
@@ -0,0 +1,25 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import com.yahoo.vespa.http.client.config.Endpoint;
+
+/**
+ * An exception type for endpoint specific errors.
+ *
+ * @see FeedConnectException
+ * @see FeedProtocolException
+ * @author bjorncs
+ */
+public abstract class FeedEndpointException extends RuntimeException {
+ private final Endpoint endpoint;
+
+ protected FeedEndpointException(String message, Throwable cause, Endpoint endpoint) {
+ super(message, cause);
+ this.endpoint = endpoint;
+ }
+
+ public Endpoint getEndpoint() {
+ return endpoint;
+ }
+
+}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java
new file mode 100644
index 00000000000..dd5aa902c1a
--- /dev/null
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedProtocolException.java
@@ -0,0 +1,42 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import com.yahoo.vespa.http.client.config.Endpoint;
+
+/**
+ * An exception thrown when a feed endpoint returns an error during feeding.
+ *
+ * @author bjorncs
+ */
+public class FeedProtocolException extends FeedEndpointException {
+
+ private final int httpStatusCode;
+ private final String httpResponseMessage;
+
+ public FeedProtocolException(int httpStatusCode,
+ String httpResponseMessage,
+ Throwable cause,
+ Endpoint endpoint) {
+ super(createMessage(httpStatusCode, httpResponseMessage, endpoint), cause, endpoint);
+ this.httpStatusCode = httpStatusCode;
+ this.httpResponseMessage = httpResponseMessage;
+ }
+
+ private static String createMessage(int httpStatusCode,
+ String httpResponseMessage,
+ Endpoint endpoint) {
+ return String.format("Endpoint '%s:%d' returned an error on handshake: %d - %s",
+ endpoint.getHostname(),
+ endpoint.getPort(),
+ httpStatusCode,
+ httpResponseMessage);
+ }
+
+ public int getHttpStatusCode() {
+ return httpStatusCode;
+ }
+
+ public String getHttpResponseMessage() {
+ return httpResponseMessage;
+ }
+}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
index 98dd067b7c5..c7731f5ab48 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.http.client.core.communication;
import com.google.common.annotations.Beta;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
+import com.yahoo.vespa.http.client.FeedEndpointException;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import com.yahoo.vespa.http.client.core.EndpointResult;
@@ -58,6 +59,10 @@ class EndpointResultQueue {
resultReceived(result, clusterId, true);
}
+ void onEndpointError(FeedEndpointException e) {
+ operationProcessor.onEndpointError(e);
+ }
+
private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) {
operationProcessor.resultReceived(result, clusterId);
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index bff9d2186e6..26dc1d1bd73 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -2,6 +2,8 @@
package com.yahoo.vespa.http.client.core.communication;
import com.google.common.annotations.Beta;
+import com.yahoo.vespa.http.client.FeedConnectException;
+import com.yahoo.vespa.http.client.FeedProtocolException;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.Document;
@@ -321,9 +323,11 @@ class IOThread implements Runnable, AutoCloseable {
executeProblemsCounter.incrementAndGet();
log.info("Handshake did not work out " + endpoint + ": " + Exceptions.toMessageString(ser));
drainFirstDocumentsInQueueIfOld();
+ resultQueue.onEndpointError(new FeedProtocolException(ser.getResponseCode(), ser.getResponseString(), ser, endpoint));
return ThreadState.CONNECTED;
} catch (Throwable throwable) { // This cover IOException as well
executeProblemsCounter.incrementAndGet();
+ resultQueue.onEndpointError(new FeedConnectException(throwable, endpoint));
log.info("Problem with Handshake " + endpoint + ": " + Exceptions.toMessageString(throwable));
drainFirstDocumentsInQueueIfOld();
client.close();
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 5907694f55a..4b95f367485 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.http.client.core.operationProcessor;
import com.google.common.annotations.Beta;
import com.google.common.collect.ArrayListMultimap;
import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.FeedEndpointException;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.config.Cluster;
import com.yahoo.vespa.http.client.config.SessionParams;
@@ -202,6 +203,11 @@ public class OperationProcessor {
}
}
}
+
+ public void onEndpointError(FeedEndpointException e) {
+ resultCallback.onEndpointException(e);
+ }
+
public List<Exception> closeClusters() {
List<Exception> exceptions = new ArrayList<>();
// first, close cluster sessions and allow connections to drain normally
@@ -297,5 +303,4 @@ public class OperationProcessor {
throw new RuntimeException("Did not manage to shut down retry threads. Please report problem.");
}
}
-
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index a894aa5af1d..f2a63895fd4 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -1,27 +1,41 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.client.core.communication;
+import com.yahoo.vespa.http.client.FeedConnectException;
+import com.yahoo.vespa.http.client.FeedEndpointException;
+import com.yahoo.vespa.http.client.FeedProtocolException;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.V3HttpAPITest;
+import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.EndpointResult;
+import com.yahoo.vespa.http.client.core.ServerResponseException;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
public class IOThreadTest {
+
+ private static final Endpoint ENDPOINT = Endpoint.create("myhost");
+
final EndpointResultQueue endpointResultQueue = mock(EndpointResultQueue.class);
final ApacheGatewayConnection apacheGatewayConnection = mock(ApacheGatewayConnection.class);
final String exceptionMessage = "SOME EXCEPTION FOO";
@@ -34,6 +48,10 @@ public class IOThreadTest {
V3HttpAPITest.documents.get(1).getContents(), null /* context */);
DocumentQueue documentQueue = new DocumentQueue(4);
+ public IOThreadTest() {
+ when(apacheGatewayConnection.getEndpoint()).thenReturn(ENDPOINT);
+ }
+
/**
* Set up mock so that it can handle both failDocument() and resultReceived().
* @param expectedDocIdFail on failure, this has to be the doc id, or the mock will fail.
@@ -124,4 +142,59 @@ public class IOThreadTest {
assert (latch.await(120, TimeUnit.SECONDS));
}
}
+
+ @Test
+ public void requireThatEndpointProtocolExceptionsArePropagated()
+ throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
+ when(apacheGatewayConnection.connect()).thenReturn(true);
+ int errorCode = 403;
+ String errorMessage = "Not authorized";
+ doThrow(new ServerResponseException(errorCode, errorMessage)).when(apacheGatewayConnection).handshake();
+ Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
+
+ try (IOThread ioThread = new IOThread(
+ endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
+ ioThread.post(doc1);
+ FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
+ assertThat(reportedException, instanceOf(FeedProtocolException.class));
+ FeedProtocolException actualException = (FeedProtocolException) reportedException;
+ assertThat(actualException.getHttpStatusCode(), equalTo(errorCode));
+ assertThat(actualException.getHttpResponseMessage(), equalTo(errorMessage));
+ assertThat(actualException.getEndpoint(), equalTo(ENDPOINT));
+ assertThat(actualException.getMessage(), equalTo("Endpoint 'myhost:4080' returned an error on handshake: 403 - Not authorized"));
+ }
+ }
+
+ @Test
+ public void requireThatEndpointConnectExceptionsArePropagated()
+ throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
+ when(apacheGatewayConnection.connect()).thenReturn(true);
+ String errorMessage = "generic error message";
+ IOException cause = new IOException(errorMessage);
+ doThrow(cause).when(apacheGatewayConnection).handshake();
+ Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
+
+ try (IOThread ioThread = new IOThread(
+ endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
+ ioThread.post(doc1);
+ FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
+ assertThat(reportedException, instanceOf(FeedConnectException.class));
+ FeedConnectException actualException = (FeedConnectException) reportedException;
+ assertThat(actualException.getCause(), equalTo(cause));
+ assertThat(actualException.getEndpoint(), equalTo(ENDPOINT));
+ assertThat(actualException.getMessage(), equalTo("Handshake to endpoint 'myhost:4080' failed: generic error message"));
+ }
+ }
+
+ private static Future<FeedEndpointException> endpointErrorCapturer(EndpointResultQueue endpointResultQueue) {
+ CompletableFuture<FeedEndpointException> futureResult = new CompletableFuture<>();
+ doAnswer(invocation -> {
+ if (futureResult.isDone()) return null;
+ FeedEndpointException reportedException = (FeedEndpointException) invocation.getArguments()[0];
+ futureResult.complete(reportedException);
+ return null;
+ }).when(endpointResultQueue).onEndpointError(any());
+ return futureResult;
+ }
+
}