summaryrefslogtreecommitdiffstats
path: root/vespa-http-client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-http-client/src/main')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java2
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java56
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java19
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java7
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java2
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java24
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java8
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java18
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java10
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java7
13 files changed, 100 insertions, 92 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java
index 473b9494ba4..16374ec07cc 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java
@@ -78,7 +78,6 @@ public class Result {
private final Endpoint endpoint;
private final Exception exception;
private final String traceMessage;
- private final long timeStampMillis = System.currentTimeMillis();
public Detail(Endpoint endpoint, ResultType resultType, String traceMessage, Exception e) {
this.endpoint = endpoint;
@@ -133,7 +132,6 @@ public class Result {
b.append(" trace='").append(traceMessage).append("'");
if (endpoint != null)
b.append(" endpoint=").append(endpoint);
- b.append(" resultTimeLocally=").append(timeStampMillis).append("\n");
return b.toString();
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java
index a08064c8ce5..98fd2f9da84 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java
@@ -7,6 +7,8 @@ import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -18,44 +20,40 @@ final public class Document {
private final String documentId;
private final ByteBuffer data;
- private final long createTimeMillis = System.currentTimeMillis();
- // This is initialized lazily to reduce work on calling thread (which is the thread calling the API).
+ private final Instant createTime;
+ // This is initialized lazily to reduce work on calling thread (which is the thread calling the API)
private String operationId = null;
private final Object context;
- private long queueInsertTimestampMillis;
+ private Instant queueInsertTime;
- public Document(String documentId, byte[] data, Object context) {
- this.documentId = documentId;
- this.context = context;
- this.data = ByteBuffer.wrap(data);
+ public Document(String documentId, byte[] data, Object context, Instant createTime) {
+ this(documentId, null, ByteBuffer.wrap(data), context, createTime);
}
- public Document(String documentId, String operationId, CharSequence data, Object context) {
+ public Document(String documentId, String operationId, CharSequence data, Object context, Instant createTime) {
+ this(documentId, operationId, encode(data, documentId), context, createTime);
+ }
+
+ private Document(String documentId, String operationId, ByteBuffer data, Object context, Instant createTime) {
this.documentId = documentId;
this.operationId = operationId;
+ this.data = data;
this.context = context;
- try {
- this.data = StandardCharsets.UTF_8.newEncoder().encode(CharBuffer.wrap(data));
- } catch (CharacterCodingException e) {
- throw new RuntimeException("Error encoding document data into UTF8 " + documentId, e);
- }
+ this.createTime = Objects.requireNonNull(createTime, "createTime cannot be null");
+ this.queueInsertTime = createTime;
}
- public void resetQueueTime() {
- queueInsertTimestampMillis = System.currentTimeMillis();
+ public void setQueueInsertTime(Instant queueInsertTime) {
+ this.queueInsertTime = queueInsertTime;
}
- public long timeInQueueMillis() {
- return System.currentTimeMillis() - queueInsertTimestampMillis;
- }
+ public Instant getQueueInsertTime() { return queueInsertTime; }
public CharSequence getDataAsString() {
return StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer());
}
- public Object getContext() {
- return context;
- }
+ public Object getContext() { return context; }
public static class DocumentException extends IOException {
private static final long serialVersionUID = 29832833292L;
@@ -65,9 +63,7 @@ final public class Document {
}
}
- public String getDocumentId() {
- return documentId;
- }
+ public String getDocumentId() { return documentId; }
public ByteBuffer getData() {
return data.asReadOnlyBuffer();
@@ -77,9 +73,7 @@ final public class Document {
return data.remaining();
}
- public long createTimeMillis() {
- return createTimeMillis;
- }
+ public Instant createTime() { return createTime; }
public String getOperationId() {
if (operationId == null) {
@@ -91,4 +85,12 @@ final public class Document {
@Override
public String toString() { return "document '" + documentId + "'"; }
+ private static ByteBuffer encode(CharSequence data, String documentId) {
+ try {
+ return StandardCharsets.UTF_8.newEncoder().encode(CharBuffer.wrap(data));
+ } catch (CharacterCodingException e) {
+ throw new RuntimeException("Error encoding document data into UTF8 " + documentId, e);
+ }
+ }
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
index afa4cd0ae14..a950cb545de 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
*/
public class FeedClientImpl implements FeedClient {
+ private final Clock clock;
private final OperationProcessor operationProcessor;
private final long closeTimeoutMs;
private final long sleepTimeMs = 500;
@@ -32,15 +33,15 @@ public class FeedClientImpl implements FeedClient {
ResultCallback resultCallback,
ScheduledThreadPoolExecutor timeoutExecutor,
Clock clock) {
- this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) * (
- sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) +
- sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS));
+ this.clock = clock;
+ this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) *
+ (sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) +
+ sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS));
this.operationProcessor = new OperationProcessor(
- new IncompleteResultsThrottler(
- sessionParams.getThrottlerMinSize(),
- sessionParams.getClientQueueSize(),
- ()->System.currentTimeMillis(),
- new ThrottlePolicy()),
+ new IncompleteResultsThrottler(sessionParams.getThrottlerMinSize(),
+ sessionParams.getClientQueueSize(),
+ clock,
+ new ThrottlePolicy()),
resultCallback,
sessionParams,
timeoutExecutor,
@@ -53,7 +54,7 @@ public class FeedClientImpl implements FeedClient {
charsetEncoder.onMalformedInput(CodingErrorAction.REPORT);
charsetEncoder.onUnmappableCharacter(CodingErrorAction.REPORT);
- Document document = new Document(documentId, operationId, documentData, context);
+ Document document = new Document(documentId, operationId, documentData, context, clock.instant());
operationProcessor.sendDocument(document);
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java
index bf55a46277d..e09cecf7161 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java
@@ -6,6 +6,7 @@ import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.time.Clock;
/**
* Class for wiring up the Session API. It is the return value of stream() in the Session API.
@@ -17,19 +18,21 @@ class MultiClusterSessionOutputStream extends ByteArrayOutputStream {
private final CharSequence documentId;
private final OperationProcessor operationProcessor;
private final Object context;
+ private final Clock clock;
- public MultiClusterSessionOutputStream(
- CharSequence documentId,
- OperationProcessor operationProcessor,
- Object context) {
+ public MultiClusterSessionOutputStream(CharSequence documentId,
+ OperationProcessor operationProcessor,
+ Object context,
+ Clock clock) {
this.documentId = documentId;
this.context = context;
this.operationProcessor = operationProcessor;
+ this.clock = clock;
}
@Override
public void close() throws IOException {
- Document document = new Document(documentId.toString(), toByteArray(), context);
+ Document document = new Document(documentId.toString(), toByteArray(), context, clock.instant());
operationProcessor.sendDocument(document);
super.close();
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java
index 1663e876d83..a68d7eb7524 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java
@@ -24,14 +24,15 @@ public class SessionImpl implements com.yahoo.vespa.http.client.Session {
private final OperationProcessor operationProcessor;
private final BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>();
-
+ private final Clock clock;
public SessionImpl(SessionParams sessionParams, ScheduledThreadPoolExecutor timeoutExecutor, Clock clock) {
+ this.clock = clock;
this.operationProcessor = new OperationProcessor(
new IncompleteResultsThrottler(
sessionParams.getThrottlerMinSize(),
sessionParams.getClientQueueSize(),
- ()->System.currentTimeMillis(),
+ clock,
new ThrottlePolicy()),
new FeedClient.ResultCallback() {
@Override
@@ -46,7 +47,7 @@ public class SessionImpl implements com.yahoo.vespa.http.client.Session {
@Override
public OutputStream stream(CharSequence documentId) {
- return new MultiClusterSessionOutputStream(documentId, operationProcessor, null);
+ return new MultiClusterSessionOutputStream(documentId, operationProcessor, null, clock);
}
@Override
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index bc537c42f88..a232ceeacf5 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -56,7 +56,7 @@ public class ClusterConnection implements AutoCloseable {
throw new IllegalArgumentException("At least 1 persistent connection per endpoint is required in " + cluster);
int maxInFlightPerSession = Math.max(1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster);
- documentQueue = new DocumentQueue(clientQueueSizePerCluster);
+ documentQueue = new DocumentQueue(clientQueueSizePerCluster, clock);
ioThreadGroup = operationProcessor.getIoThreadGroup();
singleEndpoint = cluster.getEndpoints().size() == 1 ? cluster.getEndpoints().get(0) : null;
Double idlePollFrequency = feedParams.getIdlePollFrequency();
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
index ce867d001aa..3536013e043 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client.core.communication;
import com.yahoo.vespa.http.client.core.Document;
+import java.time.Clock;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -22,10 +23,12 @@ class DocumentQueue {
private final Deque<Document> queue;
private final int maxSize;
private boolean closed = false;
+ private final Clock clock;
- DocumentQueue(int maxSize) {
+ DocumentQueue(int maxSize, Clock clock) {
this.maxSize = maxSize;
this.queue = new ArrayDeque<>(maxSize);
+ this.clock = clock;
}
List<Document> removeAllDocuments() {
@@ -40,7 +43,7 @@ class DocumentQueue {
}
void put(Document document, boolean calledFromIoThreadGroup) throws InterruptedException {
- document.resetQueueTime();
+ document.setQueueInsertTime(clock.instant());
synchronized (queue) {
while (!closed && (queue.size() >= maxSize) && !calledFromIoThreadGroup) {
queue.wait();
@@ -57,9 +60,9 @@ class DocumentQueue {
synchronized (queue) {
long remainingToWait = unit.toMillis(timeout);
while (queue.isEmpty()) {
- long startTime = System.currentTimeMillis();
+ long startTime = clock.millis();
queue.wait(remainingToWait);
- remainingToWait -= (System.currentTimeMillis() - startTime);
+ remainingToWait -= (clock.millis() - startTime);
if (remainingToWait <= 0) {
break;
}
@@ -109,14 +112,13 @@ class DocumentQueue {
Optional<Document> pollDocumentIfTimedoutInQueue(Duration localQueueTimeOut) {
synchronized (queue) {
- if (queue.isEmpty()) {
- return Optional.empty();
- }
+ if (queue.isEmpty()) return Optional.empty();
+
Document document = queue.peek();
- if (document.timeInQueueMillis() > localQueueTimeOut.toMillis()) {
- return Optional.of(queue.poll());
- }
- return Optional.empty();
+ if (document.getQueueInsertTime().plus(localQueueTimeOut).isBefore(clock.instant()))
+ return Optional.ofNullable(queue.poll());
+ else
+ return Optional.empty();
}
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 40c100fa1eb..99aff8b4baa 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -263,11 +263,11 @@ class IOThread implements Runnable, AutoCloseable {
private ProcessResponse feedDocumentAndProcessResults(List<Document> docs)
throws ServerResponseException, IOException {
addDocumentsToResultQueue(docs);
- long startTime = System.currentTimeMillis();
+ long startTime = clock.millis();
InputStream serverResponse = sendAndReceive(docs);
ProcessResponse processResponse = processResponse(serverResponse);
- lastGatewayProcessTimeMillis.set((int) (System.currentTimeMillis() - startTime));
+ lastGatewayProcessTimeMillis.set((int) (clock.millis() - startTime));
return processResponse;
}
@@ -418,7 +418,7 @@ class IOThread implements Runnable, AutoCloseable {
EndpointResult endpointResult = EndPointResultFactory.createTransientError(
endpoint, document.get().getOperationId(),
new Exception("Not sending document operation, timed out in queue after " +
- document.get().timeInQueueMillis() + " ms."));
+ (clock.millis() - document.get().getQueueInsertTime().toEpochMilli()) + " ms."));
resultQueue.failOperation(endpointResult, clusterId);
}
}
@@ -467,7 +467,7 @@ class IOThread implements Runnable, AutoCloseable {
if (connection.lastPollTime() == null) return true;
// Poll less the closer the connection comes to closing time
- double newness = ( closingTime(connection).toEpochMilli() - clock.instant().toEpochMilli() ) /
+ double newness = ( closingTime(connection).toEpochMilli() - clock.millis() ) /
(double)localQueueTimeOut.toMillis();
if (newness < 0) return true; // connection retired prematurely
if (newness > 1) return false; // closing time reached
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java
index 883cea7e6f0..27ad88c123e 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.http.client.core.operationProcessor;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.core.Document;
+import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
@@ -18,24 +19,25 @@ class DocumentSendInfo {
// This is lazily populated as normal cases does not require retries.
private Map<Integer, Integer> attemptedRetriesByClusterId = null;
private final StringBuilder localTrace;
+ private final Clock clock;
- DocumentSendInfo(Document document, boolean traceThisDoc) {
+ DocumentSendInfo(Document document, boolean traceThisDoc, Clock clock) {
this.document = document;
- localTrace = traceThisDoc
- ? new StringBuilder("\n" + document.createTimeMillis() + " Trace starting " + "\n")
- : null;
+ localTrace = traceThisDoc ? new StringBuilder("\n" + document.createTime() + " Trace starting " + "\n")
+ : null;
+ this.clock = clock;
}
boolean addIfNotAlreadyThere(Result.Detail detail, int clusterId) {
if (detailByClusterId.containsKey(clusterId)) {
if (localTrace != null) {
- localTrace.append(System.currentTimeMillis() + " Got duplicate detail, ignoring this: "
- + detail.toString() + "\n");
+ localTrace.append(clock.millis() + " Got duplicate detail, ignoring this: " +
+ detail.toString() + "\n");
}
return false;
}
if (localTrace != null) {
- localTrace.append(System.currentTimeMillis() + " Got detail: " + detail.toString() + "\n");
+ localTrace.append(clock.millis() + " Got detail: " + detail.toString() + "\n");
}
detailByClusterId.put(clusterId, detail);
return true;
@@ -60,7 +62,7 @@ class DocumentSendInfo {
retries++;
attemptedRetriesByClusterId.put(clusterId, retries);
if (localTrace != null) {
- localTrace.append(System.currentTimeMillis() + " Asked about retrying for cluster ID "
+ localTrace.append(clock.millis() + " Asked about retrying for cluster ID "
+ clusterId + ", number of retries is " + retries + " Detail:\n" + detail.toString());
}
return retries;
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java
index 205153a7a00..3d662eca3e7 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java
@@ -21,12 +21,11 @@ import java.util.logging.Logger;
*/
public final class EndPointResultFactory {
- private static Logger log = Logger.getLogger(EndPointResultFactory.class.getName());
-
+ private static final Logger log = Logger.getLogger(EndPointResultFactory.class.getName());
private static final String EMPTY_MESSAGE = "-";
- public static Collection<EndpointResult> createResult(
- Endpoint endpoint, InputStream inputStream) throws IOException {
+ public static Collection<EndpointResult> createResult(Endpoint endpoint,
+ InputStream inputStream) throws IOException {
List<EndpointResult> results = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.US_ASCII))) {
@@ -82,9 +81,9 @@ public final class EndPointResultFactory {
return new EndpointResult(
reply.operationId,
new Result.Detail(endpoint,
- replyToResultType(reply),
- reply.traceMessage,
- exception));
+ replyToResultType(reply),
+ reply.traceMessage,
+ exception));
} catch (Throwable t) {
throw new IllegalArgumentException("Bad result line from server: '" + line + "'", t);
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java
index 7cf4e32a880..ebeee802303 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client.core.operationProcessor;
import com.yahoo.vespa.http.client.core.ThrottlePolicy;
+import java.time.Clock;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -57,6 +58,7 @@ public class IncompleteResultsThrottler {
/**
* Creates the throttler.
+ *
* @param minInFlightValue the throttler will never throttle beyond this limit.
* @param maxInFlightValue the throttler will never throttle above this limit. If zero, no limit.
* @param clock use to calculate window size. Can be null if minWindowSize and maxInFlightValue are equal.
@@ -68,7 +70,7 @@ public class IncompleteResultsThrottler {
this.policy = policy;
this.clock = clock;
if (minInFlightValue != maxInFlightValue) {
- this.sampleStartTimeMs = clock.getTimeMillis();
+ this.sampleStartTimeMs = clock.millis();
}
setNewSemaphoreSize(INITIAL_MAX_IN_FLIGHT_VALUE);
}
@@ -96,10 +98,6 @@ public class IncompleteResultsThrottler {
}
}
- public interface Clock {
- long getTimeMillis();
- }
-
public void resultReady(boolean success) {
blocker.operationDone();
if (!success) {
@@ -147,9 +145,8 @@ public class IncompleteResultsThrottler {
}
private void adjustThrottling() {
- if (clock.getTimeMillis() < sampleStartTimeMs + phaseSizeMs) {
- return;
- }
+ if (clock.millis() < sampleStartTimeMs + phaseSizeMs) return;
+
sampleStartTimeMs += phaseSizeMs;
if (stabilizingPhasesLeft-- == 0) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 735b7332c03..702a316422d 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -56,6 +56,7 @@ public class OperationProcessor {
private final boolean traceToStderr;
private final ThreadGroup ioThreadGroup;
private final String clientId = new BigInteger(130, random).toString(32);
+ private final Clock clock;
public OperationProcessor(IncompleteResultsThrottler incompleteResultsThrottler,
FeedClient.ResultCallback resultCallback,
@@ -67,6 +68,7 @@ public class OperationProcessor {
this.incompleteResultsThrottler = incompleteResultsThrottler;
this.timeoutExecutor = timeoutExecutor;
this.ioThreadGroup = new ThreadGroup("operationprocessor");
+ this.clock = clock;
if (sessionParams.getClusters().isEmpty())
throw new IllegalArgumentException("Cannot feed to 0 clusters.");
@@ -184,7 +186,7 @@ public class OperationProcessor {
}
}
if (blockedDocumentToSend != null) {
- sendToClusters(blockedDocumentToSend);
+ sendToClusters(blockedDocumentToSend, clock);
}
return result;
}
@@ -228,13 +230,13 @@ public class OperationProcessor {
inflightDocumentIds.add(document.getDocumentId());
}
- sendToClusters(document);
+ sendToClusters(document, clock);
}
- private void sendToClusters(Document document) {
+ private void sendToClusters(Document document, Clock clock) {
synchronized (monitor) {
boolean traceThisDoc = traceEveryXOperation > 0 && traceCounter++ % traceEveryXOperation == 0;
- docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc));
+ docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc, clock));
}
for (ClusterConnection clusterConnection : clusters) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java
index 7c034cab75f..926b4cf8c79 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java
@@ -10,6 +10,7 @@ import com.yahoo.vespa.http.client.core.XmlFeedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
@@ -34,11 +35,11 @@ public class Runner {
boolean isJson,
AtomicInteger numSent,
boolean verbose) {
-
+ Clock clock = Clock.systemUTC();
if (verbose)
System.err.println("Now sending data.");
- long sendStartTime = System.currentTimeMillis();
+ long sendStartTime = clock.millis();
if (isJson) {
JsonReader.read(inputStream, feedClient, numSent);
} else {
@@ -49,7 +50,7 @@ public class Runner {
}
}
- long sendTotalTime = System.currentTimeMillis() - sendStartTime;
+ long sendTotalTime = clock.millis() - sendStartTime;
if (verbose)
System.err.println("Waiting for all results, sent " + numSent.get() + " docs.");