diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-08-28 12:08:27 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-08-28 12:08:27 +0200 |
commit | 72bdb502a00cef23150d61e9c309938121763343 (patch) | |
tree | 0c1fa9745fe5c62571484997849381e349958c9c /vespa-http-client | |
parent | 364db9b44900a0453d6ab509bc5523e55295d30c (diff) |
Completely parametrize time
Diffstat (limited to 'vespa-http-client')
20 files changed, 247 insertions, 186 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java index 473b9494ba4..16374ec07cc 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/Result.java @@ -78,7 +78,6 @@ public class Result { private final Endpoint endpoint; private final Exception exception; private final String traceMessage; - private final long timeStampMillis = System.currentTimeMillis(); public Detail(Endpoint endpoint, ResultType resultType, String traceMessage, Exception e) { this.endpoint = endpoint; @@ -133,7 +132,6 @@ public class Result { b.append(" trace='").append(traceMessage).append("'"); if (endpoint != null) b.append(" endpoint=").append(endpoint); - b.append(" resultTimeLocally=").append(timeStampMillis).append("\n"); return b.toString(); } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java index a08064c8ce5..98fd2f9da84 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/Document.java @@ -7,6 +7,8 @@ import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; /** @@ -18,44 +20,40 @@ final public class Document { private final String documentId; private final ByteBuffer data; - private final long createTimeMillis = System.currentTimeMillis(); - // This is initialized lazily to reduce work on calling thread (which is the thread calling the API). + private final Instant createTime; + // This is initialized lazily to reduce work on calling thread (which is the thread calling the API) private String operationId = null; private final Object context; - private long queueInsertTimestampMillis; + private Instant queueInsertTime; - public Document(String documentId, byte[] data, Object context) { - this.documentId = documentId; - this.context = context; - this.data = ByteBuffer.wrap(data); + public Document(String documentId, byte[] data, Object context, Instant createTime) { + this(documentId, null, ByteBuffer.wrap(data), context, createTime); } - public Document(String documentId, String operationId, CharSequence data, Object context) { + public Document(String documentId, String operationId, CharSequence data, Object context, Instant createTime) { + this(documentId, operationId, encode(data, documentId), context, createTime); + } + + private Document(String documentId, String operationId, ByteBuffer data, Object context, Instant createTime) { this.documentId = documentId; this.operationId = operationId; + this.data = data; this.context = context; - try { - this.data = StandardCharsets.UTF_8.newEncoder().encode(CharBuffer.wrap(data)); - } catch (CharacterCodingException e) { - throw new RuntimeException("Error encoding document data into UTF8 " + documentId, e); - } + this.createTime = Objects.requireNonNull(createTime, "createTime cannot be null"); + this.queueInsertTime = createTime; } - public void resetQueueTime() { - queueInsertTimestampMillis = System.currentTimeMillis(); + public void setQueueInsertTime(Instant queueInsertTime) { + this.queueInsertTime = queueInsertTime; } - public long timeInQueueMillis() { - return System.currentTimeMillis() - queueInsertTimestampMillis; - } + public Instant getQueueInsertTime() { return queueInsertTime; } public CharSequence getDataAsString() { return StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()); } - public Object getContext() { - return context; - } + public Object getContext() { return context; } public static class DocumentException extends IOException { private static final long serialVersionUID = 29832833292L; @@ -65,9 +63,7 @@ final public class Document { } } - public String getDocumentId() { - return documentId; - } + public String getDocumentId() { return documentId; } public ByteBuffer getData() { return data.asReadOnlyBuffer(); @@ -77,9 +73,7 @@ final public class Document { return data.remaining(); } - public long createTimeMillis() { - return createTimeMillis; - } + public Instant createTime() { return createTime; } public String getOperationId() { if (operationId == null) { @@ -91,4 +85,12 @@ final public class Document { @Override public String toString() { return "document '" + documentId + "'"; } + private static ByteBuffer encode(CharSequence data, String documentId) { + try { + return StandardCharsets.UTF_8.newEncoder().encode(CharBuffer.wrap(data)); + } catch (CharacterCodingException e) { + throw new RuntimeException("Error encoding document data into UTF8 " + documentId, e); + } + } + } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java index afa4cd0ae14..a950cb545de 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; */ public class FeedClientImpl implements FeedClient { + private final Clock clock; private final OperationProcessor operationProcessor; private final long closeTimeoutMs; private final long sleepTimeMs = 500; @@ -32,15 +33,15 @@ public class FeedClientImpl implements FeedClient { ResultCallback resultCallback, ScheduledThreadPoolExecutor timeoutExecutor, Clock clock) { - this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) * ( - sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) + - sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS)); + this.clock = clock; + this.closeTimeoutMs = (10 + 3 * sessionParams.getConnectionParams().getMaxRetries()) * + (sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) + + sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS)); this.operationProcessor = new OperationProcessor( - new IncompleteResultsThrottler( - sessionParams.getThrottlerMinSize(), - sessionParams.getClientQueueSize(), - ()->System.currentTimeMillis(), - new ThrottlePolicy()), + new IncompleteResultsThrottler(sessionParams.getThrottlerMinSize(), + sessionParams.getClientQueueSize(), + clock, + new ThrottlePolicy()), resultCallback, sessionParams, timeoutExecutor, @@ -53,7 +54,7 @@ public class FeedClientImpl implements FeedClient { charsetEncoder.onMalformedInput(CodingErrorAction.REPORT); charsetEncoder.onUnmappableCharacter(CodingErrorAction.REPORT); - Document document = new Document(documentId, operationId, documentData, context); + Document document = new Document(documentId, operationId, documentData, context, clock.instant()); operationProcessor.sendDocument(document); } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java index bf55a46277d..e09cecf7161 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/MultiClusterSessionOutputStream.java @@ -6,6 +6,7 @@ import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.time.Clock; /** * Class for wiring up the Session API. It is the return value of stream() in the Session API. @@ -17,19 +18,21 @@ class MultiClusterSessionOutputStream extends ByteArrayOutputStream { private final CharSequence documentId; private final OperationProcessor operationProcessor; private final Object context; + private final Clock clock; - public MultiClusterSessionOutputStream( - CharSequence documentId, - OperationProcessor operationProcessor, - Object context) { + public MultiClusterSessionOutputStream(CharSequence documentId, + OperationProcessor operationProcessor, + Object context, + Clock clock) { this.documentId = documentId; this.context = context; this.operationProcessor = operationProcessor; + this.clock = clock; } @Override public void close() throws IOException { - Document document = new Document(documentId.toString(), toByteArray(), context); + Document document = new Document(documentId.toString(), toByteArray(), context, clock.instant()); operationProcessor.sendDocument(document); super.close(); } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java index 1663e876d83..a68d7eb7524 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/SessionImpl.java @@ -24,14 +24,15 @@ public class SessionImpl implements com.yahoo.vespa.http.client.Session { private final OperationProcessor operationProcessor; private final BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>(); - + private final Clock clock; public SessionImpl(SessionParams sessionParams, ScheduledThreadPoolExecutor timeoutExecutor, Clock clock) { + this.clock = clock; this.operationProcessor = new OperationProcessor( new IncompleteResultsThrottler( sessionParams.getThrottlerMinSize(), sessionParams.getClientQueueSize(), - ()->System.currentTimeMillis(), + clock, new ThrottlePolicy()), new FeedClient.ResultCallback() { @Override @@ -46,7 +47,7 @@ public class SessionImpl implements com.yahoo.vespa.http.client.Session { @Override public OutputStream stream(CharSequence documentId) { - return new MultiClusterSessionOutputStream(documentId, operationProcessor, null); + return new MultiClusterSessionOutputStream(documentId, operationProcessor, null, clock); } @Override diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index bc537c42f88..a232ceeacf5 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -56,7 +56,7 @@ public class ClusterConnection implements AutoCloseable { throw new IllegalArgumentException("At least 1 persistent connection per endpoint is required in " + cluster); int maxInFlightPerSession = Math.max(1, feedParams.getMaxInFlightRequests() / totalNumberOfEndpointsInThisCluster); - documentQueue = new DocumentQueue(clientQueueSizePerCluster); + documentQueue = new DocumentQueue(clientQueueSizePerCluster, clock); ioThreadGroup = operationProcessor.getIoThreadGroup(); singleEndpoint = cluster.getEndpoints().size() == 1 ? cluster.getEndpoints().get(0) : null; Double idlePollFrequency = feedParams.getIdlePollFrequency(); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java index ce867d001aa..3536013e043 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client.core.communication; import com.yahoo.vespa.http.client.core.Document; +import java.time.Clock; import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; @@ -22,10 +23,12 @@ class DocumentQueue { private final Deque<Document> queue; private final int maxSize; private boolean closed = false; + private final Clock clock; - DocumentQueue(int maxSize) { + DocumentQueue(int maxSize, Clock clock) { this.maxSize = maxSize; this.queue = new ArrayDeque<>(maxSize); + this.clock = clock; } List<Document> removeAllDocuments() { @@ -40,7 +43,7 @@ class DocumentQueue { } void put(Document document, boolean calledFromIoThreadGroup) throws InterruptedException { - document.resetQueueTime(); + document.setQueueInsertTime(clock.instant()); synchronized (queue) { while (!closed && (queue.size() >= maxSize) && !calledFromIoThreadGroup) { queue.wait(); @@ -57,9 +60,9 @@ class DocumentQueue { synchronized (queue) { long remainingToWait = unit.toMillis(timeout); while (queue.isEmpty()) { - long startTime = System.currentTimeMillis(); + long startTime = clock.millis(); queue.wait(remainingToWait); - remainingToWait -= (System.currentTimeMillis() - startTime); + remainingToWait -= (clock.millis() - startTime); if (remainingToWait <= 0) { break; } @@ -109,14 +112,13 @@ class DocumentQueue { Optional<Document> pollDocumentIfTimedoutInQueue(Duration localQueueTimeOut) { synchronized (queue) { - if (queue.isEmpty()) { - return Optional.empty(); - } + if (queue.isEmpty()) return Optional.empty(); + Document document = queue.peek(); - if (document.timeInQueueMillis() > localQueueTimeOut.toMillis()) { - return Optional.of(queue.poll()); - } - return Optional.empty(); + if (document.getQueueInsertTime().plus(localQueueTimeOut).isBefore(clock.instant())) + return Optional.ofNullable(queue.poll()); + else + return Optional.empty(); } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 40c100fa1eb..99aff8b4baa 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -263,11 +263,11 @@ class IOThread implements Runnable, AutoCloseable { private ProcessResponse feedDocumentAndProcessResults(List<Document> docs) throws ServerResponseException, IOException { addDocumentsToResultQueue(docs); - long startTime = System.currentTimeMillis(); + long startTime = clock.millis(); InputStream serverResponse = sendAndReceive(docs); ProcessResponse processResponse = processResponse(serverResponse); - lastGatewayProcessTimeMillis.set((int) (System.currentTimeMillis() - startTime)); + lastGatewayProcessTimeMillis.set((int) (clock.millis() - startTime)); return processResponse; } @@ -418,7 +418,7 @@ class IOThread implements Runnable, AutoCloseable { EndpointResult endpointResult = EndPointResultFactory.createTransientError( endpoint, document.get().getOperationId(), new Exception("Not sending document operation, timed out in queue after " + - document.get().timeInQueueMillis() + " ms.")); + (clock.millis() - document.get().getQueueInsertTime().toEpochMilli()) + " ms.")); resultQueue.failOperation(endpointResult, clusterId); } } @@ -467,7 +467,7 @@ class IOThread implements Runnable, AutoCloseable { if (connection.lastPollTime() == null) return true; // Poll less the closer the connection comes to closing time - double newness = ( closingTime(connection).toEpochMilli() - clock.instant().toEpochMilli() ) / + double newness = ( closingTime(connection).toEpochMilli() - clock.millis() ) / (double)localQueueTimeOut.toMillis(); if (newness < 0) return true; // connection retired prematurely if (newness > 1) return false; // closing time reached diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java index 883cea7e6f0..27ad88c123e 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/DocumentSendInfo.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.http.client.core.operationProcessor; import com.yahoo.vespa.http.client.Result; import com.yahoo.vespa.http.client.core.Document; +import java.time.Clock; import java.util.HashMap; import java.util.Map; @@ -18,24 +19,25 @@ class DocumentSendInfo { // This is lazily populated as normal cases does not require retries. private Map<Integer, Integer> attemptedRetriesByClusterId = null; private final StringBuilder localTrace; + private final Clock clock; - DocumentSendInfo(Document document, boolean traceThisDoc) { + DocumentSendInfo(Document document, boolean traceThisDoc, Clock clock) { this.document = document; - localTrace = traceThisDoc - ? new StringBuilder("\n" + document.createTimeMillis() + " Trace starting " + "\n") - : null; + localTrace = traceThisDoc ? new StringBuilder("\n" + document.createTime() + " Trace starting " + "\n") + : null; + this.clock = clock; } boolean addIfNotAlreadyThere(Result.Detail detail, int clusterId) { if (detailByClusterId.containsKey(clusterId)) { if (localTrace != null) { - localTrace.append(System.currentTimeMillis() + " Got duplicate detail, ignoring this: " - + detail.toString() + "\n"); + localTrace.append(clock.millis() + " Got duplicate detail, ignoring this: " + + detail.toString() + "\n"); } return false; } if (localTrace != null) { - localTrace.append(System.currentTimeMillis() + " Got detail: " + detail.toString() + "\n"); + localTrace.append(clock.millis() + " Got detail: " + detail.toString() + "\n"); } detailByClusterId.put(clusterId, detail); return true; @@ -60,7 +62,7 @@ class DocumentSendInfo { retries++; attemptedRetriesByClusterId.put(clusterId, retries); if (localTrace != null) { - localTrace.append(System.currentTimeMillis() + " Asked about retrying for cluster ID " + localTrace.append(clock.millis() + " Asked about retrying for cluster ID " + clusterId + ", number of retries is " + retries + " Detail:\n" + detail.toString()); } return retries; diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java index 205153a7a00..3d662eca3e7 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/EndPointResultFactory.java @@ -21,12 +21,11 @@ import java.util.logging.Logger; */ public final class EndPointResultFactory { - private static Logger log = Logger.getLogger(EndPointResultFactory.class.getName()); - + private static final Logger log = Logger.getLogger(EndPointResultFactory.class.getName()); private static final String EMPTY_MESSAGE = "-"; - public static Collection<EndpointResult> createResult( - Endpoint endpoint, InputStream inputStream) throws IOException { + public static Collection<EndpointResult> createResult(Endpoint endpoint, + InputStream inputStream) throws IOException { List<EndpointResult> results = new ArrayList<>(); try (BufferedReader reader = new BufferedReader( new InputStreamReader(inputStream, StandardCharsets.US_ASCII))) { @@ -82,9 +81,9 @@ public final class EndPointResultFactory { return new EndpointResult( reply.operationId, new Result.Detail(endpoint, - replyToResultType(reply), - reply.traceMessage, - exception)); + replyToResultType(reply), + reply.traceMessage, + exception)); } catch (Throwable t) { throw new IllegalArgumentException("Bad result line from server: '" + line + "'", t); } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java index 7cf4e32a880..ebeee802303 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client.core.operationProcessor; import com.yahoo.vespa.http.client.core.ThrottlePolicy; +import java.time.Clock; import java.util.concurrent.ThreadLocalRandom; /** @@ -57,6 +58,7 @@ public class IncompleteResultsThrottler { /** * Creates the throttler. + * * @param minInFlightValue the throttler will never throttle beyond this limit. * @param maxInFlightValue the throttler will never throttle above this limit. If zero, no limit. * @param clock use to calculate window size. Can be null if minWindowSize and maxInFlightValue are equal. @@ -68,7 +70,7 @@ public class IncompleteResultsThrottler { this.policy = policy; this.clock = clock; if (minInFlightValue != maxInFlightValue) { - this.sampleStartTimeMs = clock.getTimeMillis(); + this.sampleStartTimeMs = clock.millis(); } setNewSemaphoreSize(INITIAL_MAX_IN_FLIGHT_VALUE); } @@ -96,10 +98,6 @@ public class IncompleteResultsThrottler { } } - public interface Clock { - long getTimeMillis(); - } - public void resultReady(boolean success) { blocker.operationDone(); if (!success) { @@ -147,9 +145,8 @@ public class IncompleteResultsThrottler { } private void adjustThrottling() { - if (clock.getTimeMillis() < sampleStartTimeMs + phaseSizeMs) { - return; - } + if (clock.millis() < sampleStartTimeMs + phaseSizeMs) return; + sampleStartTimeMs += phaseSizeMs; if (stabilizingPhasesLeft-- == 0) { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index 735b7332c03..702a316422d 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -56,6 +56,7 @@ public class OperationProcessor { private final boolean traceToStderr; private final ThreadGroup ioThreadGroup; private final String clientId = new BigInteger(130, random).toString(32); + private final Clock clock; public OperationProcessor(IncompleteResultsThrottler incompleteResultsThrottler, FeedClient.ResultCallback resultCallback, @@ -67,6 +68,7 @@ public class OperationProcessor { this.incompleteResultsThrottler = incompleteResultsThrottler; this.timeoutExecutor = timeoutExecutor; this.ioThreadGroup = new ThreadGroup("operationprocessor"); + this.clock = clock; if (sessionParams.getClusters().isEmpty()) throw new IllegalArgumentException("Cannot feed to 0 clusters."); @@ -184,7 +186,7 @@ public class OperationProcessor { } } if (blockedDocumentToSend != null) { - sendToClusters(blockedDocumentToSend); + sendToClusters(blockedDocumentToSend, clock); } return result; } @@ -228,13 +230,13 @@ public class OperationProcessor { inflightDocumentIds.add(document.getDocumentId()); } - sendToClusters(document); + sendToClusters(document, clock); } - private void sendToClusters(Document document) { + private void sendToClusters(Document document, Clock clock) { synchronized (monitor) { boolean traceThisDoc = traceEveryXOperation > 0 && traceCounter++ % traceEveryXOperation == 0; - docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc)); + docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc, clock)); } for (ClusterConnection clusterConnection : clusters) { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java index 7c034cab75f..926b4cf8c79 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/runner/Runner.java @@ -10,6 +10,7 @@ import com.yahoo.vespa.http.client.core.XmlFeedReader; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.time.Clock; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; @@ -34,11 +35,11 @@ public class Runner { boolean isJson, AtomicInteger numSent, boolean verbose) { - + Clock clock = Clock.systemUTC(); if (verbose) System.err.println("Now sending data."); - long sendStartTime = System.currentTimeMillis(); + long sendStartTime = clock.millis(); if (isJson) { JsonReader.read(inputStream, feedClient, numSent); } else { @@ -49,7 +50,7 @@ public class Runner { } } - long sendTotalTime = System.currentTimeMillis() - sendStartTime; + long sendTotalTime = clock.millis() - sendStartTime; if (verbose) System.err.println("Waiting for all results, sent " + numSent.get() + " docs."); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/ManualClock.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/ManualClock.java new file mode 100644 index 00000000000..b32d1eaa859 --- /dev/null +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/ManualClock.java @@ -0,0 +1,55 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.http.client; + +import java.time.Clock; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAmount; + +/** + * A clock which initially has the time of its creation but can only be advanced by calling advance + * + * @author bratseth + */ +public class ManualClock extends Clock { + + private Instant currentTime = Instant.now(); + + public ManualClock() {} + + public ManualClock(String utcIsoTime) { + this(at(utcIsoTime)); + } + + public ManualClock(Instant currentTime) { + this.currentTime = currentTime; + } + + public void advance(TemporalAmount temporal) { + currentTime = currentTime.plus(temporal); + } + + public void setInstant(Instant time) { + currentTime = time; + } + + @Override + public Instant instant() { return currentTime; } + + @Override + public ZoneId getZone() { return null; } + + @Override + public Clock withZone(ZoneId zone) { return null; } + + @Override + public long millis() { return currentTime.toEpochMilli(); } + + public static Instant at(String utcIsoTime) { + return LocalDateTime.parse(utcIsoTime, DateTimeFormatter.ISO_DATE_TIME).atZone(ZoneOffset.UTC).toInstant(); + } + +} diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java index b5c03eade51..ee2f021df6a 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/DocumentTest.java @@ -5,9 +5,9 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.nio.ReadOnlyBufferException; +import java.time.Clock; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; public class DocumentTest { @@ -15,25 +15,25 @@ public class DocumentTest { public void simpleCaseOk() { String docId = "doc id"; String docContent = "foo"; - Document document = new Document(docId, docContent.getBytes(), null); - assertThat(document.getDocumentId(), is(docId)); - assertThat(document.getData(), is(ByteBuffer.wrap(docContent.getBytes()))); - assertThat(document.getDataAsString().toString(), is(docContent)); + Document document = new Document(docId, docContent.getBytes(), null, Clock.systemUTC().instant()); + assertEquals(docId, document.getDocumentId()); + assertEquals(ByteBuffer.wrap(docContent.getBytes()), document.getData()); + assertEquals(docContent, document.getDataAsString().toString()); // Make sure that data is not modified on retrieval. - assertThat(document.getDataAsString().toString(), is(docContent)); - assertThat(document.getData(), is(ByteBuffer.wrap(docContent.getBytes()))); - assertThat(document.getDocumentId(), is(docId)); + assertEquals(docContent, document.getDataAsString().toString()); + assertEquals(ByteBuffer.wrap(docContent.getBytes()), document.getData()); + assertEquals(docId, document.getDocumentId()); } @Test(expected = ReadOnlyBufferException.class) public void notMutablePutTest() { - Document document = new Document("id", null, "data", null /* context */); + Document document = new Document("id", null, "data", null, Clock.systemUTC().instant()); document.getData().put("a".getBytes()); } @Test(expected = ReadOnlyBufferException.class) public void notMutableCompactTest() { - Document document = new Document("id", null, "data", null /* context */); + Document document = new Document("id", null, "data", null, Clock.systemUTC().instant()); document.getData().compact(); } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java index 13859329515..2ee1a146dfa 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnectionTest.java @@ -33,8 +33,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.any; @@ -83,11 +81,10 @@ public class ApacheGatewayConnectionTest { @Test(expected=IllegalArgumentException.class) public void testServerReturnsBadSessionInV3() throws Exception { - final Endpoint endpoint = Endpoint.create("localhost", 666, false); - final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build(); - final String clusterSpecificRoute = ""; - final ConnectionParams connectionParams = new ConnectionParams.Builder() - .build(); + Endpoint endpoint = Endpoint.create("localhost", 666, false); + FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build(); + String clusterSpecificRoute = ""; + ConnectionParams connectionParams = new ConnectionParams.Builder().build(); // This is the fake server, returns wrong session Id. ApacheGatewayConnection.HttpClientFactory mockFactory = mockHttpClientFactory(post -> httpResponse("Wrong Id from server", "3")); @@ -102,35 +99,33 @@ public class ApacheGatewayConnectionTest { "clientId", Clock.systemUTC()); apacheGatewayConnection.connect(); - final List<Document> documents = new ArrayList<>(); + List<Document> documents = new ArrayList<>(); apacheGatewayConnection.write(documents); } @Test public void testJsonDocumentHeader() throws Exception { - final Endpoint endpoint = Endpoint.create("localhost", 666, false); - final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build(); - final String clusterSpecificRoute = ""; - final ConnectionParams connectionParams = new ConnectionParams.Builder() - .setUseCompression(true) - .build(); - final List<Document> documents = new ArrayList<>(); + Endpoint endpoint = Endpoint.create("localhost", 666, false); + FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.JSON_UTF8).build(); + String clusterSpecificRoute = ""; + ConnectionParams connectionParams = new ConnectionParams.Builder().setUseCompression(true).build(); + List<Document> documents = new ArrayList<>(); - final String vespaDocContent ="Hello, I a JSON doc."; - final String docId = "42"; + String vespaDocContent ="Hello, I a JSON doc."; + String docId = "42"; - final AtomicInteger requestsReceived = new AtomicInteger(0); + AtomicInteger requestsReceived = new AtomicInteger(0); // This is the fake server, checks that DATA_FORMAT header is set properly. ApacheGatewayConnection.HttpClientFactory mockFactory = mockHttpClientFactory(post -> { - final Header header = post.getFirstHeader(Headers.DATA_FORMAT); + Header header = post.getFirstHeader(Headers.DATA_FORMAT); if (requestsReceived.incrementAndGet() == 1) { // This is handshake, it is not json. assert (header == null); return httpResponse("clientId", "3"); } assertNotNull(header); - assertThat(header.getValue(), is(FeedParams.DataFormat.JSON_UTF8.name())); + assertEquals(FeedParams.DataFormat.JSON_UTF8.name(), header.getValue()); // Test is done. return httpResponse("clientId", "3"); }); @@ -154,13 +149,13 @@ public class ApacheGatewayConnectionTest { @Test public void testZipAndCreateEntity() throws IOException { - final String testString = "Hello world"; + String testString = "Hello world"; InputStream stream = new ByteArrayInputStream(testString.getBytes(StandardCharsets.UTF_8)); // Send in test data to method. InputStreamEntity inputStreamEntity = ApacheGatewayConnection.zipAndCreateEntity(stream); // Verify zipped data by comparing unzipped data with test data. - final String rawContent = TestUtils.zipStreamToString(inputStreamEntity.getContent()); - assert(testString.equals(rawContent)); + String rawContent = TestUtils.zipStreamToString(inputStreamEntity.getContent()); + assertEquals(testString, rawContent); } /** @@ -168,32 +163,28 @@ public class ApacheGatewayConnectionTest { */ @Test public void testCompressedWriteOperations() throws Exception { - final Endpoint endpoint = Endpoint.create("localhost", 666, false); - final FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.XML_UTF8).build(); - final String clusterSpecificRoute = ""; - final ConnectionParams connectionParams = new ConnectionParams.Builder() - .setUseCompression(true) - .build(); - final List<Document> documents = new ArrayList<>(); + Endpoint endpoint = Endpoint.create("localhost", 666, false); + FeedParams feedParams = new FeedParams.Builder().setDataFormat(FeedParams.DataFormat.XML_UTF8).build(); + String clusterSpecificRoute = ""; + ConnectionParams connectionParams = new ConnectionParams.Builder().setUseCompression(true).build(); + List<Document> documents = new ArrayList<>(); - final String vespaDocContent ="Hello, I am the document data."; - final String docId = "42"; + String vespaDocContent ="Hello, I am the document data."; + String docId = "42"; - final Document doc = createDoc(docId, vespaDocContent, false); + Document doc = createDoc(docId, vespaDocContent, false); // When sending data on http client, check if it is compressed. If compressed, unzip, check result, // and count down latch. ApacheGatewayConnection.HttpClientFactory mockFactory = mockHttpClientFactory(post -> { - final Header header = post.getFirstHeader("Content-Encoding"); + Header header = post.getFirstHeader("Content-Encoding"); if (header != null && header.getValue().equals("gzip")) { final String rawContent = TestUtils.zipStreamToString(post.getEntity().getContent()); final String vespaHeaderText = "<vespafeed>\n"; final String vespaFooterText = "</vespafeed>\n"; - assertThat(rawContent, is( - doc.getOperationId() + " 38\n" + vespaHeaderText + vespaDocContent + "\n" - + vespaFooterText)); - + assertEquals(doc.getOperationId() + " 38\n" + vespaHeaderText + vespaDocContent + "\n" + vespaFooterText, + rawContent); } return httpResponse("clientId", "3"); }); @@ -301,16 +292,12 @@ public class ApacheGatewayConnectionTest { HttpResponse execute(HttpPost httpPost) throws IOException; } - private Document createDoc(final String docId, final String content, boolean useJson) throws IOException { - return new Document(docId, content.getBytes(), null /* context */); + private Document createDoc(String docId, String content, boolean useJson) { + return new Document(docId, content.getBytes(), null, Clock.systemUTC().instant()); } - private void addMockedHeader( - final HttpResponse httpResponseMock, - final String name, - final String value, - HeaderElement[] elements) { - final Header header = new Header() { + private void addMockedHeader(HttpResponse httpResponseMock, String name, String value, HeaderElement[] elements) { + Header header = new Header() { @Override public String getName() { return name; @@ -328,7 +315,7 @@ public class ApacheGatewayConnectionTest { } private HttpResponse httpResponse(String sessionIdInResult, String version) throws IOException { - final HttpResponse httpResponseMock = mock(HttpResponse.class); + HttpResponse httpResponseMock = mock(HttpResponse.class); StatusLine statusLineMock = mock(StatusLine.class); when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java index 35a06258f86..af354b8feea 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/CloseableQTestCase.java @@ -4,21 +4,24 @@ package com.yahoo.vespa.http.client.core.communication; import com.yahoo.vespa.http.client.core.Document; import org.junit.Test; +import java.time.Clock; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; public class CloseableQTestCase { + @Test public void requestThatPutIsInterruptedOnClose() throws InterruptedException { - final DocumentQueue q = new DocumentQueue(1); - q.put(new Document("id", null, "data", null), false); + Clock clock = Clock.systemUTC(); + DocumentQueue q = new DocumentQueue(1, clock); + q.put(new Document("id", null, "data", null, clock.instant()), false); Thread t = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { - } q.close(); q.clear(); @@ -26,7 +29,7 @@ public class CloseableQTestCase { }); t.start(); try { - q.put(new Document("id2", null, "data2", null), false); + q.put(new Document("id2", null, "data2", null, Clock.systemUTC().instant()), false); fail("This shouldn't have worked."); } catch (IllegalStateException ise) { // ok! @@ -39,10 +42,11 @@ public class CloseableQTestCase { @Test public void requireThatSelfIsUnbounded() throws InterruptedException { - DocumentQueue q = new DocumentQueue(1); - q.put(new Document("1", null, "data", null), true); - q.put(new Document("2", null, "data", null), true); - q.put(new Document("3", null, "data", null), true); + DocumentQueue q = new DocumentQueue(1, Clock.systemUTC()); + q.put(new Document("1", null, "data", null, Clock.systemUTC().instant()), true); + q.put(new Document("2", null, "data", null, Clock.systemUTC().instant()), true); + q.put(new Document("3", null, "data", null, Clock.systemUTC().instant()), true); assertEquals(3, q.size()); } + } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index 6836a0a1d2c..8eb9513065e 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -47,11 +47,15 @@ public class IOThreadTest { CountDownLatch latch = new CountDownLatch(1); String docId1 = V3HttpAPITest.documents.get(0).getDocumentId(); Document doc1 = new Document(V3HttpAPITest.documents.get(0).getDocumentId(), - V3HttpAPITest.documents.get(0).getContents(), null /* context */); + V3HttpAPITest.documents.get(0).getContents(), + null, + Clock.systemUTC().instant()); String docId2 = V3HttpAPITest.documents.get(1).getDocumentId(); Document doc2 = new Document(V3HttpAPITest.documents.get(1).getDocumentId(), - V3HttpAPITest.documents.get(1).getContents(), null /* context */); - DocumentQueue documentQueue = new DocumentQueue(4); + V3HttpAPITest.documents.get(1).getContents(), + null, + Clock.systemUTC().instant()); + DocumentQueue documentQueue = new DocumentQueue(4, Clock.systemUTC()); public IOThreadTest() { when(apacheGatewayConnection.getEndpoint()).thenReturn(ENDPOINT); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java index baf6e2f2df3..ec929d68efb 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottlerTest.java @@ -1,9 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.http.client.core.operationProcessor; +import com.yahoo.vespa.http.client.ManualClock; import com.yahoo.vespa.http.client.core.ThrottlePolicy; import org.junit.Test; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; @@ -12,6 +15,7 @@ import java.util.Random; import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyDouble; @@ -42,14 +46,14 @@ public class IncompleteResultsThrottlerTest { * @return median queue length. */ int getAverageQueue(int clientCount, int breakPoint, int simulationTimeMs) { - final AtomicLong timeMs = new AtomicLong(0); + ManualClock clock = new ManualClock(Instant.ofEpochMilli(0)); ArrayList<IncompleteResultsThrottler> incompleteResultsThrottlers = new ArrayList<>(); MockServer mockServer = new MockServer(breakPoint); for (int x = 0; x < clientCount; x++) { IncompleteResultsThrottler incompleteResultsThrottler = - new IncompleteResultsThrottler(10, 50000, () -> timeMs.get(), new ThrottlePolicy()); + new IncompleteResultsThrottler(10, 50000, clock, new ThrottlePolicy()); incompleteResultsThrottlers.add(incompleteResultsThrottler); } long sum = 0; @@ -68,8 +72,8 @@ public class IncompleteResultsThrottlerTest { if (fastForward) { time = mockServer.nextRequestFinished(); } - timeMs.set(time); - mockServer.moveTime(timeMs.get()); + clock.setInstant(Instant.ofEpochMilli(time)); + mockServer.moveTime(clock.instant().toEpochMilli()); for (int y = 0; y < clientCount; y++) { // Fill up, but don't block as that would stop the simulation. while (incompleteResultsThrottlers.get(y).availableCapacity() > 0) { @@ -140,45 +144,46 @@ public class IncompleteResultsThrottlerTest { } } - private void moveToNextCycle(final IncompleteResultsThrottler throttler, AtomicLong timeMs) + private void moveToNextCycle(final IncompleteResultsThrottler throttler, ManualClock clock) throws InterruptedException { waitForThreads(); // Enter an adaption phase, we don't care about this phase. - timeMs.addAndGet(throttler.phaseSizeMs); + clock.advance(Duration.ofMillis(throttler.phaseSizeMs)); throttler.operationStart(); throttler.resultReady(false); // Now enter the real next phase. - timeMs.addAndGet(throttler.phaseSizeMs); + clock.advance(Duration.ofMillis(throttler.phaseSizeMs)); throttler.operationStart(); throttler.resultReady(false); } @Test public void testInteractionWithPolicyByMockingPolicy() throws InterruptedException { + ManualClock clock = new ManualClock(Instant.ofEpochMilli(0)); final int MAX_SIZE = 1000; final int MORE_THAN_MAX_SIZE = MAX_SIZE + 20; final int SIZE_AFTER_CYCLE_FIRST = 30; final int SIZE_AFTER_CYCLE_SECOND = 5000; ThrottlePolicy policy = mock(ThrottlePolicy.class); - final AtomicLong timeMs = new AtomicLong(0); IncompleteResultsThrottler incompleteResultsThrottler = - new IncompleteResultsThrottler(2, MAX_SIZE, ()->timeMs.get(), policy); + new IncompleteResultsThrottler(2, MAX_SIZE, clock, policy); long bucketSizeMs = incompleteResultsThrottler.phaseSizeMs; // Cycle 1 - Algorithm has fixed value for max-in-flight: INITIAL_MAX_IN_FLIGHT_VALUE. // We post a few operations, not all finishing in this cycle. We explicitly do not fill the window // size to test the argument about any requests blocked. - assertThat(incompleteResultsThrottler.availableCapacity(), - is(IncompleteResultsThrottler.INITIAL_MAX_IN_FLIGHT_VALUE)); + assertEquals(IncompleteResultsThrottler.INITIAL_MAX_IN_FLIGHT_VALUE, + incompleteResultsThrottler.availableCapacity()); postOperations(20, incompleteResultsThrottler); postSuccesses(15, incompleteResultsThrottler); - moveToNextCycle(incompleteResultsThrottler, timeMs); + moveToNextCycle(incompleteResultsThrottler, clock); // Cycle 2 - Algorithm has fixed value also for second iteration: SECOND_MAX_IN_FLIGHT_VALUE. // Test verifies that this value is used, and insert a value to be used for next phase SIZE_AFTER_CYCLE_FIRST. - assertThat(incompleteResultsThrottler.availableCapacity(), - is(IncompleteResultsThrottler.SECOND_MAX_IN_FLIGHT_VALUE - 5)); // 5 slots already taken earlier + assertEquals("5 slots already taken earlier", + IncompleteResultsThrottler.SECOND_MAX_IN_FLIGHT_VALUE - 5, + incompleteResultsThrottler.availableCapacity()); postSuccesses(5, incompleteResultsThrottler); when(policy.calcNewMaxInFlight( anyDouble(), // Max performance change @@ -188,12 +193,11 @@ public class IncompleteResultsThrottlerTest { eq(IncompleteResultsThrottler.SECOND_MAX_IN_FLIGHT_VALUE), // current size eq(false))) // is any request blocked, should be false since we only posted 20 docs. .thenReturn(SIZE_AFTER_CYCLE_FIRST); - moveToNextCycle(incompleteResultsThrottler, timeMs); + moveToNextCycle(incompleteResultsThrottler, clock); // Cycle 3 - Test that value set in previous phase is used. Now return a very large number. // However, this number should be cropped by the system (tested in next cycle). - assertThat(incompleteResultsThrottler.availableCapacity(), - is(SIZE_AFTER_CYCLE_FIRST)); + assertEquals(SIZE_AFTER_CYCLE_FIRST, incompleteResultsThrottler.availableCapacity()); postOperations(MORE_THAN_MAX_SIZE, incompleteResultsThrottler); postSuccesses(MORE_THAN_MAX_SIZE, incompleteResultsThrottler); when(policy.calcNewMaxInFlight( @@ -204,11 +208,10 @@ public class IncompleteResultsThrottlerTest { eq(SIZE_AFTER_CYCLE_FIRST),// current size eq(true))) // is any request blocked, should be true since we posted MORE_THAN_MAX_SIZE docs. .thenReturn(SIZE_AFTER_CYCLE_SECOND); - moveToNextCycle(incompleteResultsThrottler, timeMs); + moveToNextCycle(incompleteResultsThrottler, clock); // Cycle 4 - Test that the large number from previous cycle is cropped and that max value is used instead. - assertThat(incompleteResultsThrottler.availableCapacity(), - is(MAX_SIZE)); + assertEquals(MAX_SIZE, incompleteResultsThrottler.availableCapacity()); } private long inversesU(int size, int sweetSpot) { diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java index 2f801012d6d..e4ae138054d 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java @@ -33,10 +33,10 @@ import static org.mockito.Mockito.when; public class OperationProcessorTest { final Queue<Result> queue = new ArrayDeque<>(); - final Document doc1 = new Document("id:a:type::b", null, "data doc 1", null); - final Document doc1b = new Document("id:a:type::b", null, "data doc 1b", null); - final Document doc2 = new Document("id:a:type::b2", null, "data doc 2", null); - final Document doc3 = new Document("id:a:type::b3", null, "data doc 3", null); + final Document doc1 = new Document("id:a:type::b", null, "data doc 1", null, Clock.systemUTC().instant()); + final Document doc1b = new Document("id:a:type::b", null, "data doc 1b", null, Clock.systemUTC().instant()); + final Document doc2 = new Document("id:a:type::b2", null, "data doc 2", null, Clock.systemUTC().instant()); + final Document doc3 = new Document("id:a:type::b3", null, "data doc 3", null, Clock.systemUTC().instant()); @Test public void testBasic() { @@ -203,7 +203,7 @@ public class OperationProcessorTest { Queue<Document> documentQueue = new ArrayDeque<>(); for (int x = 0; x < 100; x++) { - Document document = new Document("id:a:type::b", null, String.valueOf(x), null); + Document document = new Document("id:a:type::b", null, String.valueOf(x), null, Clock.systemUTC().instant()); operationProcessor.sendDocument(document); documentQueue.add(document); } |