aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-08-28 10:20:11 +0200
committerJon Bratseth <bratseth@gmail.com>2020-08-28 10:20:11 +0200
commit364db9b44900a0453d6ab509bc5523e55295d30c (patch)
treef9a93d0c964c89ba5f11d0348869562273b0b3a5 /vespa-http-client
parentfeafa659ac4ac76cd9ac3067ca394a6470b1e59e (diff)
Parametrize clock
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java3
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java3
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java7
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java6
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java20
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java9
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java12
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java8
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java8
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java14
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java7
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java3
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java18
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java1
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java19
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java4
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java21
17 files changed, 106 insertions, 57 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java
index 4d50905da7b..62c15fcea27 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java
@@ -5,6 +5,7 @@ package com.yahoo.vespa.http.client;
import com.yahoo.vespa.http.client.config.SessionParams;
import com.yahoo.vespa.http.client.core.api.FeedClientImpl;
+import java.time.Clock;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -24,7 +25,7 @@ public class FeedClientFactory {
* @return newly created FeedClient API object.
*/
public static FeedClient create(SessionParams sessionParams, FeedClient.ResultCallback resultCallback) {
- return new FeedClientImpl(sessionParams, resultCallback, createTimeoutExecutor());
+ return new FeedClientImpl(sessionParams, resultCallback, createTimeoutExecutor(), Clock.systemUTC());
}
static ScheduledThreadPoolExecutor createTimeoutExecutor() {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java
index b03a2541cd0..b7423f75c87 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java
@@ -5,6 +5,7 @@ import com.yahoo.vespa.http.client.config.Cluster;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.config.SessionParams;
+import java.time.Clock;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -30,7 +31,7 @@ public final class SessionFactory {
@SuppressWarnings("deprecation")
static Session createInternal(SessionParams params) {
- return new com.yahoo.vespa.http.client.core.api.SessionImpl(params, createTimeoutExecutor());
+ return new com.yahoo.vespa.http.client.core.api.SessionImpl(params, createTimeoutExecutor(), Clock.systemUTC());
}
static ScheduledThreadPoolExecutor createTimeoutExecutor() {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
index 7238a0c4ba7..afa4cd0ae14 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
@@ -11,6 +11,7 @@ import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
+import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -29,7 +30,8 @@ public class FeedClientImpl implements FeedClient {
public FeedClientImpl(SessionParams sessionParams,
ResultCallback resultCallback,
- ScheduledThreadPoolExecutor timeoutExecutor) {
+ ScheduledThreadPoolExecutor timeoutExecutor,
+ Clock clock) {
this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) * (
sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) +
sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS));
@@ -41,7 +43,8 @@ public class FeedClientImpl implements FeedClient {
new ThrottlePolicy()),
resultCallback,
sessionParams,
- timeoutExecutor);
+ timeoutExecutor,
+ clock);
}
@Override
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java
index a5c97351347..1663e876d83 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java
@@ -9,6 +9,7 @@ import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThro
import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import java.io.OutputStream;
+import java.time.Clock;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -25,7 +26,7 @@ public class SessionImpl implements com.yahoo.vespa.http.client.Session {
private final BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>();
- public SessionImpl(SessionParams sessionParams, ScheduledThreadPoolExecutor timeoutExecutor) {
+ public SessionImpl(SessionParams sessionParams, ScheduledThreadPoolExecutor timeoutExecutor, Clock clock) {
this.operationProcessor = new OperationProcessor(
new IncompleteResultsThrottler(
sessionParams.getThrottlerMinSize(),
@@ -39,7 +40,8 @@ public class SessionImpl implements com.yahoo.vespa.http.client.Session {
}
},
sessionParams,
- timeoutExecutor);
+ timeoutExecutor,
+ clock);
}
@Override
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
index ee801270c89..b51c0dd30a1 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
@@ -52,11 +52,12 @@ class ApacheGatewayConnection implements GatewayConnection {
private static final Logger log = Logger.getLogger(ApacheGatewayConnection.class.getName());
private static final ObjectMapper mapper = new ObjectMapper();
private static final String PATH = "/reserved-for-internal-use/feedapi?";
- private final List<Integer> SUPPORTED_VERSIONS = new ArrayList<>();
private static final byte[] START_OF_FEED_XML = "<vespafeed>\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] END_OF_FEED_XML = "\n</vespafeed>\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] START_OF_FEED_JSON = "[".getBytes(StandardCharsets.UTF_8);
private static final byte[] END_OF_FEED_JSON = "]".getBytes(StandardCharsets.UTF_8);
+
+ private final List<Integer> supportedVersions = new ArrayList<>();
private final byte[] startOfFeed;
private final byte[] endOfFeed;
private final Endpoint endpoint;
@@ -71,14 +72,16 @@ class ApacheGatewayConnection implements GatewayConnection {
private int negotiatedVersion = -1;
private final HttpClientFactory httpClientFactory;
private final String shardingKey = UUID.randomUUID().toString().substring(0, 5);
+ private final Clock clock;
ApacheGatewayConnection(Endpoint endpoint,
FeedParams feedParams,
String clusterSpecificRoute,
ConnectionParams connectionParams,
HttpClientFactory httpClientFactory,
- String clientId) {
- SUPPORTED_VERSIONS.add(3);
+ String clientId,
+ Clock clock) {
+ supportedVersions.add(3);
this.endpoint = endpoint;
this.feedParams = feedParams;
this.clusterSpecificRoute = clusterSpecificRoute;
@@ -86,6 +89,7 @@ class ApacheGatewayConnection implements GatewayConnection {
this.connectionParams = connectionParams;
this.httpClient = null;
this.clientId = clientId;
+ this.clock = clock;
if (feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8) {
startOfFeed = START_OF_FEED_JSON;
@@ -103,7 +107,7 @@ class ApacheGatewayConnection implements GatewayConnection {
@Override
public InputStream poll() throws ServerResponseException, IOException {
- lastPollTime = Clock.systemUTC().instant();
+ lastPollTime = clock.instant();
return write(Collections.<Document>emptyList(), false, false);
}
@@ -121,7 +125,7 @@ class ApacheGatewayConnection implements GatewayConnection {
if (httpClient != null)
log.log(Level.WARNING, "Previous httpClient still exists.");
httpClient = httpClientFactory.createClient();
- connectionTime = Clock.systemUTC().instant();
+ connectionTime = clock.instant();
return httpClient != null;
}
@@ -185,7 +189,7 @@ class ApacheGatewayConnection implements GatewayConnection {
private HttpPost createPost(boolean drain, boolean useCompression, boolean isHandshake) {
HttpPost httpPost = new HttpPost(createUri());
- for (int v : SUPPORTED_VERSIONS) {
+ for (int v : supportedVersions) {
httpPost.addHeader(Headers.VERSION, "" + v);
}
if (sessionId != null) {
@@ -323,9 +327,9 @@ class ApacheGatewayConnection implements GatewayConnection {
} catch (NumberFormatException nfe) {
throw new ServerResponseException("Got bad protocol version from server: " + nfe.getMessage());
}
- if (!SUPPORTED_VERSIONS.contains(serverVersion)) {
+ if (!supportedVersions.contains(serverVersion)) {
throw new ServerResponseException("Unsupported version: " + serverVersion
- + ". Supported versions: " + SUPPORTED_VERSIONS);
+ + ". Supported versions: " + supportedVersions);
}
if (negotiatedVersion == -1) {
if (log.isLoggable(Level.FINE)) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java
index be1aa80b9ff..31ec8aa06a2 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java
@@ -7,6 +7,7 @@ import com.yahoo.vespa.http.client.config.FeedParams;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.time.Clock;
import java.util.Objects;
/**
@@ -20,19 +21,22 @@ public class ApacheGatewayConnectionFactory implements GatewayConnectionFactory
private final ConnectionParams connectionParams;
private final ApacheGatewayConnection.HttpClientFactory httpClientFactory;
private final String clientId;
+ private final Clock clock;
public ApacheGatewayConnectionFactory(Endpoint endpoint,
FeedParams feedParams,
String clusterSpecificRoute,
ConnectionParams connectionParams,
ApacheGatewayConnection.HttpClientFactory httpClientFactory,
- String clientId) {
+ String clientId,
+ Clock clock) {
this.endpoint = validate(endpoint);
this.feedParams = feedParams;
this.clusterSpecificRoute = clusterSpecificRoute;
this.httpClientFactory = httpClientFactory;
this.connectionParams = connectionParams;
this.clientId = Objects.requireNonNull(clientId, "clientId cannot be null");
+ this.clock = clock;
}
private static Endpoint validate(Endpoint endpoint) {
@@ -52,7 +56,8 @@ public class ApacheGatewayConnectionFactory implements GatewayConnectionFactory
clusterSpecificRoute,
connectionParams,
httpClientFactory,
- clientId);
+ clientId,
+ clock);
}
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index 319a925611a..bc537c42f88 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -14,6 +14,7 @@ import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import java.io.IOException;
import java.io.StringWriter;
+import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -44,7 +45,8 @@ public class ClusterConnection implements AutoCloseable {
Cluster cluster,
int clusterId,
int clientQueueSizePerCluster,
- ScheduledThreadPoolExecutor timeoutExecutor) {
+ ScheduledThreadPoolExecutor timeoutExecutor,
+ Clock clock) {
if (cluster.getEndpoints().isEmpty())
throw new IllegalArgumentException("At least a single endpoint is required in " + cluster);
@@ -69,14 +71,15 @@ public class ClusterConnection implements AutoCloseable {
for (int i = 0; i < connectionParams.getNumPersistentConnectionsPerEndpoint(); i++) {
GatewayConnectionFactory connectionFactory;
if (connectionParams.isDryRun()) {
- connectionFactory = new DryRunGatewayConnectionFactory(endpoint);
+ connectionFactory = new DryRunGatewayConnectionFactory(endpoint, clock);
} else {
connectionFactory = new ApacheGatewayConnectionFactory(endpoint,
feedParams,
cluster.getRoute(),
connectionParams,
new ApacheGatewayConnection.HttpClientFactory(connectionParams, endpoint.isUseSsl()),
- operationProcessor.getClientId()
+ operationProcessor.getClientId(),
+ clock
);
}
IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(),
@@ -90,7 +93,8 @@ public class ClusterConnection implements AutoCloseable {
documentQueue,
feedParams.getMaxSleepTimeMs(),
connectionParams.getConnectionTimeToLive(),
- idlePollFrequency);
+ idlePollFrequency,
+ clock);
ioThreads.add(ioThread);
}
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
index 02df8c52878..cfd4c1003de 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java
@@ -22,11 +22,13 @@ import java.util.List;
public class DryRunGatewayConnection implements GatewayConnection {
private final Endpoint endpoint;
+ private final Clock clock;
private Instant connectionTime = null;
private Instant lastPollTime = null;
- public DryRunGatewayConnection(Endpoint endpoint) {
+ public DryRunGatewayConnection(Endpoint endpoint, Clock clock) {
this.endpoint = endpoint;
+ this.clock = clock;
}
@Override
@@ -41,7 +43,7 @@ public class DryRunGatewayConnection implements GatewayConnection {
@Override
public InputStream poll() {
- lastPollTime = Clock.systemUTC().instant();
+ lastPollTime = clock.instant();
return write(new ArrayList<>());
}
@@ -55,7 +57,7 @@ public class DryRunGatewayConnection implements GatewayConnection {
@Override
public boolean connect() {
- connectionTime = Clock.systemUTC().instant();
+ connectionTime = clock.instant();
return true;
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java
index 21a18ed5983..a234dba6b8e 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java
@@ -3,20 +3,24 @@ package com.yahoo.vespa.http.client.core.communication;
import com.yahoo.vespa.http.client.config.Endpoint;
+import java.time.Clock;
+
/**
* @author bratseth
*/
public class DryRunGatewayConnectionFactory implements GatewayConnectionFactory {
private final Endpoint endpoint;
+ private final Clock clock;
- public DryRunGatewayConnectionFactory(Endpoint endpoint) {
+ public DryRunGatewayConnectionFactory(Endpoint endpoint, Clock clock) {
this.endpoint = endpoint;
+ this.clock = clock;
}
@Override
public GatewayConnection newConnection() {
- return new DryRunGatewayConnection(endpoint);
+ return new DryRunGatewayConnection(endpoint, clock);
}
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index d1bb0ad9a4f..40c100fa1eb 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -36,6 +36,7 @@ import java.util.logging.Logger;
class IOThread implements Runnable, AutoCloseable {
private static final Logger log = Logger.getLogger(IOThread.class.getName());
+
private final Endpoint endpoint;
private final GatewayConnectionFactory connectionFactory;
private final DocumentQueue documentQueue;
@@ -51,6 +52,7 @@ class IOThread implements Runnable, AutoCloseable {
private final GatewayThrottler gatewayThrottler;
private final Duration connectionTimeToLive;
private final long pollIntervalUS;
+ private final Clock clock;
private final Random random = new Random();
private GatewayConnection currentConnection;
@@ -85,7 +87,8 @@ class IOThread implements Runnable, AutoCloseable {
DocumentQueue documentQueue,
long maxSleepTimeMs,
Duration connectionTimeToLive,
- double idlePollFrequency) {
+ double idlePollFrequency,
+ Clock clock) {
this.endpoint = endpoint;
this.documentQueue = documentQueue;
this.connectionFactory = connectionFactory;
@@ -97,6 +100,7 @@ class IOThread implements Runnable, AutoCloseable {
this.connectionTimeToLive = connectionTimeToLive;
this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs);
this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s]
+ this.clock = clock;
this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
thread.setDaemon(true);
this.localQueueTimeOut = localQueueTimeOut;
@@ -369,7 +373,7 @@ class IOThread implements Runnable, AutoCloseable {
private boolean isStale(GatewayConnection connection) {
return connection.connectionTime() != null
- && connection.connectionTime().plus(connectionTimeToLive).isBefore(Clock.systemUTC().instant());
+ && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant());
}
private ConnectionState refreshConnection(ConnectionState currentConnectionState) {
@@ -439,7 +443,7 @@ class IOThread implements Runnable, AutoCloseable {
for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) {
GatewayConnection connection = i.next();
- if (closingTime(connection).isBefore(Clock.systemUTC().instant())) {
+ if (closingTime(connection).isBefore(clock.instant())) {
connection.close();
i.remove();;
}
@@ -463,13 +467,13 @@ class IOThread implements Runnable, AutoCloseable {
if (connection.lastPollTime() == null) return true;
// Poll less the closer the connection comes to closing time
- double newness = ( closingTime(connection).toEpochMilli() - Clock.systemUTC().instant().toEpochMilli() ) /
+ double newness = ( closingTime(connection).toEpochMilli() - clock.instant().toEpochMilli() ) /
(double)localQueueTimeOut.toMillis();
if (newness < 0) return true; // connection retired prematurely
if (newness > 1) return false; // closing time reached
Duration pollInterval = Duration.ofMillis(pollIntervalUS * 1000 +
(long)(newness * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS * 1000)));
- return connection.lastPollTime().plus(pollInterval).isBefore(Clock.systemUTC().instant());
+ return connection.lastPollTime().plus(pollInterval).isBefore(clock.instant());
}
public static class ConnectionStats {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 692d90abe50..735b7332c03 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -15,6 +15,7 @@ import com.yahoo.vespa.http.client.core.communication.ClusterConnection;
import java.math.BigInteger;
import java.security.SecureRandom;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -59,7 +60,8 @@ public class OperationProcessor {
public OperationProcessor(IncompleteResultsThrottler incompleteResultsThrottler,
FeedClient.ResultCallback resultCallback,
SessionParams sessionParams,
- ScheduledThreadPoolExecutor timeoutExecutor) {
+ ScheduledThreadPoolExecutor timeoutExecutor,
+ Clock clock) {
this.numDestinations = sessionParams.getClusters().size();
this.resultCallback = resultCallback;
this.incompleteResultsThrottler = incompleteResultsThrottler;
@@ -82,7 +84,8 @@ public class OperationProcessor {
cluster,
i,
sessionParams.getClientQueueSize() / sessionParams.getClusters().size(),
- timeoutExecutor));
+ timeoutExecutor,
+ clock));
}
operationStats = new OperationStats(sessionParams, clusters, incompleteResultsThrottler);
maxRetries = sessionParams.getConnectionParams().getMaxRetries();
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java
index aa47128f436..b70fbaf3096 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java
@@ -11,6 +11,7 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.time.Clock;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
@@ -41,7 +42,7 @@ public class FeedClientTest {
resultsReceived.incrementAndGet();
};
- FeedClient feedClient = new FeedClientImpl(sessionParams, resultCallback, FeedClientFactory.createTimeoutExecutor());
+ FeedClient feedClient = new FeedClientImpl(sessionParams, resultCallback, FeedClientFactory.createTimeoutExecutor(), Clock.systemUTC());
@Test
public void testStreamAndClose() {
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java
index 8ff2566ead1..0813cb36078 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java
@@ -12,6 +12,7 @@ import org.junit.Test;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -79,7 +80,8 @@ public class QueueBoundsTest {
.build())
.setClientQueueSize(2)
.build(),
- SessionFactory.createTimeoutExecutor())) {
+ SessionFactory.createTimeoutExecutor(),
+ Clock.systemUTC())) {
FeederThread feeder = new FeederThread(session);
try {
feeder.start();
@@ -123,7 +125,8 @@ public class QueueBoundsTest {
.setNumPersistentConnectionsPerEndpoint(1)
.build())
.setClientQueueSize(6) //3 per cluster
- .build(), SessionFactory.createTimeoutExecutor())) {
+ .build(), SessionFactory.createTimeoutExecutor(),
+ Clock.systemUTC())) {
FeederThread feeder = new FeederThread(session);
try {
@@ -211,22 +214,23 @@ public class QueueBoundsTest {
.build())
.setClientQueueSize(1)
.build(),
- SessionFactory.createTimeoutExecutor())) {
+ SessionFactory.createTimeoutExecutor(),
+ Clock.systemUTC())) {
FeederThread feeder = new FeederThread(session);
feeder.start();
try {
{
System.out.println("We start with failed connection, post a document.");
assertFeedNotBlocking(feeder, 0);
- assertThat(session.results().size(), is(0));
+ assertEquals(0, session.results().size());
CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 1);
System.out.println("No result so far.");
- assertThat(session.results().size(), is(0));
+ assertEquals(0, session.results().size());
System.out.println("Make connection ok.");
mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK);
assert(lastPostFeed.await(120, TimeUnit.SECONDS));
- assertThat(lastPostFeed.getCount(), equalTo(0L));
+ assertEquals(0L, lastPostFeed.getCount());
assertResultQueueSize(session, 2, 120, TimeUnit.SECONDS);
}
@@ -236,7 +240,7 @@ public class QueueBoundsTest {
{
assertFeedNotBlocking(feeder, 2);
System.out.println("Fed one document, fit in queue.");
- assertThat(session.results().size(), is(2));
+ assertEquals(2, session.results().size());
System.out.println("Fed one document more, wait for failure.");
assertFeedNotBlocking(feeder, 3);
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
index 6907e24009b..0b03f3338c9 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java
@@ -200,4 +200,5 @@ public class V3HttpAPITest {
testServerWithMock(new V3MockParsingRequestHandler(
200, V3MockParsingRequestHandler.Scenario.CONDITON_NOT_MET), false, true);
}
+
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
index 4169e7e0ecf..13859329515 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
@@ -27,6 +27,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -71,7 +72,8 @@ public class ApacheGatewayConnectionTest {
clusterSpecificRoute,
connectionParams,
mockFactory,
- "clientId");
+ "clientId",
+ Clock.systemUTC());
apacheGatewayConnection.connect();
apacheGatewayConnection.handshake();
documents.add(createDoc(docId, vespaDocContent, true));
@@ -97,7 +99,8 @@ public class ApacheGatewayConnectionTest {
clusterSpecificRoute,
connectionParams,
mockFactory,
- "clientId");
+ "clientId",
+ Clock.systemUTC());
apacheGatewayConnection.connect();
final List<Document> documents = new ArrayList<>();
apacheGatewayConnection.write(documents);
@@ -139,7 +142,8 @@ public class ApacheGatewayConnectionTest {
clusterSpecificRoute,
connectionParams,
mockFactory,
- "clientId");
+ "clientId",
+ Clock.systemUTC());
apacheGatewayConnection.connect();
apacheGatewayConnection.handshake();
@@ -204,7 +208,8 @@ public class ApacheGatewayConnectionTest {
clusterSpecificRoute,
connectionParams,
mockFactory,
- "clientId");
+ "clientId",
+ Clock.systemUTC());
apacheGatewayConnection.connect();
apacheGatewayConnection.handshake();
@@ -242,7 +247,8 @@ public class ApacheGatewayConnectionTest {
"",
connectionParams,
mockFactory,
- "clientId");
+ "clientId",
+ Clock.systemUTC());
apacheGatewayConnection.connect();
apacheGatewayConnection.handshake();
@@ -270,7 +276,8 @@ public class ApacheGatewayConnectionTest {
"",
new ConnectionParams.Builder().build(),
mockFactory,
- "clientId");
+ "clientId",
+ Clock.systemUTC());
apacheGatewayConnection.connect();
apacheGatewayConnection.handshake();
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index 75df8a78b86..6836a0a1d2c 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -16,6 +16,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -95,7 +96,8 @@ public class IOThreadTest {
documentQueue,
0,
Duration.ofSeconds(15),
- 10);
+ 10,
+ Clock.systemUTC());
}
@Test
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
index 9753a180618..2f801012d6d 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
@@ -10,6 +10,7 @@ import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.EndpointResult;
import org.junit.Test;
+import java.time.Clock;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
@@ -49,7 +50,7 @@ public class OperationProcessorTest {
OperationProcessor q = new OperationProcessor(
new IncompleteResultsThrottler(1000, 1000, null, null),
(docId, documentResult) -> queue.add(documentResult),
- sessionParams, null);
+ sessionParams, null, Clock.systemUTC());
q.resultReceived(new EndpointResult("foo", new Result.Detail(null)), 0);
@@ -127,7 +128,7 @@ public class OperationProcessorTest {
OperationProcessor operationProcessor = new OperationProcessor(
new IncompleteResultsThrottler(1000, 1000, null, null),
(docId, documentResult) -> queue.add(documentResult),
- sessionParams, null);
+ sessionParams, null, Clock.systemUTC());
operationProcessor.sendDocument(doc1);
operationProcessor.sendDocument(doc1b);
@@ -165,7 +166,7 @@ public class OperationProcessorTest {
OperationProcessor operationProcessor = new OperationProcessor(
new IncompleteResultsThrottler(1000, 1000, null, null),
(docId, documentResult) -> queue.add(documentResult),
- sessionParams, null);
+ sessionParams, null, Clock.systemUTC());
operationProcessor.sendDocument(doc1);
operationProcessor.sendDocument(doc1b);
@@ -198,7 +199,7 @@ public class OperationProcessorTest {
OperationProcessor operationProcessor = new OperationProcessor(
new IncompleteResultsThrottler(1000, 1000, null, null),
(docId, documentResult) -> queue.add(documentResult),
- sessionParams, null);
+ sessionParams, null, Clock.systemUTC());
Queue<Document> documentQueue = new ArrayDeque<>();
for (int x = 0; x < 100; x++) {
@@ -233,7 +234,7 @@ public class OperationProcessorTest {
OperationProcessor operationProcessor = new OperationProcessor(
new IncompleteResultsThrottler(1000, 1000, null, null),
(docId, documentResult) -> queue.add(documentResult),
- sessionParams, null);
+ sessionParams, null, Clock.systemUTC());
operationProcessor.sendDocument(doc1);
operationProcessor.sendDocument(doc1b); // Blocked
@@ -273,7 +274,7 @@ public class OperationProcessorTest {
OperationProcessor q = new OperationProcessor(
new IncompleteResultsThrottler(1000, 1000, null, null),
(docId, documentResult) -> queue.add(documentResult),
- sessionParams, null);
+ sessionParams, null, Clock.systemUTC());
q.sendDocument(doc1);
assertEquals(0, queue.size());
@@ -299,7 +300,7 @@ public class OperationProcessorTest {
OperationProcessor q = new OperationProcessor(
new IncompleteResultsThrottler(1000, 1000, null, null),
(docId, documentResult) -> queue.add(documentResult),
- sessionParams, null);
+ sessionParams, null, Clock.systemUTC());
q.sendDocument(doc1);
assertEquals(0, queue.size());
@@ -358,7 +359,7 @@ public class OperationProcessorTest {
OperationProcessor operationProcessor = new OperationProcessor(
new IncompleteResultsThrottler(1, 1, null, null),
(docId, documentResult) -> {},
- sessionParams, null);
+ sessionParams, null, Clock.systemUTC());
operationProcessor.sendDocument(doc1);
@@ -397,7 +398,7 @@ public class OperationProcessorTest {
(docId, documentResult) -> {
countDownLatch.countDown();
},
- sessionParams, executor);
+ sessionParams, executor, Clock.systemUTC());
// Will fail due to bogus host name, but will be retried.
operationProcessor.sendDocument(doc1);
@@ -425,7 +426,7 @@ public class OperationProcessorTest {
(docId, documentResult) -> {
countDownLatch.countDown();
},
- sessionParams, executor);
+ sessionParams, executor, Clock.systemUTC());
fail("Expected exception");
}