aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-08-31 13:03:54 +0200
committerJon Bratseth <bratseth@gmail.com>2020-08-31 13:03:54 +0200
commit136b145ab697400b6fa100bdedb5d78f15e2b2ac (patch)
treeac871496bec78a52bfaf810cab29bcc2bc7dbe8a /vespa-http-client
parent72bdb502a00cef23150d61e9c309938121763343 (diff)
Test old connection polling
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java24
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java6
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java28
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java11
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java85
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java3
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java3
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java12
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java3
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java27
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java192
12 files changed, 327 insertions, 80 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
index 6682f6ff1d0..2417a4acf71 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
@@ -42,6 +42,7 @@ public final class ConnectionParams {
private int maxRetries = 100;
private long minTimeBetweenRetriesMs = 700;
private boolean dryRun = false;
+ private boolean runThreads = true;
private int traceLevel = 0;
private int traceEveryXOperation = 0;
private boolean printTraceToStdErr = true;
@@ -191,10 +192,8 @@ public final class ConnectionParams {
}
/**
- * Don't send data to gateway, just pretend that everything is fine.
- *
- * @param dryRun true if enabled.
- * @return pointer to builder.
+ * Set to true to skip making network connections and instead
+ * let requests complete successfully with no effect.
*/
public Builder setDryRun(boolean dryRun) {
this.dryRun = dryRun;
@@ -202,6 +201,15 @@ public final class ConnectionParams {
}
/**
+ * Set to false to skip starting io threads, such that any operation must be driven by a calling thread.
+ * Useful for testing.
+ */
+ public Builder setRunThreads(boolean runThreads) {
+ this.runThreads = runThreads;
+ return this;
+ }
+
+ /**
* Set the min time between retries when temporarily failing against a gateway.
*
* @param minTimeBetweenRetries the min time value
@@ -274,6 +282,7 @@ public final class ConnectionParams {
maxRetries,
minTimeBetweenRetriesMs,
dryRun,
+ runThreads,
traceLevel,
traceEveryXOperation,
printTraceToStdErr,
@@ -293,6 +302,8 @@ public final class ConnectionParams {
return dryRun;
}
+ public boolean runThreads() { return runThreads; }
+
public int getMaxRetries() {
return maxRetries;
}
@@ -345,6 +356,7 @@ public final class ConnectionParams {
private final int maxRetries;
private final long minTimeBetweenRetriesMs;
private final boolean dryRun;
+ private final boolean runThreads;
private final int traceLevel;
private final int traceEveryXOperation;
private final boolean printTraceToStdErr;
@@ -364,6 +376,7 @@ public final class ConnectionParams {
int maxRetries,
long minTimeBetweenRetriesMs,
boolean dryRun,
+ boolean runThreads,
int traceLevel,
int traceEveryXOperation,
boolean printTraceToStdErr,
@@ -385,6 +398,7 @@ public final class ConnectionParams {
this.maxRetries = maxRetries;
this.minTimeBetweenRetriesMs = minTimeBetweenRetriesMs;
this.dryRun = dryRun;
+ this.runThreads = runThreads;
this.traceLevel = traceLevel;
this.traceEveryXOperation = traceEveryXOperation;
this.printTraceToStdErr = printTraceToStdErr;
@@ -436,6 +450,8 @@ public final class ConnectionParams {
return dryRun;
}
+ public boolean runThreads() { return runThreads; }
+
public int getTraceLevel() {
return traceLevel;
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
index 3131206f148..bf07e3ea634 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
@@ -141,13 +141,12 @@ public final class SessionParams {
private final ErrorReporter errorReport;
private int throttlerMinSize;
- private SessionParams(
- Collection<Cluster> clusters,
- FeedParams feedParams,
- ConnectionParams connectionParams,
- int clientQueueSize,
- ErrorReporter errorReporter,
- int throttlerMinSize) {
+ private SessionParams(Collection<Cluster> clusters,
+ FeedParams feedParams,
+ ConnectionParams connectionParams,
+ int clientQueueSize,
+ ErrorReporter errorReporter,
+ int throttlerMinSize) {
this.clusters = Collections.unmodifiableList(new ArrayList<>(clusters));
this.feedParams = feedParams;
this.connectionParams = connectionParams;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index a232ceeacf5..8e55e59b3f4 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -17,6 +17,7 @@ import java.io.StringWriter;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -93,6 +94,7 @@ public class ClusterConnection implements AutoCloseable {
documentQueue,
feedParams.getMaxSleepTimeMs(),
connectionParams.getConnectionTimeToLive(),
+ connectionParams.runThreads(),
idlePollFrequency,
clock);
ioThreads.add(ioThread);
@@ -167,6 +169,10 @@ public class ClusterConnection implements AutoCloseable {
return stringWriter.toString();
}
+ public List<IOThread> ioThreads() {
+ return Collections.unmodifiableList(ioThreads);
+ }
+
@Override
public boolean equals(Object o) {
return (this == o) || (o instanceof ClusterConnection && clusterId == ((ClusterConnection) o).clusterId);
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
index cfd4c1003de..129fc000271 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
@@ -12,6 +12,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
@@ -26,6 +27,10 @@ public class DryRunGatewayConnection implements GatewayConnection {
private Instant connectionTime = null;
private Instant lastPollTime = null;
+ /** Set to true to hold off responding with a result to any incoming operations until this is set false */
+ private boolean hold = false;
+ private List<Document> held = new ArrayList<>();
+
public DryRunGatewayConnection(Endpoint endpoint, Clock clock) {
this.endpoint = endpoint;
this.clock = clock;
@@ -34,13 +39,23 @@ public class DryRunGatewayConnection implements GatewayConnection {
@Override
public InputStream write(List<Document> docs) {
StringBuilder result = new StringBuilder();
- for (Document doc : docs) {
- OperationStatus operationStatus = new OperationStatus("ok", doc.getOperationId(), ErrorCode.OK, false, "");
- result.append(operationStatus.render());
+ if (hold) {
+ held.addAll(docs);
+ }
+ else {
+ for (Document doc : held)
+ result.append(okResponse(doc).render());
+ held.clear();
+ for (Document doc : docs)
+ result.append(okResponse(doc).render());
}
return new ByteArrayInputStream(result.toString().getBytes(StandardCharsets.UTF_8));
}
+ public void hold(boolean hold) {
+ this.hold = hold;
+ }
+
@Override
public InputStream poll() {
lastPollTime = clock.instant();
@@ -75,4 +90,11 @@ public class DryRunGatewayConnection implements GatewayConnection {
@Override
public void close() { }
+ /** Returns the document currently held in this */
+ public List<Document> held() { return Collections.unmodifiableList(held); }
+
+ private OperationStatus okResponse(Document document) {
+ return new OperationStatus("ok", document.getOperationId(), ErrorCode.OK, false, "");
+ }
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
index c4ee2f58b65..1dd8b3bf3ec 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
@@ -33,12 +33,11 @@ class EndpointResultQueue {
private final ScheduledThreadPoolExecutor timer;
private final long totalTimeoutMs;
- EndpointResultQueue(
- OperationProcessor operationProcessor,
- Endpoint endpoint,
- int clusterId,
- ScheduledThreadPoolExecutor timer,
- long totalTimeoutMs) {
+ EndpointResultQueue(OperationProcessor operationProcessor,
+ Endpoint endpoint,
+ int clusterId,
+ ScheduledThreadPoolExecutor timer,
+ long totalTimeoutMs) {
this.operationProcessor = operationProcessor;
this.endpoint = endpoint;
this.clusterId = clusterId;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 99aff8b4baa..c7c94587640 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -18,6 +18,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@@ -41,6 +42,8 @@ class IOThread implements Runnable, AutoCloseable {
private final GatewayConnectionFactory connectionFactory;
private final DocumentQueue documentQueue;
private final EndpointResultQueue resultQueue;
+
+ /** The thread running this, or null if it does not run a thread (meaning tick() must be called from the outside) */
private final Thread thread;
private final int clusterId;
private final CountDownLatch running = new CountDownLatch(1);
@@ -56,6 +59,7 @@ class IOThread implements Runnable, AutoCloseable {
private final Random random = new Random();
private GatewayConnection currentConnection;
+ private ConnectionState connectionState = ConnectionState.DISCONNECTED;
/**
* Previous connections on which we have sent operations and are still waiting for the result
@@ -87,6 +91,7 @@ class IOThread implements Runnable, AutoCloseable {
DocumentQueue documentQueue,
long maxSleepTimeMs,
Duration connectionTimeToLive,
+ boolean runThreads,
double idlePollFrequency,
Clock clock) {
this.endpoint = endpoint;
@@ -101,11 +106,18 @@ class IOThread implements Runnable, AutoCloseable {
this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs);
this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s]
this.clock = clock;
- this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
- thread.setDaemon(true);
this.localQueueTimeOut = localQueueTimeOut;
- this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10);
- thread.start();
+ this.maxOldConnectionPollInterval = localQueueTimeOut.dividedBy(10).toMillis() > pollIntervalUS / 1000
+ ? localQueueTimeOut.dividedBy(10)
+ : Duration.ofMillis(pollIntervalUS / 1000);
+ if (runThreads) {
+ this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
+ thread.setDaemon(true);
+ thread.start();
+ }
+ else {
+ this.thread = null;
+ }
}
public Endpoint getEndpoint() {
@@ -173,7 +185,7 @@ class IOThread implements Runnable, AutoCloseable {
int chunkSizeBytes = 0;
try {
drainFirstDocumentsInQueueIfOld();
- Document doc = documentQueue.poll(maxWaitUnits, timeUnit);
+ Document doc = thread != null ? documentQueue.poll(maxWaitUnits, timeUnit) : documentQueue.poll();
if (doc != null) {
docsForSendChunk.add(doc);
chunkSizeBytes = doc.size();
@@ -371,18 +383,6 @@ class IOThread implements Runnable, AutoCloseable {
}
}
- private boolean isStale(GatewayConnection connection) {
- return connection.connectionTime() != null
- && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant());
- }
-
- private ConnectionState refreshConnection(ConnectionState currentConnectionState) {
- if (currentConnectionState == ConnectionState.SESSION_SYNCED)
- oldConnections.add(currentConnection);
- currentConnection = connectionFactory.newConnection();
- return ConnectionState.DISCONNECTED;
- }
-
private void sleepIfProblemsGettingSyncedConnection(ConnectionState newState, ConnectionState oldState) {
if (newState == ConnectionState.SESSION_SYNCED) return;
if (newState == ConnectionState.CONNECTED && oldState == ConnectionState.DISCONNECTED) return;
@@ -397,17 +397,19 @@ class IOThread implements Runnable, AutoCloseable {
@Override
public void run() {
- ConnectionState connectionState = ConnectionState.DISCONNECTED;
- while (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) {
- ConnectionState oldState = connectionState;
- connectionState = cycle(connectionState);
- checkOldConnections();
- sleepIfProblemsGettingSyncedConnection(connectionState, oldState);
-
- }
+ while (stopSignal.getCount() > 0 || !documentQueue.isEmpty())
+ tick();
log.finer(toString() + " exiting, documentQueue.size()=" + documentQueue.size());
running.countDown();
+ }
+ /** Do one iteration of work. Should be called from the single worker thread of this. */
+ public void tick() {
+ ConnectionState oldState = connectionState;
+ connectionState = cycle(connectionState);
+ checkOldConnections();
+ if (thread != null)
+ sleepIfProblemsGettingSyncedConnection(connectionState, oldState);
}
private void drainFirstDocumentsInQueueIfOld() {
@@ -434,28 +436,33 @@ class IOThread implements Runnable, AutoCloseable {
}
}
- private void checkOldConnections() {
- if (resultQueue.getPendingSize() == 0) {
- oldConnections.forEach(c -> c.close());
- oldConnections.clear();
- return;
- }
+ private boolean isStale(GatewayConnection connection) {
+ return connection.connectionTime() != null
+ && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant());
+ }
+
+ private ConnectionState refreshConnection(ConnectionState currentConnectionState) {
+ if (currentConnectionState == ConnectionState.SESSION_SYNCED)
+ oldConnections.add(currentConnection);
+ currentConnection = connectionFactory.newConnection();
+ return ConnectionState.DISCONNECTED;
+ }
+ private void checkOldConnections() {
for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) {
GatewayConnection connection = i.next();
if (closingTime(connection).isBefore(clock.instant())) {
connection.close();
- i.remove();;
+ i.remove();
}
else if (timeToPoll(connection)) {
try {
processResponse(connection.poll());
}
catch (Exception e) {
- // Old connection; this may be ok
+ // Old connection; best effort
}
}
-
}
}
@@ -471,8 +478,8 @@ class IOThread implements Runnable, AutoCloseable {
(double)localQueueTimeOut.toMillis();
if (newness < 0) return true; // connection retired prematurely
if (newness > 1) return false; // closing time reached
- Duration pollInterval = Duration.ofMillis(pollIntervalUS * 1000 +
- (long)(newness * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS * 1000)));
+ Duration pollInterval = Duration.ofMillis(pollIntervalUS / 1000 +
+ (long)((1 - newness) * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS / 1000)));
return connection.lastPollTime().plus(pollInterval).isBefore(clock.instant());
}
@@ -511,4 +518,10 @@ class IOThread implements Runnable, AutoCloseable {
}
}
+ /** For testing. Returns the current connection of this. Not thread safe. */
+ public GatewayConnection currentConnection() { return currentConnection; }
+
+ /** For testing. Returns a snapshot of the old connections of this. Not thread safe. */
+ public List<GatewayConnection> oldConnections() { return new ArrayList<>(oldConnections); }
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 702a316422d..90d07104fef 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -17,6 +17,7 @@ import java.math.BigInteger;
import java.security.SecureRandom;
import java.time.Clock;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -255,6 +256,8 @@ public class OperationProcessor {
}
}
+ public List<ClusterConnection> clusters() { return Collections.unmodifiableList(clusters); }
+
public String getStatsAsJson() {
return operationStats.getStatsAsJson();
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java
index 0821fa55e06..79a91d0b5f3 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/Server.java
@@ -5,8 +5,7 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
/**
- * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
- * @since 5.1.20
+ * @author Einar M R Rosenvinge
*/
public final class Server implements AutoCloseable {
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
index 0b03f3338c9..780de3e695c 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
@@ -18,13 +18,9 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import static com.yahoo.vespa.http.client.TestUtils.getResults;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
@@ -96,9 +92,9 @@ public class V3HttpAPITest {
}
@Test
- public void requireThatSingleDestinationWorks() throws Exception {
+ public void testSingleDestination() throws Exception {
try (Server server = new Server(new V3MockParsingRequestHandler(), 0);
- Session session = SessionFactory.create(Endpoint.create("localhost", server.getPort(), false))) {
+ Session session = SessionFactory.create(Endpoint.create("localhost", server.getPort(), false))) {
writeDocuments(session);
Map<String, Result> results = getResults(session, documents.size());
@@ -106,8 +102,8 @@ public class V3HttpAPITest {
for (TestDocument document : documents) {
Result r = results.remove(document.getDocumentId());
- assertThat(r, not(nullValue()));
- assertThat(r.getDetails().toString(), r.isSuccess(), is(true));
+ assertNotNull(r);
+ assertTrue(r.getDetails().toString(), r.isSuccess());
}
assertTrue(results.isEmpty());
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java
index 0005bddeb73..da82079e992 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java
@@ -20,8 +20,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
/**
- * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
- * @since 5.1.22
+ * @author Einar M R Rosenvinge
*/
public class EndpointResultQueueTest {
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index 8eb9513065e..59fb968906f 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.http.client.core.communication;
import com.yahoo.vespa.http.client.FeedConnectException;
import com.yahoo.vespa.http.client.FeedEndpointException;
import com.yahoo.vespa.http.client.FeedProtocolException;
+import com.yahoo.vespa.http.client.ManualClock;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.V3HttpAPITest;
import com.yahoo.vespa.http.client.config.Endpoint;
@@ -37,10 +38,12 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+// DO NOT ADD TESTS HERE, add to NewIOThreadTest
public class IOThreadTest {
private static final Endpoint ENDPOINT = Endpoint.create("myhost");
+ final Clock clock = Clock.systemUTC();
final EndpointResultQueue endpointResultQueue = mock(EndpointResultQueue.class);
final ApacheGatewayConnection apacheGatewayConnection = mock(ApacheGatewayConnection.class);
final String exceptionMessage = "SOME EXCEPTION FOO";
@@ -49,13 +52,13 @@ public class IOThreadTest {
Document doc1 = new Document(V3HttpAPITest.documents.get(0).getDocumentId(),
V3HttpAPITest.documents.get(0).getContents(),
null,
- Clock.systemUTC().instant());
+ clock.instant());
String docId2 = V3HttpAPITest.documents.get(1).getDocumentId();
Document doc2 = new Document(V3HttpAPITest.documents.get(1).getDocumentId(),
V3HttpAPITest.documents.get(1).getContents(),
null,
- Clock.systemUTC().instant());
- DocumentQueue documentQueue = new DocumentQueue(4, Clock.systemUTC());
+ clock.instant());
+ DocumentQueue documentQueue = new DocumentQueue(4, clock);
public IOThreadTest() {
when(apacheGatewayConnection.getEndpoint()).thenReturn(ENDPOINT);
@@ -63,6 +66,7 @@ public class IOThreadTest {
/**
* Set up mock so that it can handle both failDocument() and resultReceived().
+ *
* @param expectedDocIdFail on failure, this has to be the doc id, or the mock will fail.
* @param expectedDocIdOk on ok, this has to be the doc id, or the mock will fail.
* @param isTransient checked on failure, if different, the mock will fail.
@@ -100,8 +104,9 @@ public class IOThreadTest {
documentQueue,
0,
Duration.ofSeconds(15),
+ true,
10,
- Clock.systemUTC());
+ clock);
}
@Test
@@ -118,7 +123,7 @@ public class IOThreadTest {
}
@Test
- public void requireThatSingleDocumentWriteErrorIsHandledProperly() throws Exception {
+ public void testDocumentWriteError() throws Exception {
when(apacheGatewayConnection.connect()).thenReturn(true);
when(apacheGatewayConnection.write(any())).thenThrow(new IOException(exceptionMessage));
setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage);
@@ -129,7 +134,7 @@ public class IOThreadTest {
}
@Test
- public void requireThatTwoDocumentsFirstWriteErrorSecondOkIsHandledProperly() throws Exception {
+ public void testTwoDocumentsFirstWriteErrorSecondOk() throws Exception {
when(apacheGatewayConnection.connect()).thenReturn(true);
InputStream serverResponse = new ByteArrayInputStream(
(docId2 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
@@ -149,10 +154,8 @@ public class IOThreadTest {
@Test
public void testQueueTimeOutNoNoConnectionToServer() throws Exception {
when(apacheGatewayConnection.connect()).thenReturn(false);
- InputStream serverResponse = new ByteArrayInputStream(
- ("").getBytes(StandardCharsets.UTF_8));
- when(apacheGatewayConnection.write(any()))
- .thenReturn(serverResponse);
+ InputStream serverResponse = new ByteArrayInputStream(("").getBytes(StandardCharsets.UTF_8));
+ when(apacheGatewayConnection.write(any())).thenReturn(serverResponse);
setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true,
"java.lang.Exception: Not sending document operation, timed out in queue after");
try (IOThread ioThread = createIOThread(10, 10)) {
@@ -162,7 +165,7 @@ public class IOThreadTest {
}
@Test
- public void requireThatEndpointProtocolExceptionsArePropagated()
+ public void testEndpointProtocolExceptionPropagation()
throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
when(apacheGatewayConnection.connect()).thenReturn(true);
int errorCode = 403;
@@ -183,7 +186,7 @@ public class IOThreadTest {
}
@Test
- public void requireThatEndpointConnectExceptionsArePropagated()
+ public void testEndpointConnectExceptionsPropagation()
throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
when(apacheGatewayConnection.connect()).thenReturn(true);
String errorMessage = "generic error message";
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java
new file mode 100644
index 00000000000..615fa22a6cf
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java
@@ -0,0 +1,192 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client.core.communication;
+
+import com.yahoo.vespa.http.client.FeedClient;
+import com.yahoo.vespa.http.client.FeedEndpointException;
+import com.yahoo.vespa.http.client.ManualClock;
+import com.yahoo.vespa.http.client.Result;
+import com.yahoo.vespa.http.client.config.Cluster;
+import com.yahoo.vespa.http.client.config.ConnectionParams;
+import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.SessionParams;
+import com.yahoo.vespa.http.client.core.Document;
+import com.yahoo.vespa.http.client.core.EndpointResult;
+import com.yahoo.vespa.http.client.core.ThrottlePolicy;
+import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThrottler;
+import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+
+/**
+ * TODO: Migrate IOThreadTests here.
+ *
+ * @author bratseth
+ */
+public class NewIOThreadTest {
+
+ @Test
+ public void testBasics() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ assertEquals(0, tester.inflight());
+ assertEquals(0, tester.success());
+ assertEquals(0, tester.failures());
+ tester.send("doc1");
+ tester.send("doc2");
+ tester.send("doc3");
+ assertEquals(3, tester.inflight());
+ assertEquals(0, tester.success());
+ assertEquals(0, tester.failures());
+ tester.success("doc1");
+ tester.success("doc2");
+ tester.success("doc3");
+ assertEquals(0, tester.inflight());
+ assertEquals(3, tester.success());
+ assertEquals(0, tester.failures());
+ }
+
+ @Test
+ public void testPollingOldConnections() {
+ OperationProcessorTester tester = new OperationProcessorTester();
+ tester.tick(3);
+
+ assertEquals(1, tester.clusterConnections().size());
+ assertEquals(1, tester.clusterConnections().get(0).ioThreads().size());
+ IOThread ioThread = tester.clusterConnections().get(0).ioThreads().get(0);
+ DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection();
+ assertEquals(0, ioThread.oldConnections().size());
+
+ firstConnection.hold(true);
+ tester.send("doc1");
+ tester.tick(1);
+
+ tester.clock().advance(Duration.ofSeconds(20)); // Default connection ttl is 15
+ tester.tick(3);
+
+ assertEquals(1, ioThread.oldConnections().size());
+ assertEquals(firstConnection, ioThread.oldConnections().get(0));
+ assertNotSame(firstConnection, ioThread.currentConnection());
+ assertEquals(20, firstConnection.lastPollTime().toEpochMilli() / 1000);
+
+ // Check old connection poll pattern (linear backoff)
+ assertLastPollTimeWhenAdvancing(21, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(23, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(24, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(24, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(26, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(26, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(28, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(28, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(32, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(32, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(34, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(34, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(34, 1, firstConnection, tester);
+ assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester);
+
+ tester.clock().advance(Duration.ofSeconds(200));
+ tester.tick(1);
+ assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size());
+ }
+
+ private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds,
+ int advanceSeconds,
+ DryRunGatewayConnection connection,
+ OperationProcessorTester tester) {
+ tester.clock().advance(Duration.ofSeconds(advanceSeconds));
+ tester.tick(1);
+ assertEquals(lastPollTimeSeconds, connection.lastPollTime().toEpochMilli() / 1000);
+ }
+
+ private static class OperationProcessorTester {
+
+ private final Endpoint endpoint;
+ private final int clusterId = 0;
+ private final ManualClock clock;
+ private final TestResultCallback resultCallback;
+ private final OperationProcessor operationProcessor;
+
+ public OperationProcessorTester() {
+ endpoint = Endpoint.create("test-endpoint");
+ SessionParams.Builder params = new SessionParams.Builder();
+ Cluster.Builder clusterParams = new Cluster.Builder();
+ clusterParams.addEndpoint(endpoint);
+ params.addCluster(clusterParams.build());
+ ConnectionParams.Builder connectionParams = new ConnectionParams.Builder();
+ connectionParams.setDryRun(true);
+ connectionParams.setRunThreads(false);
+ params.setConnectionParams(connectionParams.build());
+
+ clock = new ManualClock(Instant.ofEpochMilli(0));
+ resultCallback = new TestResultCallback();
+ operationProcessor = new OperationProcessor(new IncompleteResultsThrottler(1, 100, clock, new ThrottlePolicy()),
+ resultCallback,
+ params.build(),
+ new ScheduledThreadPoolExecutor(1),
+ clock);
+ }
+
+ public ManualClock clock() { return clock; }
+
+ /** Do n iteration of work in all io threads of this */
+ public void tick(int n) {
+ for (int i = 0; i < n; i++)
+ for (ClusterConnection cluster : operationProcessor.clusters())
+ for (IOThread thread : cluster.ioThreads())
+ thread.tick();
+ }
+
+ public void send(String documentId) {
+ operationProcessor.sendDocument(new Document(documentId, documentId, "data of " + documentId, null, clock.instant()));
+ }
+
+ public void success(String documentId) {
+ operationProcessor.resultReceived(new EndpointResult(documentId, new Result.Detail(endpoint)), clusterId);
+ }
+
+ public int inflight() {
+ return operationProcessor.getIncompleteResultQueueSize();
+ }
+
+ public int success() {
+ return resultCallback.successes;
+ }
+
+ public List<ClusterConnection> clusterConnections() {
+ return operationProcessor.clusters();
+ }
+
+ public int failures() {
+ return resultCallback.failures;
+ }
+
+ }
+
+ private static class TestResultCallback implements FeedClient.ResultCallback {
+
+ private int successes = 0;
+ private int failures = 0;
+
+ @Override
+ public void onCompletion(String docId, Result documentResult) {
+ successes++;
+ }
+
+ @Override
+ public void onEndpointException(FeedEndpointException exception) {
+ failures++;
+ }
+
+
+ }
+
+}