aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-08-28 12:08:27 +0200
committerJon Bratseth <bratseth@gmail.com>2020-08-28 12:08:27 +0200
commit72bdb502a00cef23150d61e9c309938121763343 (patch)
tree0c1fa9745fe5c62571484997849381e349958c9c /vespa-http-client
parent364db9b44900a0453d6ab509bc5523e55295d30c (diff)
Completely parametrize time
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java2
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java56
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java19
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java7
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java2
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java24
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java8
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java18
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java10
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java7
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/ManualClock.java55
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java22
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java81
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java20
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java10
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java43
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java10
20 files changed, 247 insertions, 186 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java
index 473b9494ba4..16374ec07cc 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java
@@ -78,7 +78,6 @@ public class Result {
private final Endpoint endpoint;
private final Exception exception;
private final String traceMessage;
- private final long timeStampMillis = System.currentTimeMillis();
public Detail(Endpoint endpoint, ResultType resultType, String traceMessage, Exception e) {
this.endpoint = endpoint;
@@ -133,7 +132,6 @@ public class Result {
b.append(" trace='").append(traceMessage).append("'");
if (endpoint != null)
b.append(" endpoint=").append(endpoint);
- b.append(" resultTimeLocally=").append(timeStampMillis).append("\n");
return b.toString();
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java
index a08064c8ce5..98fd2f9da84 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java
@@ -7,6 +7,8 @@ import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -18,44 +20,40 @@ final public class Document {
private final String documentId;
private final ByteBuffer data;
- private final long createTimeMillis = System.currentTimeMillis();
- // This is initialized lazily to reduce work on calling thread (which is the thread calling the API).
+ private final Instant createTime;
+ // This is initialized lazily to reduce work on calling thread (which is the thread calling the API)
private String operationId = null;
private final Object context;
- private long queueInsertTimestampMillis;
+ private Instant queueInsertTime;
- public Document(String documentId, byte[] data, Object context) {
- this.documentId = documentId;
- this.context = context;
- this.data = ByteBuffer.wrap(data);
+ public Document(String documentId, byte[] data, Object context, Instant createTime) {
+ this(documentId, null, ByteBuffer.wrap(data), context, createTime);
}
- public Document(String documentId, String operationId, CharSequence data, Object context) {
+ public Document(String documentId, String operationId, CharSequence data, Object context, Instant createTime) {
+ this(documentId, operationId, encode(data, documentId), context, createTime);
+ }
+
+ private Document(String documentId, String operationId, ByteBuffer data, Object context, Instant createTime) {
this.documentId = documentId;
this.operationId = operationId;
+ this.data = data;
this.context = context;
- try {
- this.data = StandardCharsets.UTF_8.newEncoder().encode(CharBuffer.wrap(data));
- } catch (CharacterCodingException e) {
- throw new RuntimeException("Error encoding document data into UTF8 " + documentId, e);
- }
+ this.createTime = Objects.requireNonNull(createTime, "createTime cannot be null");
+ this.queueInsertTime = createTime;
}
- public void resetQueueTime() {
- queueInsertTimestampMillis = System.currentTimeMillis();
+ public void setQueueInsertTime(Instant queueInsertTime) {
+ this.queueInsertTime = queueInsertTime;
}
- public long timeInQueueMillis() {
- return System.currentTimeMillis() - queueInsertTimestampMillis;
- }
+ public Instant getQueueInsertTime() { return queueInsertTime; }
public CharSequence getDataAsString() {
return StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer());
}
- public Object getContext() {
- return context;
- }
+ public Object getContext() { return context; }
public static class DocumentException extends IOException {
private static final long serialVersionUID = 29832833292L;
@@ -65,9 +63,7 @@ final public class Document {
}
}
- public String getDocumentId() {
- return documentId;
- }
+ public String getDocumentId() { return documentId; }
public ByteBuffer getData() {
return data.asReadOnlyBuffer();
@@ -77,9 +73,7 @@ final public class Document {
return data.remaining();
}
- public long createTimeMillis() {
- return createTimeMillis;
- }
+ public Instant createTime() { return createTime; }
public String getOperationId() {
if (operationId == null) {
@@ -91,4 +85,12 @@ final public class Document {
@Override
public String toString() { return "document '" + documentId + "'"; }
+ private static ByteBuffer encode(CharSequence data, String documentId) {
+ try {
+ return StandardCharsets.UTF_8.newEncoder().encode(CharBuffer.wrap(data));
+ } catch (CharacterCodingException e) {
+ throw new RuntimeException("Error encoding document data into UTF8 " + documentId, e);
+ }
+ }
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
index afa4cd0ae14..a950cb545de 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
*/
public class FeedClientImpl implements FeedClient {
+ private final Clock clock;
private final OperationProcessor operationProcessor;
private final long closeTimeoutMs;
private final long sleepTimeMs = 500;
@@ -32,15 +33,15 @@ public class FeedClientImpl implements FeedClient {
ResultCallback resultCallback,
ScheduledThreadPoolExecutor timeoutExecutor,
Clock clock) {
- this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) * (
- sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) +
- sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS));
+ this.clock = clock;
+ this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) *
+ (sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) +
+ sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS));
this.operationProcessor = new OperationProcessor(
- new IncompleteResultsThrottler(
- sessionParams.getThrottlerMinSize(),
- sessionParams.getClientQueueSize(),
- ()->System.currentTimeMillis(),
- new ThrottlePolicy()),
+ new IncompleteResultsThrottler(sessionParams.getThrottlerMinSize(),
+ sessionParams.getClientQueueSize(),
+ clock,
+ new ThrottlePolicy()),
resultCallback,
sessionParams,
timeoutExecutor,
@@ -53,7 +54,7 @@ public class FeedClientImpl implements FeedClient {
charsetEncoder.onMalformedInput(CodingErrorAction.REPORT);
charsetEncoder.onUnmappableCharacter(CodingErrorAction.REPORT);
- Document document = new Document(documentId, operationId, documentData, context);
+ Document document = new Document(documentId, operationId, documentData, context, clock.instant());
operationProcessor.sendDocument(document);
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java
index bf55a46277d..e09cecf7161 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java
@@ -6,6 +6,7 @@ import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.time.Clock;
/**
* Class for wiring up the Session API. It is the return value of stream() in the Session API.
@@ -17,19 +18,21 @@ class MultiClusterSessionOutputStream extends ByteArrayOutputStream {
private final CharSequence documentId;
private final OperationProcessor operationProcessor;
private final Object context;
+ private final Clock clock;
- public MultiClusterSessionOutputStream(
- CharSequence documentId,
- OperationProcessor operationProcessor,
- Object context) {
+ public MultiClusterSessionOutputStream(CharSequence documentId,
+ OperationProcessor operationProcessor,
+ Object context,
+ Clock clock) {
this.documentId = documentId;
this.context = context;
this.operationProcessor = operationProcessor;
+ this.clock = clock;
}
@Override
public void close() throws IOException {
- Document document = new Document(documentId.toString(), toByteArray(), context);
+ Document document = new Document(documentId.toString(), toByteArray(), context, clock.instant());
operationProcessor.sendDocument(document);
super.close();
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java
index 1663e876d83..a68d7eb7524 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java
@@ -24,14 +24,15 @@ public class SessionImpl implements com.yahoo.vespa.http.client.Session {
private final OperationProcessor operationProcessor;
private final BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>();
-
+ private final Clock clock;
public SessionImpl(SessionParams sessionParams, ScheduledThreadPoolExecutor timeoutExecutor, Clock clock) {
+ this.clock = clock;
this.operationProcessor = new OperationProcessor(
new IncompleteResultsThrottler(
sessionParams.getThrottlerMinSize(),
sessionParams.getClientQueueSize(),
- ()->System.currentTimeMillis(),
+ clock,
new ThrottlePolicy()),
new FeedClient.ResultCallback() {
@Override
@@ -46,7 +47,7 @@ public class SessionImpl implements com.yahoo.vespa.http.client.Session {
@Override
public OutputStream stream(CharSequence documentId) {
- return new MultiClusterSessionOutputStream(documentId, operationProcessor, null);
+ return new MultiClusterSessionOutputStream(documentId, operationProcessor, null, clock);
}
@Override
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index bc537c42f88..a232ceeacf5 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -56,7 +56,7 @@ public class ClusterConnection implements AutoCloseable {
throw new IllegalArgumentException("At least 1 persistent connection per endpoint is required in " + cluster);
int maxInFlightPerSession = Math.max(1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster);
- documentQueue = new DocumentQueue(clientQueueSizePerCluster);
+ documentQueue = new DocumentQueue(clientQueueSizePerCluster, clock);
ioThreadGroup = operationProcessor.getIoThreadGroup();
singleEndpoint = cluster.getEndpoints().size() == 1 ? cluster.getEndpoints().get(0) : null;
Double idlePollFrequency = feedParams.getIdlePollFrequency();
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
index ce867d001aa..3536013e043 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client.core.communication;
import com.yahoo.vespa.http.client.core.Document;
+import java.time.Clock;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -22,10 +23,12 @@ class DocumentQueue {
private final Deque<Document> queue;
private final int maxSize;
private boolean closed = false;
+ private final Clock clock;
- DocumentQueue(int maxSize) {
+ DocumentQueue(int maxSize, Clock clock) {
this.maxSize = maxSize;
this.queue = new ArrayDeque<>(maxSize);
+ this.clock = clock;
}
List<Document> removeAllDocuments() {
@@ -40,7 +43,7 @@ class DocumentQueue {
}
void put(Document document, boolean calledFromIoThreadGroup) throws InterruptedException {
- document.resetQueueTime();
+ document.setQueueInsertTime(clock.instant());
synchronized (queue) {
while (!closed && (queue.size() >= maxSize) && !calledFromIoThreadGroup) {
queue.wait();
@@ -57,9 +60,9 @@ class DocumentQueue {
synchronized (queue) {
long remainingToWait = unit.toMillis(timeout);
while (queue.isEmpty()) {
- long startTime = System.currentTimeMillis();
+ long startTime = clock.millis();
queue.wait(remainingToWait);
- remainingToWait -= (System.currentTimeMillis() - startTime);
+ remainingToWait -= (clock.millis() - startTime);
if (remainingToWait <= 0) {
break;
}
@@ -109,14 +112,13 @@ class DocumentQueue {
Optional<Document> pollDocumentIfTimedoutInQueue(Duration localQueueTimeOut) {
synchronized (queue) {
- if (queue.isEmpty()) {
- return Optional.empty();
- }
+ if (queue.isEmpty()) return Optional.empty();
+
Document document = queue.peek();
- if (document.timeInQueueMillis() > localQueueTimeOut.toMillis()) {
- return Optional.of(queue.poll());
- }
- return Optional.empty();
+ if (document.getQueueInsertTime().plus(localQueueTimeOut).isBefore(clock.instant()))
+ return Optional.ofNullable(queue.poll());
+ else
+ return Optional.empty();
}
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 40c100fa1eb..99aff8b4baa 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -263,11 +263,11 @@ class IOThread implements Runnable, AutoCloseable {
private ProcessResponse feedDocumentAndProcessResults(List<Document> docs)
throws ServerResponseException, IOException {
addDocumentsToResultQueue(docs);
- long startTime = System.currentTimeMillis();
+ long startTime = clock.millis();
InputStream serverResponse = sendAndReceive(docs);
ProcessResponse processResponse = processResponse(serverResponse);
- lastGatewayProcessTimeMillis.set((int) (System.currentTimeMillis() - startTime));
+ lastGatewayProcessTimeMillis.set((int) (clock.millis() - startTime));
return processResponse;
}
@@ -418,7 +418,7 @@ class IOThread implements Runnable, AutoCloseable {
EndpointResult endpointResult = EndPointResultFactory.createTransientError(
endpoint, document.get().getOperationId(),
new Exception("Not sending document operation, timed out in queue after " +
- document.get().timeInQueueMillis() + " ms."));
+ (clock.millis() - document.get().getQueueInsertTime().toEpochMilli()) + " ms."));
resultQueue.failOperation(endpointResult, clusterId);
}
}
@@ -467,7 +467,7 @@ class IOThread implements Runnable, AutoCloseable {
if (connection.lastPollTime() == null) return true;
// Poll less the closer the connection comes to closing time
- double newness = ( closingTime(connection).toEpochMilli() - clock.instant().toEpochMilli() ) /
+ double newness = ( closingTime(connection).toEpochMilli() - clock.millis() ) /
(double)localQueueTimeOut.toMillis();
if (newness < 0) return true; // connection retired prematurely
if (newness > 1) return false; // closing time reached
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java
index 883cea7e6f0..27ad88c123e 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.http.client.core.operationProcessor;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.core.Document;
+import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
@@ -18,24 +19,25 @@ class DocumentSendInfo {
// This is lazily populated as normal cases does not require retries.
private Map<Integer, Integer> attemptedRetriesByClusterId = null;
private final StringBuilder localTrace;
+ private final Clock clock;
- DocumentSendInfo(Document document, boolean traceThisDoc) {
+ DocumentSendInfo(Document document, boolean traceThisDoc, Clock clock) {
this.document = document;
- localTrace = traceThisDoc
- ? new StringBuilder("\n" + document.createTimeMillis() + " Trace starting " + "\n")
- : null;
+ localTrace = traceThisDoc ? new StringBuilder("\n" + document.createTime() + " Trace starting " + "\n")
+ : null;
+ this.clock = clock;
}
boolean addIfNotAlreadyThere(Result.Detail detail, int clusterId) {
if (detailByClusterId.containsKey(clusterId)) {
if (localTrace != null) {
- localTrace.append(System.currentTimeMillis() + " Got duplicate detail, ignoring this: "
- + detail.toString() + "\n");
+ localTrace.append(clock.millis() + " Got duplicate detail, ignoring this: " +
+ detail.toString() + "\n");
}
return false;
}
if (localTrace != null) {
- localTrace.append(System.currentTimeMillis() + " Got detail: " + detail.toString() + "\n");
+ localTrace.append(clock.millis() + " Got detail: " + detail.toString() + "\n");
}
detailByClusterId.put(clusterId, detail);
return true;
@@ -60,7 +62,7 @@ class DocumentSendInfo {
retries++;
attemptedRetriesByClusterId.put(clusterId, retries);
if (localTrace != null) {
- localTrace.append(System.currentTimeMillis() + " Asked about retrying for cluster ID "
+ localTrace.append(clock.millis() + " Asked about retrying for cluster ID "
+ clusterId + ", number of retries is " + retries + " Detail:\n" + detail.toString());
}
return retries;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java
index 205153a7a00..3d662eca3e7 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java
@@ -21,12 +21,11 @@ import java.util.logging.Logger;
*/
public final class EndPointResultFactory {
- private static Logger log = Logger.getLogger(EndPointResultFactory.class.getName());
-
+ private static final Logger log = Logger.getLogger(EndPointResultFactory.class.getName());
private static final String EMPTY_MESSAGE = "-";
- public static Collection<EndpointResult> createResult(
- Endpoint endpoint, InputStream inputStream) throws IOException {
+ public static Collection<EndpointResult> createResult(Endpoint endpoint,
+ InputStream inputStream) throws IOException {
List<EndpointResult> results = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.US_ASCII))) {
@@ -82,9 +81,9 @@ public final class EndPointResultFactory {
return new EndpointResult(
reply.operationId,
new Result.Detail(endpoint,
- replyToResultType(reply),
- reply.traceMessage,
- exception));
+ replyToResultType(reply),
+ reply.traceMessage,
+ exception));
} catch (Throwable t) {
throw new IllegalArgumentException("Bad result line from server: '" + line + "'", t);
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java
index 7cf4e32a880..ebeee802303 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client.core.operationProcessor;
import com.yahoo.vespa.http.client.core.ThrottlePolicy;
+import java.time.Clock;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -57,6 +58,7 @@ public class IncompleteResultsThrottler {
/**
* Creates the throttler.
+ *
* @param minInFlightValue the throttler will never throttle beyond this limit.
* @param maxInFlightValue the throttler will never throttle above this limit. If zero, no limit.
* @param clock use to calculate window size. Can be null if minWindowSize and maxInFlightValue are equal.
@@ -68,7 +70,7 @@ public class IncompleteResultsThrottler {
this.policy = policy;
this.clock = clock;
if (minInFlightValue != maxInFlightValue) {
- this.sampleStartTimeMs = clock.getTimeMillis();
+ this.sampleStartTimeMs = clock.millis();
}
setNewSemaphoreSize(INITIAL_MAX_IN_FLIGHT_VALUE);
}
@@ -96,10 +98,6 @@ public class IncompleteResultsThrottler {
}
}
- public interface Clock {
- long getTimeMillis();
- }
-
public void resultReady(boolean success) {
blocker.operationDone();
if (!success) {
@@ -147,9 +145,8 @@ public class IncompleteResultsThrottler {
}
private void adjustThrottling() {
- if (clock.getTimeMillis() < sampleStartTimeMs + phaseSizeMs) {
- return;
- }
+ if (clock.millis() < sampleStartTimeMs + phaseSizeMs) return;
+
sampleStartTimeMs += phaseSizeMs;
if (stabilizingPhasesLeft-- == 0) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 735b7332c03..702a316422d 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -56,6 +56,7 @@ public class OperationProcessor {
private final boolean traceToStderr;
private final ThreadGroup ioThreadGroup;
private final String clientId = new BigInteger(130, random).toString(32);
+ private final Clock clock;
public OperationProcessor(IncompleteResultsThrottler incompleteResultsThrottler,
FeedClient.ResultCallback resultCallback,
@@ -67,6 +68,7 @@ public class OperationProcessor {
this.incompleteResultsThrottler = incompleteResultsThrottler;
this.timeoutExecutor = timeoutExecutor;
this.ioThreadGroup = new ThreadGroup("operationprocessor");
+ this.clock = clock;
if (sessionParams.getClusters().isEmpty())
throw new IllegalArgumentException("Cannot feed to 0 clusters.");
@@ -184,7 +186,7 @@ public class OperationProcessor {
}
}
if (blockedDocumentToSend != null) {
- sendToClusters(blockedDocumentToSend);
+ sendToClusters(blockedDocumentToSend, clock);
}
return result;
}
@@ -228,13 +230,13 @@ public class OperationProcessor {
inflightDocumentIds.add(document.getDocumentId());
}
- sendToClusters(document);
+ sendToClusters(document, clock);
}
- private void sendToClusters(Document document) {
+ private void sendToClusters(Document document, Clock clock) {
synchronized (monitor) {
boolean traceThisDoc = traceEveryXOperation > 0 && traceCounter++ % traceEveryXOperation == 0;
- docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc));
+ docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc, clock));
}
for (ClusterConnection clusterConnection : clusters) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java
index 7c034cab75f..926b4cf8c79 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java
@@ -10,6 +10,7 @@ import com.yahoo.vespa.http.client.core.XmlFeedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
@@ -34,11 +35,11 @@ public class Runner {
boolean isJson,
AtomicInteger numSent,
boolean verbose) {
-
+ Clock clock = Clock.systemUTC();
if (verbose)
System.err.println("Now sending data.");
- long sendStartTime = System.currentTimeMillis();
+ long sendStartTime = clock.millis();
if (isJson) {
JsonReader.read(inputStream, feedClient, numSent);
} else {
@@ -49,7 +50,7 @@ public class Runner {
}
}
- long sendTotalTime = System.currentTimeMillis() - sendStartTime;
+ long sendTotalTime = clock.millis() - sendStartTime;
if (verbose)
System.err.println("Waiting for all results, sent " + numSent.get() + " docs.");
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/ManualClock.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/ManualClock.java
new file mode 100644
index 00000000000..b32d1eaa859
--- /dev/null
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/ManualClock.java
@@ -0,0 +1,55 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.http.client;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAmount;
+
+/**
+ * A clock which initially has the time of its creation but can only be advanced by calling advance
+ *
+ * @author bratseth
+ */
+public class ManualClock extends Clock {
+
+ private Instant currentTime = Instant.now();
+
+ public ManualClock() {}
+
+ public ManualClock(String utcIsoTime) {
+ this(at(utcIsoTime));
+ }
+
+ public ManualClock(Instant currentTime) {
+ this.currentTime = currentTime;
+ }
+
+ public void advance(TemporalAmount temporal) {
+ currentTime = currentTime.plus(temporal);
+ }
+
+ public void setInstant(Instant time) {
+ currentTime = time;
+ }
+
+ @Override
+ public Instant instant() { return currentTime; }
+
+ @Override
+ public ZoneId getZone() { return null; }
+
+ @Override
+ public Clock withZone(ZoneId zone) { return null; }
+
+ @Override
+ public long millis() { return currentTime.toEpochMilli(); }
+
+ public static Instant at(String utcIsoTime) {
+ return LocalDateTime.parse(utcIsoTime, DateTimeFormatter.ISO_DATE_TIME).atZone(ZoneOffset.UTC).toInstant();
+ }
+
+}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java
index b5c03eade51..ee2f021df6a 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java
@@ -5,9 +5,9 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
+import java.time.Clock;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
public class DocumentTest {
@@ -15,25 +15,25 @@ public class DocumentTest {
public void simpleCaseOk() {
String docId = "doc id";
String docContent = "foo";
- Document document = new Document(docId, docContent.getBytes(), null);
- assertThat(document.getDocumentId(), is(docId));
- assertThat(document.getData(), is(ByteBuffer.wrap(docContent.getBytes())));
- assertThat(document.getDataAsString().toString(), is(docContent));
+ Document document = new Document(docId, docContent.getBytes(), null, Clock.systemUTC().instant());
+ assertEquals(docId, document.getDocumentId());
+ assertEquals(ByteBuffer.wrap(docContent.getBytes()), document.getData());
+ assertEquals(docContent, document.getDataAsString().toString());
// Make sure that data is not modified on retrieval.
- assertThat(document.getDataAsString().toString(), is(docContent));
- assertThat(document.getData(), is(ByteBuffer.wrap(docContent.getBytes())));
- assertThat(document.getDocumentId(), is(docId));
+ assertEquals(docContent, document.getDataAsString().toString());
+ assertEquals(ByteBuffer.wrap(docContent.getBytes()), document.getData());
+ assertEquals(docId, document.getDocumentId());
}
@Test(expected = ReadOnlyBufferException.class)
public void notMutablePutTest() {
- Document document = new Document("id", null, "data", null /* context */);
+ Document document = new Document("id", null, "data", null, Clock.systemUTC().instant());
document.getData().put("a".getBytes());
}
@Test(expected = ReadOnlyBufferException.class)
public void notMutableCompactTest() {
- Document document = new Document("id", null, "data", null /* context */);
+ Document document = new Document("id", null, "data", null, Clock.systemUTC().instant());
document.getData().compact();
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
index 13859329515..2ee1a146dfa 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java
@@ -33,8 +33,6 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.any;
@@ -83,11 +81,10 @@ public class ApacheGatewayConnectionTest {
@Test(expected=IllegalArgumentException.class)
public void testServerReturnsBadSessionInV3() throws Exception {
- final Endpoint endpoint = Endpoint.create("localhost", 666, false);
- final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build();
- final String clusterSpecificRoute = "";
- final ConnectionParams connectionParams = new ConnectionParams.Builder()
- .build();
+ Endpoint endpoint = Endpoint.create("localhost", 666, false);
+ FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build();
+ String clusterSpecificRoute = "";
+ ConnectionParams connectionParams = new ConnectionParams.Builder().build();
// This is the fake server, returns wrong session Id.
ApacheGatewayConnection.HttpClientFactory mockFactory = mockHttpClientFactory(post -> httpResponse("Wrong Id from server", "3"));
@@ -102,35 +99,33 @@ public class ApacheGatewayConnectionTest {
"clientId",
Clock.systemUTC());
apacheGatewayConnection.connect();
- final List<Document> documents = new ArrayList<>();
+ List<Document> documents = new ArrayList<>();
apacheGatewayConnection.write(documents);
}
@Test
public void testJsonDocumentHeader() throws Exception {
- final Endpoint endpoint = Endpoint.create("localhost", 666, false);
- final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build();
- final String clusterSpecificRoute = "";
- final ConnectionParams connectionParams = new ConnectionParams.Builder()
- .setUseCompression(true)
- .build();
- final List<Document> documents = new ArrayList<>();
+ Endpoint endpoint = Endpoint.create("localhost", 666, false);
+ FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build();
+ String clusterSpecificRoute = "";
+ ConnectionParams connectionParams = new ConnectionParams.Builder().setUseCompression(true).build();
+ List<Document> documents = new ArrayList<>();
- final String vespaDocContent ="Hello, I a JSON doc.";
- final String docId = "42";
+ String vespaDocContent ="Hello, I a JSON doc.";
+ String docId = "42";
- final AtomicInteger requestsReceived = new AtomicInteger(0);
+ AtomicInteger requestsReceived = new AtomicInteger(0);
// This is the fake server, checks that DATA_FORMAT header is set properly.
ApacheGatewayConnection.HttpClientFactory mockFactory = mockHttpClientFactory(post -> {
- final Header header = post.getFirstHeader(Headers.DATA_FORMAT);
+ Header header = post.getFirstHeader(Headers.DATA_FORMAT);
if (requestsReceived.incrementAndGet() == 1) {
// This is handshake, it is not json.
assert (header == null);
return httpResponse("clientId", "3");
}
assertNotNull(header);
- assertThat(header.getValue(), is(FeedParams.DataFormat.JSON_UTF8.name()));
+ assertEquals(FeedParams.DataFormat.JSON_UTF8.name(), header.getValue());
// Test is done.
return httpResponse("clientId", "3");
});
@@ -154,13 +149,13 @@ public class ApacheGatewayConnectionTest {
@Test
public void testZipAndCreateEntity() throws IOException {
- final String testString = "Hello world";
+ String testString = "Hello world";
InputStream stream = new ByteArrayInputStream(testString.getBytes(StandardCharsets.UTF_8));
// Send in test data to method.
InputStreamEntity inputStreamEntity = ApacheGatewayConnection.zipAndCreateEntity(stream);
// Verify zipped data by comparing unzipped data with test data.
- final String rawContent = TestUtils.zipStreamToString(inputStreamEntity.getContent());
- assert(testString.equals(rawContent));
+ String rawContent = TestUtils.zipStreamToString(inputStreamEntity.getContent());
+ assertEquals(testString, rawContent);
}
/**
@@ -168,32 +163,28 @@ public class ApacheGatewayConnectionTest {
*/
@Test
public void testCompressedWriteOperations() throws Exception {
- final Endpoint endpoint = Endpoint.create("localhost", 666, false);
- final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.XML_UTF8).build();
- final String clusterSpecificRoute = "";
- final ConnectionParams connectionParams = new ConnectionParams.Builder()
- .setUseCompression(true)
- .build();
- final List<Document> documents = new ArrayList<>();
+ Endpoint endpoint = Endpoint.create("localhost", 666, false);
+ FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.XML_UTF8).build();
+ String clusterSpecificRoute = "";
+ ConnectionParams connectionParams = new ConnectionParams.Builder().setUseCompression(true).build();
+ List<Document> documents = new ArrayList<>();
- final String vespaDocContent ="Hello, I am the document data.";
- final String docId = "42";
+ String vespaDocContent ="Hello, I am the document data.";
+ String docId = "42";
- final Document doc = createDoc(docId, vespaDocContent, false);
+ Document doc = createDoc(docId, vespaDocContent, false);
// When sending data on http client, check if it is compressed. If compressed, unzip, check result,
// and count down latch.
ApacheGatewayConnection.HttpClientFactory mockFactory = mockHttpClientFactory(post -> {
- final Header header = post.getFirstHeader("Content-Encoding");
+ Header header = post.getFirstHeader("Content-Encoding");
if (header != null && header.getValue().equals("gzip")) {
final String rawContent = TestUtils.zipStreamToString(post.getEntity().getContent());
final String vespaHeaderText = "<vespafeed>\n";
final String vespaFooterText = "</vespafeed>\n";
- assertThat(rawContent, is(
- doc.getOperationId() + " 38\n" + vespaHeaderText + vespaDocContent + "\n"
- + vespaFooterText));
-
+ assertEquals(doc.getOperationId() + " 38\n" + vespaHeaderText + vespaDocContent + "\n" + vespaFooterText,
+ rawContent);
}
return httpResponse("clientId", "3");
});
@@ -301,16 +292,12 @@ public class ApacheGatewayConnectionTest {
HttpResponse execute(HttpPost httpPost) throws IOException;
}
- private Document createDoc(final String docId, final String content, boolean useJson) throws IOException {
- return new Document(docId, content.getBytes(), null /* context */);
+ private Document createDoc(String docId, String content, boolean useJson) {
+ return new Document(docId, content.getBytes(), null, Clock.systemUTC().instant());
}
- private void addMockedHeader(
- final HttpResponse httpResponseMock,
- final String name,
- final String value,
- HeaderElement[] elements) {
- final Header header = new Header() {
+ private void addMockedHeader(HttpResponse httpResponseMock, String name, String value, HeaderElement[] elements) {
+ Header header = new Header() {
@Override
public String getName() {
return name;
@@ -328,7 +315,7 @@ public class ApacheGatewayConnectionTest {
}
private HttpResponse httpResponse(String sessionIdInResult, String version) throws IOException {
- final HttpResponse httpResponseMock = mock(HttpResponse.class);
+ HttpResponse httpResponseMock = mock(HttpResponse.class);
StatusLine statusLineMock = mock(StatusLine.class);
when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java
index 35a06258f86..af354b8feea 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java
@@ -4,21 +4,24 @@ package com.yahoo.vespa.http.client.core.communication;
import com.yahoo.vespa.http.client.core.Document;
import org.junit.Test;
+import java.time.Clock;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class CloseableQTestCase {
+
@Test
public void requestThatPutIsInterruptedOnClose() throws InterruptedException {
- final DocumentQueue q = new DocumentQueue(1);
- q.put(new Document("id", null, "data", null), false);
+ Clock clock = Clock.systemUTC();
+ DocumentQueue q = new DocumentQueue(1, clock);
+ q.put(new Document("id", null, "data", null, clock.instant()), false);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
-
}
q.close();
q.clear();
@@ -26,7 +29,7 @@ public class CloseableQTestCase {
});
t.start();
try {
- q.put(new Document("id2", null, "data2", null), false);
+ q.put(new Document("id2", null, "data2", null, Clock.systemUTC().instant()), false);
fail("This shouldn't have worked.");
} catch (IllegalStateException ise) {
// ok!
@@ -39,10 +42,11 @@ public class CloseableQTestCase {
@Test
public void requireThatSelfIsUnbounded() throws InterruptedException {
- DocumentQueue q = new DocumentQueue(1);
- q.put(new Document("1", null, "data", null), true);
- q.put(new Document("2", null, "data", null), true);
- q.put(new Document("3", null, "data", null), true);
+ DocumentQueue q = new DocumentQueue(1, Clock.systemUTC());
+ q.put(new Document("1", null, "data", null, Clock.systemUTC().instant()), true);
+ q.put(new Document("2", null, "data", null, Clock.systemUTC().instant()), true);
+ q.put(new Document("3", null, "data", null, Clock.systemUTC().instant()), true);
assertEquals(3, q.size());
}
+
}
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index 6836a0a1d2c..8eb9513065e 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -47,11 +47,15 @@ public class IOThreadTest {
CountDownLatch latch = new CountDownLatch(1);
String docId1 = V3HttpAPITest.documents.get(0).getDocumentId();
Document doc1 = new Document(V3HttpAPITest.documents.get(0).getDocumentId(),
- V3HttpAPITest.documents.get(0).getContents(), null /* context */);
+ V3HttpAPITest.documents.get(0).getContents(),
+ null,
+ Clock.systemUTC().instant());
String docId2 = V3HttpAPITest.documents.get(1).getDocumentId();
Document doc2 = new Document(V3HttpAPITest.documents.get(1).getDocumentId(),
- V3HttpAPITest.documents.get(1).getContents(), null /* context */);
- DocumentQueue documentQueue = new DocumentQueue(4);
+ V3HttpAPITest.documents.get(1).getContents(),
+ null,
+ Clock.systemUTC().instant());
+ DocumentQueue documentQueue = new DocumentQueue(4, Clock.systemUTC());
public IOThreadTest() {
when(apacheGatewayConnection.getEndpoint()).thenReturn(ENDPOINT);
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java
index baf6e2f2df3..ec929d68efb 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java
@@ -1,9 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.client.core.operationProcessor;
+import com.yahoo.vespa.http.client.ManualClock;
import com.yahoo.vespa.http.client.core.ThrottlePolicy;
import org.junit.Test;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
@@ -12,6 +15,7 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyDouble;
@@ -42,14 +46,14 @@ public class IncompleteResultsThrottlerTest {
* @return median queue length.
*/
int getAverageQueue(int clientCount, int breakPoint, int simulationTimeMs) {
- final AtomicLong timeMs = new AtomicLong(0);
+ ManualClock clock = new ManualClock(Instant.ofEpochMilli(0));
ArrayList<IncompleteResultsThrottler> incompleteResultsThrottlers = new ArrayList<>();
MockServer mockServer = new MockServer(breakPoint);
for (int x = 0; x < clientCount; x++) {
IncompleteResultsThrottler incompleteResultsThrottler =
- new IncompleteResultsThrottler(10, 50000, () -> timeMs.get(), new ThrottlePolicy());
+ new IncompleteResultsThrottler(10, 50000, clock, new ThrottlePolicy());
incompleteResultsThrottlers.add(incompleteResultsThrottler);
}
long sum = 0;
@@ -68,8 +72,8 @@ public class IncompleteResultsThrottlerTest {
if (fastForward) {
time = mockServer.nextRequestFinished();
}
- timeMs.set(time);
- mockServer.moveTime(timeMs.get());
+ clock.setInstant(Instant.ofEpochMilli(time));
+ mockServer.moveTime(clock.instant().toEpochMilli());
for (int y = 0; y < clientCount; y++) {
// Fill up, but don't block as that would stop the simulation.
while (incompleteResultsThrottlers.get(y).availableCapacity() > 0) {
@@ -140,45 +144,46 @@ public class IncompleteResultsThrottlerTest {
}
}
- private void moveToNextCycle(final IncompleteResultsThrottler throttler, AtomicLong timeMs)
+ private void moveToNextCycle(final IncompleteResultsThrottler throttler, ManualClock clock)
throws InterruptedException {
waitForThreads();
// Enter an adaption phase, we don't care about this phase.
- timeMs.addAndGet(throttler.phaseSizeMs);
+ clock.advance(Duration.ofMillis(throttler.phaseSizeMs));
throttler.operationStart();
throttler.resultReady(false);
// Now enter the real next phase.
- timeMs.addAndGet(throttler.phaseSizeMs);
+ clock.advance(Duration.ofMillis(throttler.phaseSizeMs));
throttler.operationStart();
throttler.resultReady(false);
}
@Test
public void testInteractionWithPolicyByMockingPolicy() throws InterruptedException {
+ ManualClock clock = new ManualClock(Instant.ofEpochMilli(0));
final int MAX_SIZE = 1000;
final int MORE_THAN_MAX_SIZE = MAX_SIZE + 20;
final int SIZE_AFTER_CYCLE_FIRST = 30;
final int SIZE_AFTER_CYCLE_SECOND = 5000;
ThrottlePolicy policy = mock(ThrottlePolicy.class);
- final AtomicLong timeMs = new AtomicLong(0);
IncompleteResultsThrottler incompleteResultsThrottler =
- new IncompleteResultsThrottler(2, MAX_SIZE, ()->timeMs.get(), policy);
+ new IncompleteResultsThrottler(2, MAX_SIZE, clock, policy);
long bucketSizeMs = incompleteResultsThrottler.phaseSizeMs;
// Cycle 1 - Algorithm has fixed value for max-in-flight: INITIAL_MAX_IN_FLIGHT_VALUE.
// We post a few operations, not all finishing in this cycle. We explicitly do not fill the window
// size to test the argument about any requests blocked.
- assertThat(incompleteResultsThrottler.availableCapacity(),
- is(IncompleteResultsThrottler.INITIAL_MAX_IN_FLIGHT_VALUE));
+ assertEquals(IncompleteResultsThrottler.INITIAL_MAX_IN_FLIGHT_VALUE,
+ incompleteResultsThrottler.availableCapacity());
postOperations(20, incompleteResultsThrottler);
postSuccesses(15, incompleteResultsThrottler);
- moveToNextCycle(incompleteResultsThrottler, timeMs);
+ moveToNextCycle(incompleteResultsThrottler, clock);
// Cycle 2 - Algorithm has fixed value also for second iteration: SECOND_MAX_IN_FLIGHT_VALUE.
// Test verifies that this value is used, and insert a value to be used for next phase SIZE_AFTER_CYCLE_FIRST.
- assertThat(incompleteResultsThrottler.availableCapacity(),
- is(IncompleteResultsThrottler.SECOND_MAX_IN_FLIGHT_VALUE - 5)); // 5 slots already taken earlier
+ assertEquals("5 slots already taken earlier",
+ IncompleteResultsThrottler.SECOND_MAX_IN_FLIGHT_VALUE - 5,
+ incompleteResultsThrottler.availableCapacity());
postSuccesses(5, incompleteResultsThrottler);
when(policy.calcNewMaxInFlight(
anyDouble(), // Max performance change
@@ -188,12 +193,11 @@ public class IncompleteResultsThrottlerTest {
eq(IncompleteResultsThrottler.SECOND_MAX_IN_FLIGHT_VALUE), // current size
eq(false))) // is any request blocked, should be false since we only posted 20 docs.
.thenReturn(SIZE_AFTER_CYCLE_FIRST);
- moveToNextCycle(incompleteResultsThrottler, timeMs);
+ moveToNextCycle(incompleteResultsThrottler, clock);
// Cycle 3 - Test that value set in previous phase is used. Now return a very large number.
// However, this number should be cropped by the system (tested in next cycle).
- assertThat(incompleteResultsThrottler.availableCapacity(),
- is(SIZE_AFTER_CYCLE_FIRST));
+ assertEquals(SIZE_AFTER_CYCLE_FIRST, incompleteResultsThrottler.availableCapacity());
postOperations(MORE_THAN_MAX_SIZE, incompleteResultsThrottler);
postSuccesses(MORE_THAN_MAX_SIZE, incompleteResultsThrottler);
when(policy.calcNewMaxInFlight(
@@ -204,11 +208,10 @@ public class IncompleteResultsThrottlerTest {
eq(SIZE_AFTER_CYCLE_FIRST),// current size
eq(true))) // is any request blocked, should be true since we posted MORE_THAN_MAX_SIZE docs.
.thenReturn(SIZE_AFTER_CYCLE_SECOND);
- moveToNextCycle(incompleteResultsThrottler, timeMs);
+ moveToNextCycle(incompleteResultsThrottler, clock);
// Cycle 4 - Test that the large number from previous cycle is cropped and that max value is used instead.
- assertThat(incompleteResultsThrottler.availableCapacity(),
- is(MAX_SIZE));
+ assertEquals(MAX_SIZE, incompleteResultsThrottler.availableCapacity());
}
private long inversesU(int size, int sweetSpot) {
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
index 2f801012d6d..e4ae138054d 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
@@ -33,10 +33,10 @@ import static org.mockito.Mockito.when;
public class OperationProcessorTest {
final Queue<Result> queue = new ArrayDeque<>();
- final Document doc1 = new Document("id:a:type::b", null, "data doc 1", null);
- final Document doc1b = new Document("id:a:type::b", null, "data doc 1b", null);
- final Document doc2 = new Document("id:a:type::b2", null, "data doc 2", null);
- final Document doc3 = new Document("id:a:type::b3", null, "data doc 3", null);
+ final Document doc1 = new Document("id:a:type::b", null, "data doc 1", null, Clock.systemUTC().instant());
+ final Document doc1b = new Document("id:a:type::b", null, "data doc 1b", null, Clock.systemUTC().instant());
+ final Document doc2 = new Document("id:a:type::b2", null, "data doc 2", null, Clock.systemUTC().instant());
+ final Document doc3 = new Document("id:a:type::b3", null, "data doc 3", null, Clock.systemUTC().instant());
@Test
public void testBasic() {
@@ -203,7 +203,7 @@ public class OperationProcessorTest {
Queue<Document> documentQueue = new ArrayDeque<>();
for (int x = 0; x < 100; x++) {
- Document document = new Document("id:a:type::b", null, String.valueOf(x), null);
+ Document document = new Document("id:a:type::b", null, String.valueOf(x), null, Clock.systemUTC().instant());
operationProcessor.sendDocument(document);
documentQueue.add(document);
}