From 500e792bd97b3086a65cc4fe24f318371bcfd4de Mon Sep 17 00:00:00 2001 From: Jon Bratseth Date: Tue, 1 Sep 2020 15:40:47 +0200 Subject: Pull up OperationProcessorTester --- .../vespa/http/client/core/ThrottlePolicy.java | 5 +- .../http/client/core/communication/IOThread.java | 2 +- .../http/client/core/OperationProcessorTester.java | 125 +++++++++++++++++++++ .../client/core/communication/IOThreadTest.java | 112 +----------------- .../IncompleteResultsThrottlerTest.java | 10 +- 5 files changed, 135 insertions(+), 119 deletions(-) create mode 100644 vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java (limited to 'vespa-http-client') diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java index a4bd5d51496..ee107a7d4d2 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java @@ -7,6 +7,7 @@ import static java.lang.Math.min; /** * Class that has a method for finding next maxInFlight. + * * @author dybis */ public class ThrottlePolicy { @@ -16,6 +17,7 @@ public class ThrottlePolicy { /** * Generate nex in-flight value for throttling. + * * @param maxPerformanceChange This value limit the dynamics of the algorithm. * @param numOk number of success in last phase * @param previousNumOk number of success in previous (before last) phase. @@ -27,8 +29,7 @@ public class ThrottlePolicy { public int calcNewMaxInFlight(double maxPerformanceChange, int numOk, int previousNumOk, int previousMaxInFlight, int maxInFlightNow, boolean messagesQueued) { - double difference = calculateRuleBasedDifference( - maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow); + double difference = calculateRuleBasedDifference(maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow); boolean previousRunWasBetter = numOk < previousNumOk; boolean previousRunHadLessInFlight = previousMaxInFlight < maxInFlightNow; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 4796f4bb4a8..652f65489db 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -34,7 +34,7 @@ import java.util.logging.Logger; * * @author Einar M R Rosenvinge */ -class IOThread implements Runnable, AutoCloseable { +public class IOThread implements Runnable, AutoCloseable { private static final Logger log = Logger.getLogger(IOThread.class.getName()); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java new file mode 100644 index 00000000000..9b563e193d5 --- /dev/null +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java @@ -0,0 +1,125 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.client.core; + +import com.yahoo.vespa.http.client.FeedClient; +import com.yahoo.vespa.http.client.FeedEndpointException; +import com.yahoo.vespa.http.client.ManualClock; +import com.yahoo.vespa.http.client.Result; +import com.yahoo.vespa.http.client.config.Cluster; +import com.yahoo.vespa.http.client.config.ConnectionParams; +import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.SessionParams; +import com.yahoo.vespa.http.client.core.communication.ClusterConnection; +import com.yahoo.vespa.http.client.core.communication.IOThread; +import com.yahoo.vespa.http.client.core.communication.IOThreadTest; +import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler; +import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; + +import java.time.Instant; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.junit.Assert.assertEquals; + +/** + * Helper for testing with an operation processor + * + * @author bratseth + */ +public class OperationProcessorTester { + + private final Endpoint endpoint; + private final int clusterId = 0; + private final ManualClock clock; + private final TestResultCallback resultCallback; + private final OperationProcessor operationProcessor; + + public OperationProcessorTester() { + endpoint = Endpoint.create("test-endpoint"); + SessionParams.Builder params = new SessionParams.Builder(); + Cluster.Builder clusterParams = new Cluster.Builder(); + clusterParams.addEndpoint(endpoint); + params.addCluster(clusterParams.build()); + ConnectionParams.Builder connectionParams = new ConnectionParams.Builder(); + connectionParams.setDryRun(true); + connectionParams.setRunThreads(false); + params.setConnectionParams(connectionParams.build()); + + clock = new ManualClock(Instant.ofEpochMilli(0)); + resultCallback = new TestResultCallback(); + operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()), + resultCallback, + params.build(), + new ScheduledThreadPoolExecutor(1), + clock); + } + + public ManualClock clock() { return clock; } + + /** Asserts that this has but a single IOThread and returns it */ + public IOThread getSingleIOThread() { + assertEquals(1, clusterConnections().size()); + assertEquals(1, clusterConnections().get(0).ioThreads().size()); + return clusterConnections().get(0).ioThreads().get(0); + } + + /** Do n iteration of work in all io threads of this */ + public void tick(int n) { + for (int i = 0; i < n; i++) + for (ClusterConnection cluster : operationProcessor.clusters()) + for (IOThread thread : cluster.ioThreads()) + thread.tick(); + } + + public void send(String documentId) { + operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant())); + } + + public int incomplete() { + return operationProcessor.getIncompleteResultQueueSize(); + } + + public int success() { + return resultCallback.successes; + } + + public List clusterConnections() { + return operationProcessor.clusters(); + } + + public int failures() { + return resultCallback.failures; + } + + public int endpointExceptions() { + return resultCallback.endpointExceptions; + } + + public Result lastResult() { + return resultCallback.lastResult; + } + + private static class TestResultCallback implements FeedClient.ResultCallback { + + private int successes = 0; + private int failures = 0; + private int endpointExceptions = 0; + private Result lastResult; + + @Override + public void onCompletion(String docId, Result documentResult) { + this.lastResult = documentResult; + if (documentResult.isSuccess()) + successes++; + else + failures++; + } + + @Override + public void onEndpointException(FeedEndpointException exception) { + endpointExceptions++; + } + + } + +} diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index a36a08b0c26..e684c929fda 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -3,24 +3,14 @@ package com.yahoo.vespa.http.client.core.communication; import com.yahoo.vespa.http.client.FeedClient; import com.yahoo.vespa.http.client.FeedEndpointException; -import com.yahoo.vespa.http.client.ManualClock; import com.yahoo.vespa.http.client.Result; -import com.yahoo.vespa.http.client.config.Cluster; -import com.yahoo.vespa.http.client.config.ConnectionParams; -import com.yahoo.vespa.http.client.config.Endpoint; -import com.yahoo.vespa.http.client.config.SessionParams; -import com.yahoo.vespa.http.client.core.Document; +import com.yahoo.vespa.http.client.core.OperationProcessorTester; import com.yahoo.vespa.http.client.core.ServerResponseException; -import com.yahoo.vespa.http.client.core.ThrottlePolicy; -import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler; -import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import org.junit.Test; import java.io.IOException; import java.time.Duration; -import java.time.Instant; -import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; @@ -157,102 +147,4 @@ public class IOThreadTest { assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000); } - private static class OperationProcessorTester { - - private final Endpoint endpoint; - private final int clusterId = 0; - private final ManualClock clock; - private final TestResultCallback resultCallback; - private final OperationProcessor operationProcessor; - - public OperationProcessorTester() { - endpoint = Endpoint.create("test-endpoint"); - SessionParams.Builder params = new SessionParams.Builder(); - Cluster.Builder clusterParams = new Cluster.Builder(); - clusterParams.addEndpoint(endpoint); - params.addCluster(clusterParams.build()); - ConnectionParams.Builder connectionParams = new ConnectionParams.Builder(); - connectionParams.setDryRun(true); - connectionParams.setRunThreads(false); - params.setConnectionParams(connectionParams.build()); - - clock = new ManualClock(Instant.ofEpochMilli(0)); - resultCallback = new TestResultCallback(); - operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()), - resultCallback, - params.build(), - new ScheduledThreadPoolExecutor(1), - clock); - } - - public ManualClock clock() { return clock; } - - /** Asserts that this has but a single IOThread and returns it */ - public IOThread getSingleIOThread() { - assertEquals(1, clusterConnections().size()); - assertEquals(1, clusterConnections().get(0).ioThreads().size()); - return clusterConnections().get(0).ioThreads().get(0); - } - - /** Do n iteration of work in all io threads of this */ - public void tick(int n) { - for (int i = 0; i < n; i++) - for (ClusterConnection cluster : operationProcessor.clusters()) - for (IOThread thread : cluster.ioThreads()) - thread.tick(); - } - - public void send(String documentId) { - operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant())); - } - - public int incomplete() { - return operationProcessor.getIncompleteResultQueueSize(); - } - - public int success() { - return resultCallback.successes; - } - - public List clusterConnections() { - return operationProcessor.clusters(); - } - - public int failures() { - return resultCallback.failures; - } - - public int endpointExceptions() { - return resultCallback.endpointExceptions; - } - - public Result lastResult() { - return resultCallback.lastResult; - } - - } - - private static class TestResultCallback implements FeedClient.ResultCallback { - - private int successes = 0; - private int failures = 0; - private int endpointExceptions = 0; - private Result lastResult; - - @Override - public void onCompletion(String docId, Result documentResult) { - this.lastResult = documentResult; - if (documentResult.isSuccess()) - successes++; - else - failures++; - } - - @Override - public void onEndpointException(FeedEndpointException exception) { - endpointExceptions++; - } - - } - } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java index e4970fa28fe..9ea66b57fb3 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java @@ -13,9 +13,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Random; -import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.eq; @@ -27,14 +25,14 @@ public class IncompleteResultsThrottlerTest { @Test public void simpleStaticQueueSizeTest() { IncompleteResultsThrottler incompleteResultsThrottler = new IncompleteResultsThrottler(2, 2, null, null); - assertThat(incompleteResultsThrottler.waitingThreads(), is(0)); + assertEquals(0, incompleteResultsThrottler.waitingThreads()); incompleteResultsThrottler.operationStart(); incompleteResultsThrottler.operationStart(); - assertThat(incompleteResultsThrottler.waitingThreads(), is(2)); + assertEquals(2, incompleteResultsThrottler.waitingThreads()); incompleteResultsThrottler.resultReady(true); - assertThat(incompleteResultsThrottler.waitingThreads(), is(1)); + assertEquals(1, incompleteResultsThrottler.waitingThreads()); incompleteResultsThrottler.resultReady(true); - assertThat(incompleteResultsThrottler.waitingThreads(), is(0)); + assertEquals(0, incompleteResultsThrottler.waitingThreads()); } /** -- cgit v1.2.3