diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-08-28 10:20:11 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-08-28 10:20:11 +0200 |
commit | 364db9b44900a0453d6ab509bc5523e55295d30c (patch) | |
tree | f9a93d0c964c89ba5f11d0348869562273b0b3a5 /vespa-http-client | |
parent | feafa659ac4ac76cd9ac3067ca394a6470b1e59e (diff) |
Parametrize clock
Diffstat (limited to 'vespa-http-client')
17 files changed, 106 insertions, 57 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java index 4d50905da7b..62c15fcea27 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/FeedClientFactory.java @@ -5,6 +5,7 @@ package com.yahoo.vespa.http.client; import com.yahoo.vespa.http.client.config.SessionParams; import com.yahoo.vespa.http.client.core.api.FeedClientImpl; +import java.time.Clock; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -24,7 +25,7 @@ public class FeedClientFactory { * @return newly created FeedClient API object. */ public static FeedClient create(SessionParams sessionParams, FeedClient.ResultCallback resultCallback) { - return new FeedClientImpl(sessionParams, resultCallback, createTimeoutExecutor()); + return new FeedClientImpl(sessionParams, resultCallback, createTimeoutExecutor(), Clock.systemUTC()); } static ScheduledThreadPoolExecutor createTimeoutExecutor() { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java index b03a2541cd0..b7423f75c87 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/SessionFactory.java @@ -5,6 +5,7 @@ import com.yahoo.vespa.http.client.config.Cluster; import com.yahoo.vespa.http.client.config.Endpoint; import com.yahoo.vespa.http.client.config.SessionParams; +import java.time.Clock; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -30,7 +31,7 @@ public final class SessionFactory { @SuppressWarnings("deprecation") static Session createInternal(SessionParams params) { - return new com.yahoo.vespa.http.client.core.api.SessionImpl(params, createTimeoutExecutor()); + return new com.yahoo.vespa.http.client.core.api.SessionImpl(params, createTimeoutExecutor(), Clock.systemUTC()); } static ScheduledThreadPoolExecutor createTimeoutExecutor() { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java index 7238a0c4ba7..afa4cd0ae14 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java @@ -11,6 +11,7 @@ import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import java.nio.charset.CharsetEncoder; import java.nio.charset.CodingErrorAction; import java.nio.charset.StandardCharsets; +import java.time.Clock; import java.time.Instant; import java.util.Optional; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -29,7 +30,8 @@ public class FeedClientImpl implements FeedClient { public FeedClientImpl(SessionParams sessionParams, ResultCallback resultCallback, - ScheduledThreadPoolExecutor timeoutExecutor) { + ScheduledThreadPoolExecutor timeoutExecutor, + Clock clock) { this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) * ( sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) + sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS)); @@ -41,7 +43,8 @@ public class FeedClientImpl implements FeedClient { new ThrottlePolicy()), resultCallback, sessionParams, - timeoutExecutor); + timeoutExecutor, + clock); } @Override diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java index a5c97351347..1663e876d83 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java @@ -9,6 +9,7 @@ import com.yahoo.vespa.http.client.core.operationProcessor.IncompleteResultsThro import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import java.io.OutputStream; +import java.time.Clock; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -25,7 +26,7 @@ public class SessionImpl implements com.yahoo.vespa.http.client.Session { private final BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>(); - public SessionImpl(SessionParams sessionParams, ScheduledThreadPoolExecutor timeoutExecutor) { + public SessionImpl(SessionParams sessionParams, ScheduledThreadPoolExecutor timeoutExecutor, Clock clock) { this.operationProcessor = new OperationProcessor( new IncompleteResultsThrottler( sessionParams.getThrottlerMinSize(), @@ -39,7 +40,8 @@ public class SessionImpl implements com.yahoo.vespa.http.client.Session { } }, sessionParams, - timeoutExecutor); + timeoutExecutor, + clock); } @Override diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java index ee801270c89..b51c0dd30a1 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java @@ -52,11 +52,12 @@ class ApacheGatewayConnection implements GatewayConnection { private static final Logger log = Logger.getLogger(ApacheGatewayConnection.class.getName()); private static final ObjectMapper mapper = new ObjectMapper(); private static final String PATH = "/reserved-for-internal-use/feedapi?"; - private final List<Integer> SUPPORTED_VERSIONS = new ArrayList<>(); private static final byte[] START_OF_FEED_XML = "<vespafeed>\n".getBytes(StandardCharsets.UTF_8); private static final byte[] END_OF_FEED_XML = "\n</vespafeed>\n".getBytes(StandardCharsets.UTF_8); private static final byte[] START_OF_FEED_JSON = "[".getBytes(StandardCharsets.UTF_8); private static final byte[] END_OF_FEED_JSON = "]".getBytes(StandardCharsets.UTF_8); + + private final List<Integer> supportedVersions = new ArrayList<>(); private final byte[] startOfFeed; private final byte[] endOfFeed; private final Endpoint endpoint; @@ -71,14 +72,16 @@ class ApacheGatewayConnection implements GatewayConnection { private int negotiatedVersion = -1; private final HttpClientFactory httpClientFactory; private final String shardingKey = UUID.randomUUID().toString().substring(0, 5); + private final Clock clock; ApacheGatewayConnection(Endpoint endpoint, FeedParams feedParams, String clusterSpecificRoute, ConnectionParams connectionParams, HttpClientFactory httpClientFactory, - String clientId) { - SUPPORTED_VERSIONS.add(3); + String clientId, + Clock clock) { + supportedVersions.add(3); this.endpoint = endpoint; this.feedParams = feedParams; this.clusterSpecificRoute = clusterSpecificRoute; @@ -86,6 +89,7 @@ class ApacheGatewayConnection implements GatewayConnection { this.connectionParams = connectionParams; this.httpClient = null; this.clientId = clientId; + this.clock = clock; if (feedParams.getDataFormat() == FeedParams.DataFormat.JSON_UTF8) { startOfFeed = START_OF_FEED_JSON; @@ -103,7 +107,7 @@ class ApacheGatewayConnection implements GatewayConnection { @Override public InputStream poll() throws ServerResponseException, IOException { - lastPollTime = Clock.systemUTC().instant(); + lastPollTime = clock.instant(); return write(Collections.<Document>emptyList(), false, false); } @@ -121,7 +125,7 @@ class ApacheGatewayConnection implements GatewayConnection { if (httpClient != null) log.log(Level.WARNING, "Previous httpClient still exists."); httpClient = httpClientFactory.createClient(); - connectionTime = Clock.systemUTC().instant(); + connectionTime = clock.instant(); return httpClient != null; } @@ -185,7 +189,7 @@ class ApacheGatewayConnection implements GatewayConnection { private HttpPost createPost(boolean drain, boolean useCompression, boolean isHandshake) { HttpPost httpPost = new HttpPost(createUri()); - for (int v : SUPPORTED_VERSIONS) { + for (int v : supportedVersions) { httpPost.addHeader(Headers.VERSION, "" + v); } if (sessionId != null) { @@ -323,9 +327,9 @@ class ApacheGatewayConnection implements GatewayConnection { } catch (NumberFormatException nfe) { throw new ServerResponseException("Got bad protocol version from server: " + nfe.getMessage()); } - if (!SUPPORTED_VERSIONS.contains(serverVersion)) { + if (!supportedVersions.contains(serverVersion)) { throw new ServerResponseException("Unsupported version: " + serverVersion - + ". Supported versions: " + SUPPORTED_VERSIONS); + + ". Supported versions: " + supportedVersions); } if (negotiatedVersion == -1) { if (log.isLoggable(Level.FINE)) { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java index be1aa80b9ff..31ec8aa06a2 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionFactory.java @@ -7,6 +7,7 @@ import com.yahoo.vespa.http.client.config.FeedParams; import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.Clock; import java.util.Objects; /** @@ -20,19 +21,22 @@ public class ApacheGatewayConnectionFactory implements GatewayConnectionFactory private final ConnectionParams connectionParams; private final ApacheGatewayConnection.HttpClientFactory httpClientFactory; private final String clientId; + private final Clock clock; public ApacheGatewayConnectionFactory(Endpoint endpoint, FeedParams feedParams, String clusterSpecificRoute, ConnectionParams connectionParams, ApacheGatewayConnection.HttpClientFactory httpClientFactory, - String clientId) { + String clientId, + Clock clock) { this.endpoint = validate(endpoint); this.feedParams = feedParams; this.clusterSpecificRoute = clusterSpecificRoute; this.httpClientFactory = httpClientFactory; this.connectionParams = connectionParams; this.clientId = Objects.requireNonNull(clientId, "clientId cannot be null"); + this.clock = clock; } private static Endpoint validate(Endpoint endpoint) { @@ -52,7 +56,8 @@ public class ApacheGatewayConnectionFactory implements GatewayConnectionFactory clusterSpecificRoute, connectionParams, httpClientFactory, - clientId); + clientId, + clock); } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index 319a925611a..bc537c42f88 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -14,6 +14,7 @@ import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import java.io.IOException; import java.io.StringWriter; +import java.time.Clock; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -44,7 +45,8 @@ public class ClusterConnection implements AutoCloseable { Cluster cluster, int clusterId, int clientQueueSizePerCluster, - ScheduledThreadPoolExecutor timeoutExecutor) { + ScheduledThreadPoolExecutor timeoutExecutor, + Clock clock) { if (cluster.getEndpoints().isEmpty()) throw new IllegalArgumentException("At least a single endpoint is required in " + cluster); @@ -69,14 +71,15 @@ public class ClusterConnection implements AutoCloseable { for (int i = 0; i < connectionParams.getNumPersistentConnectionsPerEndpoint(); i++) { GatewayConnectionFactory connectionFactory; if (connectionParams.isDryRun()) { - connectionFactory = new DryRunGatewayConnectionFactory(endpoint); + connectionFactory = new DryRunGatewayConnectionFactory(endpoint, clock); } else { connectionFactory = new ApacheGatewayConnectionFactory(endpoint, feedParams, cluster.getRoute(), connectionParams, new ApacheGatewayConnection.HttpClientFactory(connectionParams, endpoint.isUseSsl()), - operationProcessor.getClientId() + operationProcessor.getClientId(), + clock ); } IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(), @@ -90,7 +93,8 @@ public class ClusterConnection implements AutoCloseable { documentQueue, feedParams.getMaxSleepTimeMs(), connectionParams.getConnectionTimeToLive(), - idlePollFrequency); + idlePollFrequency, + clock); ioThreads.add(ioThread); } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java index 02df8c52878..cfd4c1003de 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnection.java @@ -22,11 +22,13 @@ import java.util.List; public class DryRunGatewayConnection implements GatewayConnection { private final Endpoint endpoint; + private final Clock clock; private Instant connectionTime = null; private Instant lastPollTime = null; - public DryRunGatewayConnection(Endpoint endpoint) { + public DryRunGatewayConnection(Endpoint endpoint, Clock clock) { this.endpoint = endpoint; + this.clock = clock; } @Override @@ -41,7 +43,7 @@ public class DryRunGatewayConnection implements GatewayConnection { @Override public InputStream poll() { - lastPollTime = Clock.systemUTC().instant(); + lastPollTime = clock.instant(); return write(new ArrayList<>()); } @@ -55,7 +57,7 @@ public class DryRunGatewayConnection implements GatewayConnection { @Override public boolean connect() { - connectionTime = Clock.systemUTC().instant(); + connectionTime = clock.instant(); return true; } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java index 21a18ed5983..a234dba6b8e 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DryRunGatewayConnectionFactory.java @@ -3,20 +3,24 @@ package com.yahoo.vespa.http.client.core.communication; import com.yahoo.vespa.http.client.config.Endpoint; +import java.time.Clock; + /** * @author bratseth */ public class DryRunGatewayConnectionFactory implements GatewayConnectionFactory { private final Endpoint endpoint; + private final Clock clock; - public DryRunGatewayConnectionFactory(Endpoint endpoint) { + public DryRunGatewayConnectionFactory(Endpoint endpoint, Clock clock) { this.endpoint = endpoint; + this.clock = clock; } @Override public GatewayConnection newConnection() { - return new DryRunGatewayConnection(endpoint); + return new DryRunGatewayConnection(endpoint, clock); } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index d1bb0ad9a4f..40c100fa1eb 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -36,6 +36,7 @@ import java.util.logging.Logger; class IOThread implements Runnable, AutoCloseable { private static final Logger log = Logger.getLogger(IOThread.class.getName()); + private final Endpoint endpoint; private final GatewayConnectionFactory connectionFactory; private final DocumentQueue documentQueue; @@ -51,6 +52,7 @@ class IOThread implements Runnable, AutoCloseable { private final GatewayThrottler gatewayThrottler; private final Duration connectionTimeToLive; private final long pollIntervalUS; + private final Clock clock; private final Random random = new Random(); private GatewayConnection currentConnection; @@ -85,7 +87,8 @@ class IOThread implements Runnable, AutoCloseable { DocumentQueue documentQueue, long maxSleepTimeMs, Duration connectionTimeToLive, - double idlePollFrequency) { + double idlePollFrequency, + Clock clock) { this.endpoint = endpoint; this.documentQueue = documentQueue; this.connectionFactory = connectionFactory; @@ -97,6 +100,7 @@ class IOThread implements Runnable, AutoCloseable { this.connectionTimeToLive = connectionTimeToLive; this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs); this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); // ensure range [1us, 10s] + this.clock = clock; this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); thread.setDaemon(true); this.localQueueTimeOut = localQueueTimeOut; @@ -369,7 +373,7 @@ class IOThread implements Runnable, AutoCloseable { private boolean isStale(GatewayConnection connection) { return connection.connectionTime() != null - && connection.connectionTime().plus(connectionTimeToLive).isBefore(Clock.systemUTC().instant()); + && connection.connectionTime().plus(connectionTimeToLive).isBefore(clock.instant()); } private ConnectionState refreshConnection(ConnectionState currentConnectionState) { @@ -439,7 +443,7 @@ class IOThread implements Runnable, AutoCloseable { for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) { GatewayConnection connection = i.next(); - if (closingTime(connection).isBefore(Clock.systemUTC().instant())) { + if (closingTime(connection).isBefore(clock.instant())) { connection.close(); i.remove();; } @@ -463,13 +467,13 @@ class IOThread implements Runnable, AutoCloseable { if (connection.lastPollTime() == null) return true; // Poll less the closer the connection comes to closing time - double newness = ( closingTime(connection).toEpochMilli() - Clock.systemUTC().instant().toEpochMilli() ) / + double newness = ( closingTime(connection).toEpochMilli() - clock.instant().toEpochMilli() ) / (double)localQueueTimeOut.toMillis(); if (newness < 0) return true; // connection retired prematurely if (newness > 1) return false; // closing time reached Duration pollInterval = Duration.ofMillis(pollIntervalUS * 1000 + (long)(newness * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS * 1000))); - return connection.lastPollTime().plus(pollInterval).isBefore(Clock.systemUTC().instant()); + return connection.lastPollTime().plus(pollInterval).isBefore(clock.instant()); } public static class ConnectionStats { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index 692d90abe50..735b7332c03 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -15,6 +15,7 @@ import com.yahoo.vespa.http.client.core.communication.ClusterConnection; import java.math.BigInteger; import java.security.SecureRandom; +import java.time.Clock; import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedHashMap; @@ -59,7 +60,8 @@ public class OperationProcessor { public OperationProcessor(IncompleteResultsThrottler incompleteResultsThrottler, FeedClient.ResultCallback resultCallback, SessionParams sessionParams, - ScheduledThreadPoolExecutor timeoutExecutor) { + ScheduledThreadPoolExecutor timeoutExecutor, + Clock clock) { this.numDestinations = sessionParams.getClusters().size(); this.resultCallback = resultCallback; this.incompleteResultsThrottler = incompleteResultsThrottler; @@ -82,7 +84,8 @@ public class OperationProcessor { cluster, i, sessionParams.getClientQueueSize() / sessionParams.getClusters().size(), - timeoutExecutor)); + timeoutExecutor, + clock)); } operationStats = new OperationStats(sessionParams, clusters, incompleteResultsThrottler); maxRetries = sessionParams.getConnectionParams().getMaxRetries(); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java index aa47128f436..b70fbaf3096 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/FeedClientTest.java @@ -11,6 +11,7 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.time.Clock; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; @@ -41,7 +42,7 @@ public class FeedClientTest { resultsReceived.incrementAndGet(); }; - FeedClient feedClient = new FeedClientImpl(sessionParams, resultCallback, FeedClientFactory.createTimeoutExecutor()); + FeedClient feedClient = new FeedClientImpl(sessionParams, resultCallback, FeedClientFactory.createTimeoutExecutor(), Clock.systemUTC()); @Test public void testStreamAndClose() { diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java index 8ff2566ead1..0813cb36078 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/QueueBoundsTest.java @@ -12,6 +12,7 @@ import org.junit.Test; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Clock; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -79,7 +80,8 @@ public class QueueBoundsTest { .build()) .setClientQueueSize(2) .build(), - SessionFactory.createTimeoutExecutor())) { + SessionFactory.createTimeoutExecutor(), + Clock.systemUTC())) { FeederThread feeder = new FeederThread(session); try { feeder.start(); @@ -123,7 +125,8 @@ public class QueueBoundsTest { .setNumPersistentConnectionsPerEndpoint(1) .build()) .setClientQueueSize(6) //3 per cluster - .build(), SessionFactory.createTimeoutExecutor())) { + .build(), SessionFactory.createTimeoutExecutor(), + Clock.systemUTC())) { FeederThread feeder = new FeederThread(session); try { @@ -211,22 +214,23 @@ public class QueueBoundsTest { .build()) .setClientQueueSize(1) .build(), - SessionFactory.createTimeoutExecutor())) { + SessionFactory.createTimeoutExecutor(), + Clock.systemUTC())) { FeederThread feeder = new FeederThread(session); feeder.start(); try { { System.out.println("We start with failed connection, post a document."); assertFeedNotBlocking(feeder, 0); - assertThat(session.results().size(), is(0)); + assertEquals(0, session.results().size()); CountDownLatch lastPostFeed = assertFeedBlocking(feeder, 1); System.out.println("No result so far."); - assertThat(session.results().size(), is(0)); + assertEquals(0, session.results().size()); System.out.println("Make connection ok."); mockXmlParsingRequestHandler.setScenario(V3MockParsingRequestHandler.Scenario.ALL_OK); assert(lastPostFeed.await(120, TimeUnit.SECONDS)); - assertThat(lastPostFeed.getCount(), equalTo(0L)); + assertEquals(0L, lastPostFeed.getCount()); assertResultQueueSize(session, 2, 120, TimeUnit.SECONDS); } @@ -236,7 +240,7 @@ public class QueueBoundsTest { { assertFeedNotBlocking(feeder, 2); System.out.println("Fed one document, fit in queue."); - assertThat(session.results().size(), is(2)); + assertEquals(2, session.results().size()); System.out.println("Fed one document more, wait for failure."); assertFeedNotBlocking(feeder, 3); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java index 6907e24009b..0b03f3338c9 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/V3HttpAPITest.java @@ -200,4 +200,5 @@ public class V3HttpAPITest { testServerWithMock(new V3MockParsingRequestHandler( 200, V3MockParsingRequestHandler.Scenario.CONDITON_NOT_MET), false, true); } + } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java index 4169e7e0ecf..13859329515 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java @@ -27,6 +27,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.time.Clock; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -71,7 +72,8 @@ public class ApacheGatewayConnectionTest { clusterSpecificRoute, connectionParams, mockFactory, - "clientId"); + "clientId", + Clock.systemUTC()); apacheGatewayConnection.connect(); apacheGatewayConnection.handshake(); documents.add(createDoc(docId, vespaDocContent, true)); @@ -97,7 +99,8 @@ public class ApacheGatewayConnectionTest { clusterSpecificRoute, connectionParams, mockFactory, - "clientId"); + "clientId", + Clock.systemUTC()); apacheGatewayConnection.connect(); final List<Document> documents = new ArrayList<>(); apacheGatewayConnection.write(documents); @@ -139,7 +142,8 @@ public class ApacheGatewayConnectionTest { clusterSpecificRoute, connectionParams, mockFactory, - "clientId"); + "clientId", + Clock.systemUTC()); apacheGatewayConnection.connect(); apacheGatewayConnection.handshake(); @@ -204,7 +208,8 @@ public class ApacheGatewayConnectionTest { clusterSpecificRoute, connectionParams, mockFactory, - "clientId"); + "clientId", + Clock.systemUTC()); apacheGatewayConnection.connect(); apacheGatewayConnection.handshake(); @@ -242,7 +247,8 @@ public class ApacheGatewayConnectionTest { "", connectionParams, mockFactory, - "clientId"); + "clientId", + Clock.systemUTC()); apacheGatewayConnection.connect(); apacheGatewayConnection.handshake(); @@ -270,7 +276,8 @@ public class ApacheGatewayConnectionTest { "", new ConnectionParams.Builder().build(), mockFactory, - "clientId"); + "clientId", + Clock.systemUTC()); apacheGatewayConnection.connect(); apacheGatewayConnection.handshake(); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index 75df8a78b86..6836a0a1d2c 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -16,6 +16,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.time.Clock; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -95,7 +96,8 @@ public class IOThreadTest { documentQueue, 0, Duration.ofSeconds(15), - 10); + 10, + Clock.systemUTC()); } @Test diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java index 9753a180618..2f801012d6d 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java @@ -10,6 +10,7 @@ import com.yahoo.vespa.http.client.core.Document; import com.yahoo.vespa.http.client.core.EndpointResult; import org.junit.Test; +import java.time.Clock; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.CountDownLatch; @@ -49,7 +50,7 @@ public class OperationProcessorTest { OperationProcessor q = new OperationProcessor( new IncompleteResultsThrottler(1000, 1000, null, null), (docId, documentResult) -> queue.add(documentResult), - sessionParams, null); + sessionParams, null, Clock.systemUTC()); q.resultReceived(new EndpointResult("foo", new Result.Detail(null)), 0); @@ -127,7 +128,7 @@ public class OperationProcessorTest { OperationProcessor operationProcessor = new OperationProcessor( new IncompleteResultsThrottler(1000, 1000, null, null), (docId, documentResult) -> queue.add(documentResult), - sessionParams, null); + sessionParams, null, Clock.systemUTC()); operationProcessor.sendDocument(doc1); operationProcessor.sendDocument(doc1b); @@ -165,7 +166,7 @@ public class OperationProcessorTest { OperationProcessor operationProcessor = new OperationProcessor( new IncompleteResultsThrottler(1000, 1000, null, null), (docId, documentResult) -> queue.add(documentResult), - sessionParams, null); + sessionParams, null, Clock.systemUTC()); operationProcessor.sendDocument(doc1); operationProcessor.sendDocument(doc1b); @@ -198,7 +199,7 @@ public class OperationProcessorTest { OperationProcessor operationProcessor = new OperationProcessor( new IncompleteResultsThrottler(1000, 1000, null, null), (docId, documentResult) -> queue.add(documentResult), - sessionParams, null); + sessionParams, null, Clock.systemUTC()); Queue<Document> documentQueue = new ArrayDeque<>(); for (int x = 0; x < 100; x++) { @@ -233,7 +234,7 @@ public class OperationProcessorTest { OperationProcessor operationProcessor = new OperationProcessor( new IncompleteResultsThrottler(1000, 1000, null, null), (docId, documentResult) -> queue.add(documentResult), - sessionParams, null); + sessionParams, null, Clock.systemUTC()); operationProcessor.sendDocument(doc1); operationProcessor.sendDocument(doc1b); // Blocked @@ -273,7 +274,7 @@ public class OperationProcessorTest { OperationProcessor q = new OperationProcessor( new IncompleteResultsThrottler(1000, 1000, null, null), (docId, documentResult) -> queue.add(documentResult), - sessionParams, null); + sessionParams, null, Clock.systemUTC()); q.sendDocument(doc1); assertEquals(0, queue.size()); @@ -299,7 +300,7 @@ public class OperationProcessorTest { OperationProcessor q = new OperationProcessor( new IncompleteResultsThrottler(1000, 1000, null, null), (docId, documentResult) -> queue.add(documentResult), - sessionParams, null); + sessionParams, null, Clock.systemUTC()); q.sendDocument(doc1); assertEquals(0, queue.size()); @@ -358,7 +359,7 @@ public class OperationProcessorTest { OperationProcessor operationProcessor = new OperationProcessor( new IncompleteResultsThrottler(1, 1, null, null), (docId, documentResult) -> {}, - sessionParams, null); + sessionParams, null, Clock.systemUTC()); operationProcessor.sendDocument(doc1); @@ -397,7 +398,7 @@ public class OperationProcessorTest { (docId, documentResult) -> { countDownLatch.countDown(); }, - sessionParams, executor); + sessionParams, executor, Clock.systemUTC()); // Will fail due to bogus host name, but will be retried. operationProcessor.sendDocument(doc1); @@ -425,7 +426,7 @@ public class OperationProcessorTest { (docId, documentResult) -> { countDownLatch.countDown(); }, - sessionParams, executor); + sessionParams, executor, Clock.systemUTC()); fail("Expected exception"); } |