aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-09-01 15:40:47 +0200
committerJon Bratseth <bratseth@gmail.com>2020-09-01 15:40:47 +0200
commit500e792bd97b3086a65cc4fe24f318371bcfd4de (patch)
treee7b0a8dcd3ad7b0f7bd39f562f66012345038b8d /vespa-http-client
parentd6cce38048caa708eac01364f06510ba0e4172a1 (diff)
Pull up OperationProcessorTester
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java5
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java2
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java125
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java112
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java10
5 files changed, 135 insertions, 119 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java
index a4bd5d51496..ee107a7d4d2 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/ThrottlePolicy.java
@@ -7,6 +7,7 @@ import static java.lang.Math.min;
/**
* Class that has a method for finding next maxInFlight.
+ *
* @author dybis
*/
public class ThrottlePolicy {
@@ -16,6 +17,7 @@ public class ThrottlePolicy {
/**
* Generate nex in-flight value for throttling.
+ *
* @param maxPerformanceChange This value limit the dynamics of the algorithm.
* @param numOk number of success in last phase
* @param previousNumOk number of success in previous (before last) phase.
@@ -27,8 +29,7 @@ public class ThrottlePolicy {
public int calcNewMaxInFlight(double maxPerformanceChange, int numOk, int previousNumOk, int previousMaxInFlight,
int maxInFlightNow, boolean messagesQueued) {
- double difference = calculateRuleBasedDifference(
- maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow);
+ double difference = calculateRuleBasedDifference(maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow);
boolean previousRunWasBetter = numOk < previousNumOk;
boolean previousRunHadLessInFlight = previousMaxInFlight < maxInFlightNow;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 4796f4bb4a8..652f65489db 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -34,7 +34,7 @@ import java.util.logging.Logger;
*
* @author Einar M R Rosenvinge
*/
-class IOThread implements Runnable, AutoCloseable {
+public class IOThread implements Runnable, AutoCloseable {
private static final Logger log = Logger.getLogger(IOThread.class.getName());
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java
new file mode 100644
index 00000000000..9b563e193d5
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/OperationProcessorTester.java
@@ -0,0 +1,125 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core;
+
+import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.FeedEndpointException;
+import com.yahoo.vespa.http.client.ManualClock;
+import com.yahoo.vespa.http.client.Result;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import com.yahoo.vespa.http.client.core.communication.ClusterConnection;
+import com.yahoo.vespa.http.client.core.communication.IOThread;
+import com.yahoo.vespa.http.client.core.communication.IOThreadTest;
+import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler;
+import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Helper for testing with an operation processor
+ *
+ * @author bratseth
+ */
+public class OperationProcessorTester {
+
+ private final Endpoint endpoint;
+ private final int clusterId = 0;
+ private final ManualClock clock;
+ private final TestResultCallback resultCallback;
+ private final OperationProcessor operationProcessor;
+
+ public OperationProcessorTester() {
+ endpoint = Endpoint.create("test-endpoint");
+ SessionParams.Builder params = new SessionParams.Builder();
+ Cluster.Builder clusterParams = new Cluster.Builder();
+ clusterParams.addEndpoint(endpoint);
+ params.addCluster(clusterParams.build());
+ ConnectionParams.Builder connectionParams = new ConnectionParams.Builder();
+ connectionParams.setDryRun(true);
+ connectionParams.setRunThreads(false);
+ params.setConnectionParams(connectionParams.build());
+
+ clock = new ManualClock(Instant.ofEpochMilli(0));
+ resultCallback = new TestResultCallback();
+ operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()),
+ resultCallback,
+ params.build(),
+ new ScheduledThreadPoolExecutor(1),
+ clock);
+ }
+
+ public ManualClock clock() { return clock; }
+
+ /** Asserts that this has but a single IOThread and returns it */
+ public IOThread getSingleIOThread() {
+ assertEquals(1, clusterConnections().size());
+ assertEquals(1, clusterConnections().get(0).ioThreads().size());
+ return clusterConnections().get(0).ioThreads().get(0);
+ }
+
+ /** Do n iteration of work in all io threads of this */
+ public void tick(int n) {
+ for (int i = 0; i < n; i++)
+ for (ClusterConnection cluster : operationProcessor.clusters())
+ for (IOThread thread : cluster.ioThreads())
+ thread.tick();
+ }
+
+ public void send(String documentId) {
+ operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant()));
+ }
+
+ public int incomplete() {
+ return operationProcessor.getIncompleteResultQueueSize();
+ }
+
+ public int success() {
+ return resultCallback.successes;
+ }
+
+ public List<ClusterConnection> clusterConnections() {
+ return operationProcessor.clusters();
+ }
+
+ public int failures() {
+ return resultCallback.failures;
+ }
+
+ public int endpointExceptions() {
+ return resultCallback.endpointExceptions;
+ }
+
+ public Result lastResult() {
+ return resultCallback.lastResult;
+ }
+
+ private static class TestResultCallback implements FeedClient.ResultCallback {
+
+ private int successes = 0;
+ private int failures = 0;
+ private int endpointExceptions = 0;
+ private Result lastResult;
+
+ @Override
+ public void onCompletion(String docId, Result documentResult) {
+ this.lastResult = documentResult;
+ if (documentResult.isSuccess())
+ successes++;
+ else
+ failures++;
+ }
+
+ @Override
+ public void onEndpointException(FeedEndpointException exception) {
+ endpointExceptions++;
+ }
+
+ }
+
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index a36a08b0c26..e684c929fda 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -3,24 +3,14 @@ package com.yahoo.vespa.http.client.core.communication;
import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.FeedEndpointException;
-import com.yahoo.vespa.http.client.ManualClock;
import com.yahoo.vespa.http.client.Result;
-import com.yahoo.vespa.http.client.config.Cluster;
-import com.yahoo.vespa.http.client.config.ConnectionParams;
-import com.yahoo.vespa.http.client.config.Endpoint;
-import com.yahoo.vespa.http.client.config.SessionParams;
-import com.yahoo.vespa.http.client.core.Document;
+import com.yahoo.vespa.http.client.core.OperationProcessorTester;
import com.yahoo.vespa.http.client.core.ServerResponseException;
-import com.yahoo.vespa.http.client.core.ThrottlePolicy;
-import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler;
-import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import org.junit.Test;
import java.io.IOException;
import java.time.Duration;
-import java.time.Instant;
-import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
@@ -157,102 +147,4 @@ public class IOThreadTest {
assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000);
}
- private static class OperationProcessorTester {
-
- private final Endpoint endpoint;
- private final int clusterId = 0;
- private final ManualClock clock;
- private final TestResultCallback resultCallback;
- private final OperationProcessor operationProcessor;
-
- public OperationProcessorTester() {
- endpoint = Endpoint.create("test-endpoint");
- SessionParams.Builder params = new SessionParams.Builder();
- Cluster.Builder clusterParams = new Cluster.Builder();
- clusterParams.addEndpoint(endpoint);
- params.addCluster(clusterParams.build());
- ConnectionParams.Builder connectionParams = new ConnectionParams.Builder();
- connectionParams.setDryRun(true);
- connectionParams.setRunThreads(false);
- params.setConnectionParams(connectionParams.build());
-
- clock = new ManualClock(Instant.ofEpochMilli(0));
- resultCallback = new TestResultCallback();
- operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()),
- resultCallback,
- params.build(),
- new ScheduledThreadPoolExecutor(1),
- clock);
- }
-
- public ManualClock clock() { return clock; }
-
- /** Asserts that this has but a single IOThread and returns it */
- public IOThread getSingleIOThread() {
- assertEquals(1, clusterConnections().size());
- assertEquals(1, clusterConnections().get(0).ioThreads().size());
- return clusterConnections().get(0).ioThreads().get(0);
- }
-
- /** Do n iteration of work in all io threads of this */
- public void tick(int n) {
- for (int i = 0; i < n; i++)
- for (ClusterConnection cluster : operationProcessor.clusters())
- for (IOThread thread : cluster.ioThreads())
- thread.tick();
- }
-
- public void send(String documentId) {
- operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant()));
- }
-
- public int incomplete() {
- return operationProcessor.getIncompleteResultQueueSize();
- }
-
- public int success() {
- return resultCallback.successes;
- }
-
- public List<ClusterConnection> clusterConnections() {
- return operationProcessor.clusters();
- }
-
- public int failures() {
- return resultCallback.failures;
- }
-
- public int endpointExceptions() {
- return resultCallback.endpointExceptions;
- }
-
- public Result lastResult() {
- return resultCallback.lastResult;
- }
-
- }
-
- private static class TestResultCallback implements FeedClient.ResultCallback {
-
- private int successes = 0;
- private int failures = 0;
- private int endpointExceptions = 0;
- private Result lastResult;
-
- @Override
- public void onCompletion(String docId, Result documentResult) {
- this.lastResult = documentResult;
- if (documentResult.isSuccess())
- successes++;
- else
- failures++;
- }
-
- @Override
- public void onEndpointException(FeedEndpointException exception) {
- endpointExceptions++;
- }
-
- }
-
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java
index e4970fa28fe..9ea66b57fb3 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java
@@ -13,9 +13,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Random;
-import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.eq;
@@ -27,14 +25,14 @@ public class IncompleteResultsThrottlerTest {
@Test
public void simpleStaticQueueSizeTest() {
IncompleteResultsThrottler incompleteResultsThrottler = new IncompleteResultsThrottler(2, 2, null, null);
- assertThat(incompleteResultsThrottler.waitingThreads(), is(0));
+ assertEquals(0, incompleteResultsThrottler.waitingThreads());
incompleteResultsThrottler.operationStart();
incompleteResultsThrottler.operationStart();
- assertThat(incompleteResultsThrottler.waitingThreads(), is(2));
+ assertEquals(2, incompleteResultsThrottler.waitingThreads());
incompleteResultsThrottler.resultReady(true);
- assertThat(incompleteResultsThrottler.waitingThreads(), is(1));
+ assertEquals(1, incompleteResultsThrottler.waitingThreads());
incompleteResultsThrottler.resultReady(true);
- assertThat(incompleteResultsThrottler.waitingThreads(), is(0));
+ assertEquals(0, incompleteResultsThrottler.waitingThreads());
}
/**