summaryrefslogtreecommitdiffstats
path: root/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java')
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java322
1 files changed, 120 insertions, 202 deletions
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index 59fb968906f..e684c929fda 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -1,232 +1,150 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.client.core.communication;
-import com.yahoo.vespa.http.client.FeedConnectException;
+import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.FeedEndpointException;
-import com.yahoo.vespa.http.client.FeedProtocolException;
-import com.yahoo.vespa.http.client.ManualClock;
import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.V3HttpAPITest;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.EndpointResult;
+import com.yahoo.vespa.http.client.core.OperationProcessorTester;
import com.yahoo.vespa.http.client.core.ServerResponseException;
import org.junit.Test;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.time.Clock;
import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-// DO NOT ADD TESTS HERE, add to NewIOThreadTest
-public class IOThreadTest {
-
- private static final Endpoint ENDPOINT = Endpoint.create("myhost");
-
- final Clock clock = Clock.systemUTC();
- final EndpointResultQueue endpointResultQueue = mock(EndpointResultQueue.class);
- final ApacheGatewayConnection apacheGatewayConnection = mock(ApacheGatewayConnection.class);
- final String exceptionMessage = "SOME EXCEPTION FOO";
- CountDownLatch latch = new CountDownLatch(1);
- String docId1 = V3HttpAPITest.documents.get(0).getDocumentId();
- Document doc1 = new Document(V3HttpAPITest.documents.get(0).getDocumentId(),
- V3HttpAPITest.documents.get(0).getContents(),
- null,
- clock.instant());
- String docId2 = V3HttpAPITest.documents.get(1).getDocumentId();
- Document doc2 = new Document(V3HttpAPITest.documents.get(1).getDocumentId(),
- V3HttpAPITest.documents.get(1).getContents(),
- null,
- clock.instant());
- DocumentQueue documentQueue = new DocumentQueue(4, clock);
-
- public IOThreadTest() {
- when(apacheGatewayConnection.getEndpoint()).thenReturn(ENDPOINT);
- }
-
- /**
- * Set up mock so that it can handle both failDocument() and resultReceived().
- *
- * @param expectedDocIdFail on failure, this has to be the doc id, or the mock will fail.
- * @param expectedDocIdOk on ok, this has to be the doc id, or the mock will fail.
- * @param isTransient checked on failure, if different, the mock will fail.
- * @param expectedException checked on failure, if exception toString is different, the mock will fail.
- */
- void setupEndpointResultQueueMock(String expectedDocIdFail, String expectedDocIdOk, boolean isTransient, String expectedException) {
- doAnswer(invocation -> {
- EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0];
- assertThat(endpointResult.getOperationId(), is(expectedDocIdFail));
- assertThat(endpointResult.getDetail().getException().toString(), containsString(expectedException));
- assertThat(endpointResult.getDetail().getResultType(), is(isTransient ? Result.ResultType.TRANSITIVE_ERROR : Result.ResultType.FATAL_ERROR));
-
- latch.countDown();
- return null;
- }).when(endpointResultQueue).failOperation(any(), eq(0));
-
- doAnswer(invocation -> {
- EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0];
- assertThat(endpointResult.getOperationId(), is(expectedDocIdOk));
- assertThat(endpointResult.getDetail().getResultType(), is(Result.ResultType.OPERATION_EXECUTED));
- latch.countDown();
- return null;
- }).when(endpointResultQueue).resultReceived(any(), eq(0));
- }
- private IOThread createIOThread(int maxInFlightRequests, long localQueueTimeOut) {
- return new IOThread(null,
- ENDPOINT,
- endpointResultQueue,
- new SingletonGatewayConnectionFactory(apacheGatewayConnection),
- 0,
- 0,
- maxInFlightRequests,
- Duration.ofMillis(localQueueTimeOut),
- documentQueue,
- 0,
- Duration.ofSeconds(15),
- true,
- 10,
- clock);
- }
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
- @Test
- public void singleDocumentSuccess() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- InputStream serverResponse = new ByteArrayInputStream(
- (docId1 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.write(any())).thenReturn(serverResponse);
- setupEndpointResultQueueMock( "nope", docId1, true, exceptionMessage);
- try (IOThread ioThread = createIOThread(10000, 10000)) {
- ioThread.post(doc1);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
- }
+/**
+ * TODO: Migrate IOThreadTests here.
+ *
+ * @author bratseth
+ */
+public class IOThreadTest {
@Test
- public void testDocumentWriteError() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- when(apacheGatewayConnection.write(any())).thenThrow(new IOException(exceptionMessage));
- setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage);
- try (IOThread ioThread = createIOThread(10000, 10000)) {
- ioThread.post(doc1);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
+ public void testSuccessfulWriting() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ assertEquals(0, tester.incomplete());
+ assertEquals(0, tester.success());
+ assertEquals(0, tester.failures());
+ tester.send("doc1");
+ tester.send("doc2");
+ tester.send("doc3");
+ assertEquals(3, tester.incomplete());
+ assertEquals(0, tester.success());
+ assertEquals(0, tester.failures());
+ tester.tick(1); // connect
+ assertEquals(3, tester.incomplete());
+ tester.tick(1); // sync
+ assertEquals(3, tester.incomplete());
+ tester.tick(1); // process queue
+ assertEquals(0, tester.incomplete());
+ assertEquals(3, tester.success());
+ assertEquals(0, tester.failures());
}
@Test
- public void testTwoDocumentsFirstWriteErrorSecondOk() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- InputStream serverResponse = new ByteArrayInputStream(
- (docId2 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.write(any()))
- .thenThrow(new IOException(exceptionMessage))
- .thenReturn(serverResponse);
- latch = new CountDownLatch(2);
- setupEndpointResultQueueMock(doc1.getOperationId(), doc2.getDocumentId(), true, exceptionMessage);
-
- try (IOThread ioThread = createIOThread(10000, 10000)) {
- ioThread.post(doc1);
- ioThread.post(doc2);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
+ public void testExceptionOnConnect() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ firstConnection.throwOnHandshake(new ServerResponseException(403, "Not authorized"));
+
+ tester.send("doc1");
+ tester.tick(3);
+ assertEquals(1, tester.incomplete());
+ assertEquals(0, ioThread.resultQueue().getPendingSize());
+ assertEquals(0, tester.success());
+ assertEquals("Awaiting retry", 0, tester.failures());
}
@Test
- public void testQueueTimeOutNoNoConnectionToServer() throws Exception {
- when(apacheGatewayConnection.connect()).thenReturn(false);
- InputStream serverResponse = new ByteArrayInputStream(("").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.write(any())).thenReturn(serverResponse);
- setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true,
- "java.lang.Exception: Not sending document operation, timed out in queue after");
- try (IOThread ioThread = createIOThread(10, 10)) {
- ioThread.post(doc1);
- assert (latch.await(120, TimeUnit.SECONDS));
- }
+ public void testExceptionOnHandshake() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ firstConnection.throwOnHandshake(new ServerResponseException(403, "Not authorized"));
+
+ tester.send("doc1");
+ tester.tick(3);
+ assertEquals(1, tester.incomplete());
+ assertEquals(0, ioThread.resultQueue().getPendingSize());
+ assertEquals(0, tester.success());
+ assertEquals("Awaiting retry", 0, tester.failures());
}
@Test
- public void testEndpointProtocolExceptionPropagation()
- throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- int errorCode = 403;
- String errorMessage = "Not authorized";
- doThrow(new ServerResponseException(errorCode, errorMessage)).when(apacheGatewayConnection).handshake();
- Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
-
- try (IOThread ioThread = createIOThread(10, 10)) {
- ioThread.post(doc1);
- FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
- assertThat(reportedException, instanceOf(FeedProtocolException.class));
- FeedProtocolException actualException = (FeedProtocolException) reportedException;
- assertThat(actualException.getHttpStatusCode(), equalTo(errorCode));
- assertThat(actualException.getHttpResponseMessage(), equalTo(errorMessage));
- assertThat(actualException.getEndpoint(), equalTo(ENDPOINT));
- assertThat(actualException.getMessage(), equalTo("Endpoint 'myhost:4080' returned an error on handshake: 403 - Not authorized"));
- }
+ public void testExceptionOnWrite() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ firstConnection.throwOnWrite(new IOException("Test failure"));
+
+ tester.send("doc1");
+ tester.tick(3);
+ assertEquals(1, tester.incomplete());
+ assertEquals(0, ioThread.resultQueue().getPendingSize());
+ assertEquals(0, tester.success());
+ assertEquals("Awaiting retry since write exceptions is a transient failure",
+ 0, tester.failures());
}
@Test
- public void testEndpointConnectExceptionsPropagation()
- throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
- when(apacheGatewayConnection.connect()).thenReturn(true);
- String errorMessage = "generic error message";
- IOException cause = new IOException(errorMessage);
- doThrow(cause).when(apacheGatewayConnection).handshake();
- Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
-
- try (IOThread ioThread = createIOThread(10, 10)) {
- ioThread.post(doc1);
- FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
- assertThat(reportedException, instanceOf(FeedConnectException.class));
- FeedConnectException actualException = (FeedConnectException) reportedException;
- assertThat(actualException.getCause(), equalTo(cause));
- assertThat(actualException.getEndpoint(), equalTo(ENDPOINT));
- assertThat(actualException.getMessage(), equalTo("Handshake to endpoint 'myhost:4080' failed: generic error message"));
- }
- }
-
- private static Future<FeedEndpointException> endpointErrorCapturer(EndpointResultQueue endpointResultQueue) {
- CompletableFuture<FeedEndpointException> futureResult = new CompletableFuture<>();
- doAnswer(invocation -> {
- if (futureResult.isDone()) return null;
- FeedEndpointException reportedException = (FeedEndpointException) invocation.getArguments()[0];
- futureResult.complete(reportedException);
- return null;
- }).when(endpointResultQueue).onEndpointError(any());
- return futureResult;
+ public void testPollingOldConnections() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ tester.tick(3);
+
+ IOThread ioThread = tester.getSingleIOThread();
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ assertEquals(0, ioThread.oldConnections().size());
+
+ firstConnection.hold(true);
+ tester.send("doc1");
+ tester.tick(1);
+
+ tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15
+ tester.tick(3);
+
+ assertEquals(1, ioThread.oldConnections().size());
+ assertEquals(firstConnection, ioThread.oldConnections().get(0));
+ assertNotSame(firstConnection, ioThread.currentConnection());
+ assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000);
+
+ // Check old connection poll pattern (exponential backoff)
+ assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+
+ tester.clock().advance(Duration.ofSeconds(200));
+ tester.tick(1);
+ assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size());
}
- private static final class SingletonGatewayConnectionFactory implements GatewayConnectionFactory {
-
- private final GatewayConnection singletonConnection;
-
- SingletonGatewayConnectionFactory(GatewayConnection singletonConnection) {
- this.singletonConnection = singletonConnection;
- }
-
- @Override
- public GatewayConnection newConnection() { return singletonConnection; }
-
+ private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds,
+ int advanceSeconds,
+ DryRunGatewayConnection connection,
+ OperationProcessorTester tester) {
+ tester.clock().advance(Duration.ofSeconds(advanceSeconds));
+ tester.tick(1);
+ assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000);
}
}