aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java')
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java196
1 files changed, 0 insertions, 196 deletions
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java
deleted file mode 100644
index 7bac8f9cd9d..00000000000
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.client.core.communication;
-
-import com.yahoo.vespa.http.client.FeedClient;
-import com.yahoo.vespa.http.client.FeedEndpointException;
-import com.yahoo.vespa.http.client.ManualClock;
-import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.core.Document;
-import com.yahoo.vespa.http.client.core.EndpointResult;
-import com.yahoo.vespa.http.client.core.ThrottlePolicy;
-import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler;
-import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
-import org.junit.Test;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-
-/**
- * TODO: Migrate IOThreadTests here.
- *
- * @author bratseth
- */
-public class NewIOThreadTest {
-
- @Test
- public void testBasics() {
- OperationProcessorTester tester = new OperationProcessorTester();
- assertEquals(0, tester.inflight());
- assertEquals(0, tester.success());
- assertEquals(0, tester.failures());
- tester.send("doc1");
- tester.send("doc2");
- tester.send("doc3");
- assertEquals(3, tester.inflight());
- assertEquals(0, tester.success());
- assertEquals(0, tester.failures());
- tester.success("doc1");
- tester.success("doc2");
- tester.success("doc3");
- assertEquals(0, tester.inflight());
- assertEquals(3, tester.success());
- assertEquals(0, tester.failures());
- }
-
- @Test
- public void testPollingOldConnections() {
- OperationProcessorTester tester = new OperationProcessorTester();
- tester.tick(3);
-
- assertEquals(1, tester.clusterConnections().size());
- assertEquals(1, tester.clusterConnections().get(0).ioThreads().size());
- IOThread ioThread = tester.clusterConnections().get(0).ioThreads().get(0);
- DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
- assertEquals(0, ioThread.oldConnections().size());
-
- firstConnection.hold(true);
- tester.send("doc1");
- tester.tick(1);
-
- tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15
- tester.tick(3);
-
- assertEquals(1, ioThread.oldConnections().size());
- assertEquals(firstConnection, ioThread.oldConnections().get(0));
- assertNotSame(firstConnection, ioThread.currentConnection());
- assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000);
-
- // Check old connection poll pattern (exponential backoff)
- assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
- assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
-
- tester.clock().advance(Duration.ofSeconds(200));
- tester.tick(1);
- assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size());
- }
-
- private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds,
- int advanceSeconds,
- DryRunGatewayConnection connection,
- OperationProcessorTester tester) {
- tester.clock().advance(Duration.ofSeconds(advanceSeconds));
- tester.tick(1);
- assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000);
- }
-
- private static class OperationProcessorTester {
-
- private final Endpoint endpoint;
- private final int clusterId = 0;
- private final ManualClock clock;
- private final TestResultCallback resultCallback;
- private final OperationProcessor operationProcessor;
-
- public OperationProcessorTester() {
- endpoint = Endpoint.create("test-endpoint");
- SessionParams.Builder params = new SessionParams.Builder();
- Cluster.Builder clusterParams = new Cluster.Builder();
- clusterParams.addEndpoint(endpoint);
- params.addCluster(clusterParams.build());
- ConnectionParams.Builder connectionParams = new ConnectionParams.Builder();
- connectionParams.setDryRun(true);
- connectionParams.setRunThreads(false);
- params.setConnectionParams(connectionParams.build());
-
- clock = new ManualClock(Instant.ofEpochMilli(0));
- resultCallback = new TestResultCallback();
- operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()),
- resultCallback,
- params.build(),
- new ScheduledThreadPoolExecutor(1),
- clock);
- }
-
- public ManualClock clock() { return clock; }
-
- /** Do n iteration of work in all io threads of this */
- public void tick(int n) {
- for (int i = 0; i < n; i++)
- for (ClusterConnection cluster : operationProcessor.clusters())
- for (IOThread thread : cluster.ioThreads())
- thread.tick();
- }
-
- public void send(String documentId) {
- operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant()));
- }
-
- public void success(String documentId) {
- operationProcessor.resultReceived(new EndpointResult(documentId, new Result.Detail(endpoint)), clusterId);
- }
-
- public int inflight() {
- return operationProcessor.getIncompleteResultQueueSize();
- }
-
- public int success() {
- return resultCallback.successes;
- }
-
- public List<ClusterConnection> clusterConnections() {
- return operationProcessor.clusters();
- }
-
- public int failures() {
- return resultCallback.failures;
- }
-
- }
-
- private static class TestResultCallback implements FeedClient.ResultCallback {
-
- private int successes = 0;
- private int failures = 0;
-
- @Override
- public void onCompletion(String docId, Result documentResult) {
- successes++;
- }
-
- @Override
- public void onEndpointException(FeedEndpointException exception) {
- failures++;
- }
-
-
- }
-
-}