diff options
Diffstat (limited to 'vespa-http-client/src/test/java/com/yahoo')
2 files changed, 213 insertions, 383 deletions
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index 59fb968906f..a36a08b0c26 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -1,231 +1,257 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.http.client.core.communication; -import com.yahoo.vespa.http.client.FeedConnectException; +import com.yahoo.vespa.http.client.FeedClient; import com.yahoo.vespa.http.client.FeedEndpointException; -import com.yahoo.vespa.http.client.FeedProtocolException; import com.yahoo.vespa.http.client.ManualClock; import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.V3HttpAPITest; +import com.yahoo.vespa.http.client.config.Cluster; +import com.yahoo.vespa.http.client.config.ConnectionParams; import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.SessionParams; import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.EndpointResult; import com.yahoo.vespa.http.client.core.ServerResponseException; +import com.yahoo.vespa.http.client.core.ThrottlePolicy; +import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler; +import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import org.junit.Test; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.time.Clock; import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -// DO NOT ADD TESTS HERE, add to NewIOThreadTest +import java.time.Instant; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; + +/** + * TODO: Migrate IOThreadTests here. + * + * @author bratseth + */ public class IOThreadTest { - private static final Endpoint ENDPOINT = Endpoint.create("myhost"); - - final Clock clock = Clock.systemUTC(); - final EndpointResultQueue endpointResultQueue = mock(EndpointResultQueue.class); - final ApacheGatewayConnection apacheGatewayConnection = mock(ApacheGatewayConnection.class); - final String exceptionMessage = "SOME EXCEPTION FOO"; - CountDownLatch latch = new CountDownLatch(1); - String docId1 = V3HttpAPITest.documents.get(0).getDocumentId(); - Document doc1 = new Document(V3HttpAPITest.documents.get(0).getDocumentId(), - V3HttpAPITest.documents.get(0).getContents(), - null, - clock.instant()); - String docId2 = V3HttpAPITest.documents.get(1).getDocumentId(); - Document doc2 = new Document(V3HttpAPITest.documents.get(1).getDocumentId(), - V3HttpAPITest.documents.get(1).getContents(), - null, - clock.instant()); - DocumentQueue documentQueue = new DocumentQueue(4, clock); - - public IOThreadTest() { - when(apacheGatewayConnection.getEndpoint()).thenReturn(ENDPOINT); + @Test + public void testSuccessfulWriting() { + OperationProcessorTester tester = new OperationProcessorTester(); + assertEquals(0, tester.incomplete()); + assertEquals(0, tester.success()); + assertEquals(0, tester.failures()); + tester.send("doc1"); + tester.send("doc2"); + tester.send("doc3"); + assertEquals(3, tester.incomplete()); + assertEquals(0, tester.success()); + assertEquals(0, tester.failures()); + tester.tick(1); // connect + assertEquals(3, tester.incomplete()); + tester.tick(1); // sync + assertEquals(3, tester.incomplete()); + tester.tick(1); // process queue + assertEquals(0, tester.incomplete()); + assertEquals(3, tester.success()); + assertEquals(0, tester.failures()); } - /** - * Set up mock so that it can handle both failDocument() and resultReceived(). - * - * @param expectedDocIdFail on failure, this has to be the doc id, or the mock will fail. - * @param expectedDocIdOk on ok, this has to be the doc id, or the mock will fail. - * @param isTransient checked on failure, if different, the mock will fail. - * @param expectedException checked on failure, if exception toString is different, the mock will fail. - */ - void setupEndpointResultQueueMock(String expectedDocIdFail, String expectedDocIdOk, boolean isTransient, String expectedException) { - doAnswer(invocation -> { - EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0]; - assertThat(endpointResult.getOperationId(), is(expectedDocIdFail)); - assertThat(endpointResult.getDetail().getException().toString(), containsString(expectedException)); - assertThat(endpointResult.getDetail().getResultType(), is(isTransient ? Result.ResultType.TRANSITIVE_ERROR : Result.ResultType.FATAL_ERROR)); - - latch.countDown(); - return null; - }).when(endpointResultQueue).failOperation(any(), eq(0)); - - doAnswer(invocation -> { - EndpointResult endpointResult = (EndpointResult) invocation.getArguments()[0]; - assertThat(endpointResult.getOperationId(), is(expectedDocIdOk)); - assertThat(endpointResult.getDetail().getResultType(), is(Result.ResultType.OPERATION_EXECUTED)); - latch.countDown(); - return null; - }).when(endpointResultQueue).resultReceived(any(), eq(0)); - } + @Test + public void testExceptionOnConnect() { + OperationProcessorTester tester = new OperationProcessorTester(); + IOThread ioThread = tester.getSingleIOThread(); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + firstConnection.throwOnHandshake(new ServerResponseException(403, "Not authorized")); - private IOThread createIOThread(int maxInFlightRequests, long localQueueTimeOut) { - return new IOThread(null, - ENDPOINT, - endpointResultQueue, - new SingletonGatewayConnectionFactory(apacheGatewayConnection), - 0, - 0, - maxInFlightRequests, - Duration.ofMillis(localQueueTimeOut), - documentQueue, - 0, - Duration.ofSeconds(15), - true, - 10, - clock); + tester.send("doc1"); + tester.tick(3); + assertEquals(1, tester.incomplete()); + assertEquals(0, ioThread.resultQueue().getPendingSize()); + assertEquals(0, tester.success()); + assertEquals("Awaiting retry", 0, tester.failures()); } @Test - public void singleDocumentSuccess() throws Exception { - when(apacheGatewayConnection.connect()).thenReturn(true); - InputStream serverResponse = new ByteArrayInputStream( - (docId1 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8)); - when(apacheGatewayConnection.write(any())).thenReturn(serverResponse); - setupEndpointResultQueueMock( "nope", docId1, true, exceptionMessage); - try (IOThread ioThread = createIOThread(10000, 10000)) { - ioThread.post(doc1); - assert (latch.await(120, TimeUnit.SECONDS)); - } + public void testExceptionOnHandshake() { + OperationProcessorTester tester = new OperationProcessorTester(); + IOThread ioThread = tester.getSingleIOThread(); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + firstConnection.throwOnHandshake(new ServerResponseException(403, "Not authorized")); + + tester.send("doc1"); + tester.tick(3); + assertEquals(1, tester.incomplete()); + assertEquals(0, ioThread.resultQueue().getPendingSize()); + assertEquals(0, tester.success()); + assertEquals("Awaiting retry", 0, tester.failures()); } @Test - public void testDocumentWriteError() throws Exception { - when(apacheGatewayConnection.connect()).thenReturn(true); - when(apacheGatewayConnection.write(any())).thenThrow(new IOException(exceptionMessage)); - setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage); - try (IOThread ioThread = createIOThread(10000, 10000)) { - ioThread.post(doc1); - assert (latch.await(120, TimeUnit.SECONDS)); - } + public void testExceptionOnWrite() { + OperationProcessorTester tester = new OperationProcessorTester(); + IOThread ioThread = tester.getSingleIOThread(); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + firstConnection.throwOnWrite(new IOException("Test failure")); + + tester.send("doc1"); + tester.tick(3); + assertEquals(1, tester.incomplete()); + assertEquals(0, ioThread.resultQueue().getPendingSize()); + assertEquals(0, tester.success()); + assertEquals("Awaiting retry since write exceptions is a transient failure", + 0, tester.failures()); } @Test - public void testTwoDocumentsFirstWriteErrorSecondOk() throws Exception { - when(apacheGatewayConnection.connect()).thenReturn(true); - InputStream serverResponse = new ByteArrayInputStream( - (docId2 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8)); - when(apacheGatewayConnection.write(any())) - .thenThrow(new IOException(exceptionMessage)) - .thenReturn(serverResponse); - latch = new CountDownLatch(2); - setupEndpointResultQueueMock(doc1.getOperationId(), doc2.getDocumentId(), true, exceptionMessage); - - try (IOThread ioThread = createIOThread(10000, 10000)) { - ioThread.post(doc1); - ioThread.post(doc2); - assert (latch.await(120, TimeUnit.SECONDS)); - } + public void testPollingOldConnections() { + OperationProcessorTester tester = new OperationProcessorTester(); + tester.tick(3); + + IOThread ioThread = tester.getSingleIOThread(); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + assertEquals(0, ioThread.oldConnections().size()); + + firstConnection.hold(true); + tester.send("doc1"); + tester.tick(1); + + tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15 + tester.tick(3); + + assertEquals(1, ioThread.oldConnections().size()); + assertEquals(firstConnection, ioThread.oldConnections().get(0)); + assertNotSame(firstConnection, ioThread.currentConnection()); + assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000); + + // Check old connection poll pattern (exponential backoff) + assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + + tester.clock().advance(Duration.ofSeconds(200)); + tester.tick(1); + assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size()); } - @Test - public void testQueueTimeOutNoNoConnectionToServer() throws Exception { - when(apacheGatewayConnection.connect()).thenReturn(false); - InputStream serverResponse = new ByteArrayInputStream(("").getBytes(StandardCharsets.UTF_8)); - when(apacheGatewayConnection.write(any())).thenReturn(serverResponse); - setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, - "java.lang.Exception: Not sending document operation, timed out in queue after"); - try (IOThread ioThread = createIOThread(10, 10)) { - ioThread.post(doc1); - assert (latch.await(120, TimeUnit.SECONDS)); - } + private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds, + int advanceSeconds, + DryRunGatewayConnection connection, + OperationProcessorTester tester) { + tester.clock().advance(Duration.ofSeconds(advanceSeconds)); + tester.tick(1); + assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000); } - @Test - public void testEndpointProtocolExceptionPropagation() - throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { - when(apacheGatewayConnection.connect()).thenReturn(true); - int errorCode = 403; - String errorMessage = "Not authorized"; - doThrow(new ServerResponseException(errorCode, errorMessage)).when(apacheGatewayConnection).handshake(); - Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue); - - try (IOThread ioThread = createIOThread(10, 10)) { - ioThread.post(doc1); - FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS); - assertThat(reportedException, instanceOf(FeedProtocolException.class)); - FeedProtocolException actualException = (FeedProtocolException) reportedException; - assertThat(actualException.getHttpStatusCode(), equalTo(errorCode)); - assertThat(actualException.getHttpResponseMessage(), equalTo(errorMessage)); - assertThat(actualException.getEndpoint(), equalTo(ENDPOINT)); - assertThat(actualException.getMessage(), equalTo("Endpoint 'myhost:4080' returned an error on handshake: 403 - Not authorized")); + private static class OperationProcessorTester { + + private final Endpoint endpoint; + private final int clusterId = 0; + private final ManualClock clock; + private final TestResultCallback resultCallback; + private final OperationProcessor operationProcessor; + + public OperationProcessorTester() { + endpoint = Endpoint.create("test-endpoint"); + SessionParams.Builder params = new SessionParams.Builder(); + Cluster.Builder clusterParams = new Cluster.Builder(); + clusterParams.addEndpoint(endpoint); + params.addCluster(clusterParams.build()); + ConnectionParams.Builder connectionParams = new ConnectionParams.Builder(); + connectionParams.setDryRun(true); + connectionParams.setRunThreads(false); + params.setConnectionParams(connectionParams.build()); + + clock = new ManualClock(Instant.ofEpochMilli(0)); + resultCallback = new TestResultCallback(); + operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()), + resultCallback, + params.build(), + new ScheduledThreadPoolExecutor(1), + clock); } - } - @Test - public void testEndpointConnectExceptionsPropagation() - throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { - when(apacheGatewayConnection.connect()).thenReturn(true); - String errorMessage = "generic error message"; - IOException cause = new IOException(errorMessage); - doThrow(cause).when(apacheGatewayConnection).handshake(); - Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue); - - try (IOThread ioThread = createIOThread(10, 10)) { - ioThread.post(doc1); - FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS); - assertThat(reportedException, instanceOf(FeedConnectException.class)); - FeedConnectException actualException = (FeedConnectException) reportedException; - assertThat(actualException.getCause(), equalTo(cause)); - assertThat(actualException.getEndpoint(), equalTo(ENDPOINT)); - assertThat(actualException.getMessage(), equalTo("Handshake to endpoint 'myhost:4080' failed: generic error message")); + public ManualClock clock() { return clock; } + + /** Asserts that this has but a single IOThread and returns it */ + public IOThread getSingleIOThread() { + assertEquals(1, clusterConnections().size()); + assertEquals(1, clusterConnections().get(0).ioThreads().size()); + return clusterConnections().get(0).ioThreads().get(0); + } + + /** Do n iteration of work in all io threads of this */ + public void tick(int n) { + for (int i = 0; i < n; i++) + for (ClusterConnection cluster : operationProcessor.clusters()) + for (IOThread thread : cluster.ioThreads()) + thread.tick(); + } + + public void send(String documentId) { + operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant())); + } + + public int incomplete() { + return operationProcessor.getIncompleteResultQueueSize(); + } + + public int success() { + return resultCallback.successes; + } + + public List<ClusterConnection> clusterConnections() { + return operationProcessor.clusters(); + } + + public int failures() { + return resultCallback.failures; + } + + public int endpointExceptions() { + return resultCallback.endpointExceptions; + } + + public Result lastResult() { + return resultCallback.lastResult; } - } - private static Future<FeedEndpointException> endpointErrorCapturer(EndpointResultQueue endpointResultQueue) { - CompletableFuture<FeedEndpointException> futureResult = new CompletableFuture<>(); - doAnswer(invocation -> { - if (futureResult.isDone()) return null; - FeedEndpointException reportedException = (FeedEndpointException) invocation.getArguments()[0]; - futureResult.complete(reportedException); - return null; - }).when(endpointResultQueue).onEndpointError(any()); - return futureResult; } - private static final class SingletonGatewayConnectionFactory implements GatewayConnectionFactory { + private static class TestResultCallback implements FeedClient.ResultCallback { - private final GatewayConnection singletonConnection; + private int successes = 0; + private int failures = 0; + private int endpointExceptions = 0; + private Result lastResult; - SingletonGatewayConnectionFactory(GatewayConnection singletonConnection) { - this.singletonConnection = singletonConnection; + @Override + public void onCompletion(String docId, Result documentResult) { + this.lastResult = documentResult; + if (documentResult.isSuccess()) + successes++; + else + failures++; } @Override - public GatewayConnection newConnection() { return singletonConnection; } + public void onEndpointException(FeedEndpointException exception) { + endpointExceptions++; + } } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java deleted file mode 100644 index 7bac8f9cd9d..00000000000 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.client.core.communication; - -import com.yahoo.vespa.http.client.FeedClient; -import com.yahoo.vespa.http.client.FeedEndpointException; -import com.yahoo.vespa.http.client.ManualClock; -import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.core.Document; -import com.yahoo.vespa.http.client.core.EndpointResult; -import com.yahoo.vespa.http.client.core.ThrottlePolicy; -import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler; -import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; -import org.junit.Test; - -import java.time.Duration; -import java.time.Instant; -import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; - -/** - * TODO: Migrate IOThreadTests here. - * - * @author bratseth - */ -public class NewIOThreadTest { - - @Test - public void testBasics() { - OperationProcessorTester tester = new OperationProcessorTester(); - assertEquals(0, tester.inflight()); - assertEquals(0, tester.success()); - assertEquals(0, tester.failures()); - tester.send("doc1"); - tester.send("doc2"); - tester.send("doc3"); - assertEquals(3, tester.inflight()); - assertEquals(0, tester.success()); - assertEquals(0, tester.failures()); - tester.success("doc1"); - tester.success("doc2"); - tester.success("doc3"); - assertEquals(0, tester.inflight()); - assertEquals(3, tester.success()); - assertEquals(0, tester.failures()); - } - - @Test - public void testPollingOldConnections() { - OperationProcessorTester tester = new OperationProcessorTester(); - tester.tick(3); - - assertEquals(1, tester.clusterConnections().size()); - assertEquals(1, tester.clusterConnections().get(0).ioThreads().size()); - IOThread ioThread = tester.clusterConnections().get(0).ioThreads().get(0); - DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); - assertEquals(0, ioThread.oldConnections().size()); - - firstConnection.hold(true); - tester.send("doc1"); - tester.tick(1); - - tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15 - tester.tick(3); - - assertEquals(1, ioThread.oldConnections().size()); - assertEquals(firstConnection, ioThread.oldConnections().get(0)); - assertNotSame(firstConnection, ioThread.currentConnection()); - assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000); - - // Check old connection poll pattern (exponential backoff) - assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - - tester.clock().advance(Duration.ofSeconds(200)); - tester.tick(1); - assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size()); - } - - private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds, - int advanceSeconds, - DryRunGatewayConnection connection, - OperationProcessorTester tester) { - tester.clock().advance(Duration.ofSeconds(advanceSeconds)); - tester.tick(1); - assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000); - } - - private static class OperationProcessorTester { - - private final Endpoint endpoint; - private final int clusterId = 0; - private final ManualClock clock; - private final TestResultCallback resultCallback; - private final OperationProcessor operationProcessor; - - public OperationProcessorTester() { - endpoint = Endpoint.create("test-endpoint"); - SessionParams.Builder params = new SessionParams.Builder(); - Cluster.Builder clusterParams = new Cluster.Builder(); - clusterParams.addEndpoint(endpoint); - params.addCluster(clusterParams.build()); - ConnectionParams.Builder connectionParams = new ConnectionParams.Builder(); - connectionParams.setDryRun(true); - connectionParams.setRunThreads(false); - params.setConnectionParams(connectionParams.build()); - - clock = new ManualClock(Instant.ofEpochMilli(0)); - resultCallback = new TestResultCallback(); - operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()), - resultCallback, - params.build(), - new ScheduledThreadPoolExecutor(1), - clock); - } - - public ManualClock clock() { return clock; } - - /** Do n iteration of work in all io threads of this */ - public void tick(int n) { - for (int i = 0; i < n; i++) - for (ClusterConnection cluster : operationProcessor.clusters()) - for (IOThread thread : cluster.ioThreads()) - thread.tick(); - } - - public void send(String documentId) { - operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant())); - } - - public void success(String documentId) { - operationProcessor.resultReceived(new EndpointResult(documentId, new Result.Detail(endpoint)), clusterId); - } - - public int inflight() { - return operationProcessor.getIncompleteResultQueueSize(); - } - - public int success() { - return resultCallback.successes; - } - - public List<ClusterConnection> clusterConnections() { - return operationProcessor.clusters(); - } - - public int failures() { - return resultCallback.failures; - } - - } - - private static class TestResultCallback implements FeedClient.ResultCallback { - - private int successes = 0; - private int failures = 0; - - @Override - public void onCompletion(String docId, Result documentResult) { - successes++; - } - - @Override - public void onEndpointException(FeedEndpointException exception) { - failures++; - } - - - } - -} |