summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2022-09-14 10:22:47 +0200
committerGitHub <noreply@github.com>2022-09-14 10:22:47 +0200
commit155223379d6d2ebd9038a9b4ecc4fcb50b91eaa3 (patch)
treef44ea251a414792760dd4331915c178a76ba5370
parentbffe3963a54339eb235a80db3267b89b51efae7d (diff)
parent782962c90a4e52eda6666de98881871bb8ef7879 (diff)
Merge pull request #24040 from vespa-engine/balder/factor-out-timeout-and-coverage-handling
Factor out timeout and coverage handling to make the InterleavedSearc…
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/AdaptiveTimeoutHandler.java69
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CoverageAggregator.java97
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java144
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java3
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SimpleTimeoutHandler.java26
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/TimeoutHandler.java11
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java14
-rwxr-xr-xdocumentapi/src/test/java/com/yahoo/documentapi/messagebus/ScheduledEventQueueTestCase.java48
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/CustomTimer.java17
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java19
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java7
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java27
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java28
-rw-r--r--vespaclient-java/src/test/java/com/yahoo/vespafeeder/BenchmarkProgressPrinterTest.java17
-rw-r--r--vespaclient-java/src/test/java/com/yahoo/vespafeeder/ProgressPrinterTest.java17
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/ManualTimer.java18
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/Timer.java12
17 files changed, 332 insertions, 242 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/AdaptiveTimeoutHandler.java b/container-search/src/main/java/com/yahoo/search/dispatch/AdaptiveTimeoutHandler.java
new file mode 100644
index 00000000000..e725a152f09
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/AdaptiveTimeoutHandler.java
@@ -0,0 +1,69 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.concurrent.Timer;
+import com.yahoo.search.Query;
+import com.yahoo.vespa.config.search.DispatchConfig;
+
+import static com.yahoo.container.handler.Coverage.DEGRADED_BY_ADAPTIVE_TIMEOUT;
+
+/**
+ * Computes next timeout based on how many responses has been received so far
+ *
+ * @author baldersheim
+ */
+class AdaptiveTimeoutHandler implements TimeoutHandler {
+ private final double minimumCoverage;
+ private final int askedNodes;
+ private final int minimumResponses;
+ private final Query query;
+ private final Timer timer;
+ private final DispatchConfig config;
+
+ private long deadline;
+ private long adaptiveTimeoutMin;
+ private long adaptiveTimeoutMax;
+ boolean adaptiveTimeoutCalculated = false;
+
+ AdaptiveTimeoutHandler(Timer timer, DispatchConfig config, int askedNodes, Query query) {
+ minimumCoverage = config.minSearchCoverage();
+ this.config = config;
+ this.askedNodes = askedNodes;
+ this.query = query;
+ this.timer = timer;
+ minimumResponses = (int) Math.ceil(askedNodes * minimumCoverage / 100.0);
+ deadline = timer.milliTime() + query.getTimeLeft();
+ }
+
+ @Override
+ public long nextTimeout(int answeredNodes) {
+ if (askedNodes == answeredNodes) return query.getTimeLeft(); // All nodes have responded - done
+ if (answeredNodes < minimumResponses) return query.getTimeLeft(); // Minimum responses have not been received yet
+
+ if (!adaptiveTimeoutCalculated) {
+ // Recompute timeout when minimum responses have been received
+ long timeLeftMs = query.getTimeLeft();
+ adaptiveTimeoutMin = (long) (timeLeftMs * config.minWaitAfterCoverageFactor());
+ adaptiveTimeoutMax = (long) (timeLeftMs * config.maxWaitAfterCoverageFactor());
+ adaptiveTimeoutCalculated = true;
+ }
+ long now = timer.milliTime();
+ int pendingQueries = askedNodes - answeredNodes;
+ double missWidth = ((100.0 - minimumCoverage) * askedNodes) / 100.0 - 1.0;
+ double slopedWait = adaptiveTimeoutMin;
+ if (pendingQueries > 1 && missWidth > 0.0) {
+ slopedWait += ((adaptiveTimeoutMax - adaptiveTimeoutMin) * (pendingQueries - 1)) / missWidth;
+ }
+ long nextAdaptive = (long) slopedWait;
+ if (now + nextAdaptive >= deadline) {
+ return deadline - now;
+ }
+ deadline = now + nextAdaptive;
+
+ return nextAdaptive;
+ }
+
+ @Override
+ public int reason() {
+ return DEGRADED_BY_ADAPTIVE_TIMEOUT;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CoverageAggregator.java b/container-search/src/main/java/com/yahoo/search/dispatch/CoverageAggregator.java
new file mode 100644
index 00000000000..6d4b321d861
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CoverageAggregator.java
@@ -0,0 +1,97 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.search.result.Coverage;
+
+import static com.yahoo.container.handler.Coverage.*;
+
+/**
+ * Aggregates coverage as responses are added.
+ *
+ * @author baldersheim
+ */
+public class CoverageAggregator {
+ private final int askedNodes;
+ private int answeredNodes = 0;
+ private int answeredNodesParticipated = 0;
+ private int failedNodes = 0;
+ private long answeredDocs = 0;
+ private long answeredActiveDocs = 0;
+ private long answeredTargetActiveDocs = 0;
+ private boolean timedOut = false;
+ private boolean degradedByMatchPhase = false;
+ CoverageAggregator(int askedNodes) {
+ this.askedNodes = askedNodes;
+ }
+ CoverageAggregator(CoverageAggregator rhs) {
+ askedNodes = rhs.askedNodes;
+ answeredNodes = rhs.answeredNodes;
+ answeredNodesParticipated = rhs.answeredNodesParticipated;
+ failedNodes = rhs.failedNodes;
+ answeredDocs = rhs.answeredDocs;
+ answeredActiveDocs = rhs.answeredActiveDocs;
+ answeredTargetActiveDocs = rhs.answeredTargetActiveDocs;
+ timedOut = rhs.timedOut;
+ degradedByMatchPhase = rhs.degradedByMatchPhase;
+ }
+ void add(Coverage source) {
+ answeredDocs += source.getDocs();
+ answeredActiveDocs += source.getActive();
+ answeredTargetActiveDocs += source.getTargetActive();
+ answeredNodesParticipated += source.getNodes();
+ answeredNodes++;
+ degradedByMatchPhase |= source.isDegradedByMatchPhase();
+ timedOut |= source.isDegradedByTimeout();
+ }
+ public int getAskedNodes() {
+ return askedNodes;
+ }
+ public int getAnswerdNodes() {
+ return answeredNodes;
+ }
+ public boolean hasNoAnswers() { return answeredNodes == 0; }
+ public void setTimedOut() { timedOut = true; }
+ public void setFailedNodes(int failedNodes) {
+ this.failedNodes += failedNodes;
+ }
+
+ public Coverage createCoverage(TimeoutHandler timeoutHandler) {
+ Coverage coverage = new Coverage(answeredDocs, answeredActiveDocs, answeredNodesParticipated, 1);
+ coverage.setNodesTried(askedNodes);
+ coverage.setTargetActive(answeredTargetActiveDocs);
+ int degradedReason = 0;
+ if (timedOut) {
+ degradedReason |= timeoutHandler.reason();
+ }
+ if (degradedByMatchPhase) {
+ degradedReason |= DEGRADED_BY_MATCH_PHASE;
+ }
+ coverage.setDegradedReason(degradedReason);
+ return coverage;
+ }
+ public CoverageAggregator adjustDegradedCoverage(int searchableCopies, TimeoutHandler timeoutHandler) {
+ int askedAndFailed = askedNodes + failedNodes;
+ if (askedAndFailed == answeredNodesParticipated) {
+ return this;
+ }
+ int notAnswered = askedAndFailed - answeredNodesParticipated;
+
+ if ((timeoutHandler.reason() == DEGRADED_BY_ADAPTIVE_TIMEOUT) && answeredNodesParticipated > 0) {
+ CoverageAggregator clone = new CoverageAggregator(this);
+ clone.answeredActiveDocs += (notAnswered * answeredActiveDocs / answeredNodesParticipated);
+ clone.answeredTargetActiveDocs += (notAnswered * answeredTargetActiveDocs / answeredNodesParticipated);
+ return clone;
+ } else {
+ if (askedAndFailed > answeredNodesParticipated) {
+ int missingNodes = notAnswered - (searchableCopies - 1);
+ if (answeredNodesParticipated > 0) {
+ CoverageAggregator clone = new CoverageAggregator(this);
+ clone.answeredActiveDocs += (missingNodes * answeredActiveDocs / answeredNodesParticipated);
+ clone.answeredTargetActiveDocs += (missingNodes * answeredTargetActiveDocs / answeredNodesParticipated);
+ clone.timedOut = true;
+ return clone;
+ }
+ }
+ }
+ return this;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
index 8c92e8b0270..5dace0bbff8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
@@ -1,12 +1,12 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
+import com.yahoo.concurrent.Timer;
import com.yahoo.prelude.fastsearch.GroupingListHit;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
-import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.search.searchchain.Execution;
@@ -25,10 +25,6 @@ import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
-import static com.yahoo.container.handler.Coverage.DEGRADED_BY_ADAPTIVE_TIMEOUT;
-import static com.yahoo.container.handler.Coverage.DEGRADED_BY_MATCH_PHASE;
-import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT;
-
/**
* InterleavedSearchInvoker uses multiple {@link SearchInvoker} objects to interface with content
* nodes in parallel. Operationally it first sends requests to all contained invokers and then
@@ -40,38 +36,35 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private static final Logger log = Logger.getLogger(InterleavedSearchInvoker.class.getName());
+ private final Timer timer;
private final Set<SearchInvoker> invokers;
private final SearchCluster searchCluster;
private final Group group;
private final LinkedBlockingQueue<SearchInvoker> availableForProcessing;
private final Set<Integer> alreadyFailedNodes;
+ private final CoverageAggregator coverageAggregator;
private Query query;
- private boolean adaptiveTimeoutCalculated = false;
- private long adaptiveTimeoutMin = 0;
- private long adaptiveTimeoutMax = 0;
- private long deadline = 0;
-
- private long answeredDocs = 0;
- private long answeredActiveDocs = 0;
- private long answeredTargetActiveDocs = 0;
- private int askedNodes = 0;
- private int answeredNodes = 0;
- private int answeredNodesParticipated = 0;
- private boolean timedOut = false;
- private boolean degradedByMatchPhase = false;
-
- public InterleavedSearchInvoker(Collection<SearchInvoker> invokers,
+ TimeoutHandler timeoutHandler;
+ public InterleavedSearchInvoker(Timer timer, Collection<SearchInvoker> invokers,
SearchCluster searchCluster,
Group group,
Set<Integer> alreadyFailedNodes) {
super(Optional.empty());
+ this.timer = timer;
this.invokers = Collections.newSetFromMap(new IdentityHashMap<>());
this.invokers.addAll(invokers);
this.searchCluster = searchCluster;
this.group = group;
this.availableForProcessing = newQueue();
this.alreadyFailedNodes = alreadyFailedNodes;
+ coverageAggregator = new CoverageAggregator(invokers.size());
+ }
+
+ TimeoutHandler create(DispatchConfig config, int askedNodes, Query query) {
+ return (config.minSearchCoverage() < 100.0D)
+ ? new AdaptiveTimeoutHandler(timer, config, askedNodes, query)
+ : new SimpleTimeoutHandler(query);
}
/**
@@ -83,7 +76,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
protected Object sendSearchRequest(Query query, Object unusedContext) throws IOException {
this.query = query;
invokers.forEach(invoker -> invoker.setMonitor(this));
- deadline = currentTime() + query.getTimeLeft();
int originalHits = query.getHits();
int originalOffset = query.getOffset();
@@ -101,8 +93,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
Object context = null;
for (SearchInvoker invoker : invokers) {
context = invoker.sendSearchRequest(query, context);
- askedNodes++;
}
+ timeoutHandler = create(searchCluster.dispatchConfig(), invokers.size(), query);
query.setHits(originalHits);
query.setOffset(originalOffset);
@@ -119,14 +111,15 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
while (!invokers.isEmpty() && nextTimeout >= 0) {
SearchInvoker invoker = availableForProcessing.poll(nextTimeout, TimeUnit.MILLISECONDS);
if (invoker == null) {
- log.fine(() -> "Search timed out with " + askedNodes + " requests made, " + answeredNodes + " responses received");
+ log.fine(() -> "Search timed out with " + coverageAggregator.getAskedNodes() + " requests made, " +
+ coverageAggregator.getAnswerdNodes() + " responses received");
break;
} else {
InvokerResult toMerge = invoker.getSearchResult(execution);
merged = mergeResult(result.getResult(), toMerge, merged, groupingResultAggregator);
ejectInvoker(invoker);
}
- nextTimeout = nextTimeout();
+ nextTimeout = timeoutHandler.nextTimeout(coverageAggregator.getAnswerdNodes());
}
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for search results", e);
@@ -134,7 +127,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
groupingResultAggregator.toAggregatedHit().ifPresent(h -> result.getResult().hits().add(h));
insertNetworkErrors(result.getResult());
- result.getResult().setCoverage(createCoverage());
+ CoverageAggregator adjusted = coverageAggregator.adjustDegradedCoverage((int)searchCluster.dispatchConfig().searchableCopies(), timeoutHandler);
+ result.getResult().setCoverage(adjusted.createCoverage(timeoutHandler));
int needed = query.getOffset() + query.getHits();
for (int index = query.getOffset(); (index < merged.size()) && (index < needed); index++) {
@@ -146,7 +140,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private void insertNetworkErrors(Result result) {
// Network errors will be reported as errors only when all nodes fail, otherwise they are just traced
- boolean asErrors = answeredNodes == 0;
+ boolean asErrors = coverageAggregator.hasNoAnswers();
if (!invokers.isEmpty()) {
String keys = invokers.stream().map(SearchInvoker::distributionKey).map(dk -> dk.map(i -> i.toString()).orElse("(unspecified)"))
@@ -158,7 +152,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
} else {
query.trace("Backend communication timeout on nodes with distribution-keys: " + keys, 2);
}
- timedOut = true;
+ coverageAggregator.setTimedOut();
}
if (alreadyFailedNodes != null) {
var message = "Connection failure on nodes with distribution-keys: "
@@ -168,51 +162,13 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
} else {
query.trace(message, 2);
}
- int failed = alreadyFailedNodes.size();
- askedNodes += failed;
- answeredNodes += failed;
- }
- }
-
- private long nextTimeout() {
- DispatchConfig config = searchCluster.dispatchConfig();
- double minimumCoverage = config.minSearchCoverage();
-
- if (askedNodes == answeredNodes || minimumCoverage >= 100.0) {
- return query.getTimeLeft();
+ coverageAggregator.setFailedNodes(alreadyFailedNodes.size());
}
- int minimumResponses = (int) Math.ceil(askedNodes * minimumCoverage / 100.0);
-
- if (answeredNodes < minimumResponses) {
- return query.getTimeLeft();
- }
-
- long timeLeft = query.getTimeLeft();
- if (!adaptiveTimeoutCalculated) {
- adaptiveTimeoutMin = (long) (timeLeft * config.minWaitAfterCoverageFactor());
- adaptiveTimeoutMax = (long) (timeLeft * config.maxWaitAfterCoverageFactor());
- adaptiveTimeoutCalculated = true;
- }
-
- long now = currentTime();
- int pendingQueries = askedNodes - answeredNodes;
- double missWidth = ((100.0 - config.minSearchCoverage()) * askedNodes) / 100.0 - 1.0;
- double slopedWait = adaptiveTimeoutMin;
- if (pendingQueries > 1 && missWidth > 0.0) {
- slopedWait += ((adaptiveTimeoutMax - adaptiveTimeoutMin) * (pendingQueries - 1)) / missWidth;
- }
- long nextAdaptive = (long) slopedWait;
- if (now + nextAdaptive >= deadline) {
- return deadline - now;
- }
- deadline = now + nextAdaptive;
-
- return nextAdaptive;
}
private List<LeanHit> mergeResult(Result result, InvokerResult partialResult, List<LeanHit> current,
GroupingResultAggregator groupingResultAggregator) {
- collectCoverage(partialResult.getResult().getCoverage(true));
+ coverageAggregator.add(partialResult.getResult().getCoverage(true));
result.mergeWith(partialResult.getResult());
List<Hit> partialNonLean = partialResult.getResult().hits().asUnorderedHits();
@@ -265,55 +221,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
return merged;
}
- private void collectCoverage(Coverage source) {
- answeredDocs += source.getDocs();
- answeredActiveDocs += source.getActive();
- answeredTargetActiveDocs += source.getTargetActive();
- answeredNodesParticipated += source.getNodes();
- answeredNodes++;
- degradedByMatchPhase |= source.isDegradedByMatchPhase();
- timedOut |= source.isDegradedByTimeout();
- }
-
- private Coverage createCoverage() {
- adjustDegradedCoverage();
-
- Coverage coverage = new Coverage(answeredDocs, answeredActiveDocs, answeredNodesParticipated, 1);
- coverage.setNodesTried(askedNodes);
- coverage.setTargetActive(answeredTargetActiveDocs);
- int degradedReason = 0;
- if (timedOut) {
- degradedReason |= (adaptiveTimeoutCalculated ? DEGRADED_BY_ADAPTIVE_TIMEOUT : DEGRADED_BY_TIMEOUT);
- }
- if (degradedByMatchPhase) {
- degradedReason |= DEGRADED_BY_MATCH_PHASE;
- }
- coverage.setDegradedReason(degradedReason);
- return coverage;
- }
-
- private void adjustDegradedCoverage() {
- if (askedNodes == answeredNodesParticipated) {
- return;
- }
- int notAnswered = askedNodes - answeredNodesParticipated;
-
- if (adaptiveTimeoutCalculated && answeredNodesParticipated > 0) {
- answeredActiveDocs += (notAnswered * answeredActiveDocs / answeredNodesParticipated);
- answeredTargetActiveDocs += (notAnswered * answeredTargetActiveDocs / answeredNodesParticipated);
- } else {
- if (askedNodes > answeredNodesParticipated) {
- int searchableCopies = (int) searchCluster.dispatchConfig().searchableCopies();
- int missingNodes = notAnswered - (searchableCopies - 1);
- if (answeredNodesParticipated > 0) {
- answeredActiveDocs += (missingNodes * answeredActiveDocs / answeredNodesParticipated);
- answeredTargetActiveDocs += (missingNodes * answeredTargetActiveDocs / answeredNodesParticipated);
- timedOut = true;
- }
- }
- }
- }
-
private void ejectInvoker(SearchInvoker invoker) {
invokers.remove(invoker);
invoker.release();
@@ -340,11 +247,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
}
// For overriding in tests
- protected long currentTime() {
- return System.currentTimeMillis();
- }
-
- // For overriding in tests
protected LinkedBlockingQueue<SearchInvoker> newQueue() {
return new LinkedBlockingQueue<>();
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
index eb379a51ed4..02cf11c9fe7 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
+import com.yahoo.concurrent.Timer;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
@@ -89,7 +90,7 @@ public abstract class InvokerFactory {
if (invokers.size() == 1 && failed == null) {
return Optional.of(invokers.get(0));
} else {
- return Optional.of(new InterleavedSearchInvoker(invokers, searchCluster, group, failed));
+ return Optional.of(new InterleavedSearchInvoker(Timer.monotonic, invokers, searchCluster, group, failed));
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SimpleTimeoutHandler.java b/container-search/src/main/java/com/yahoo/search/dispatch/SimpleTimeoutHandler.java
new file mode 100644
index 00000000000..8732cee7652
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SimpleTimeoutHandler.java
@@ -0,0 +1,26 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.search.Query;
+
+import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT;
+
+/**
+ * Computes the timeout based solely on the query timeout
+ *
+ * @author baldersheim
+ */
+public class SimpleTimeoutHandler implements TimeoutHandler {
+ private final Query query;
+ SimpleTimeoutHandler(Query query) {
+ this.query = query;
+ }
+ @Override
+ public long nextTimeout(int answeredNodes) {
+ return query.getTimeLeft();
+ }
+
+ @Override
+ public int reason() {
+ return DEGRADED_BY_TIMEOUT;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/TimeoutHandler.java b/container-search/src/main/java/com/yahoo/search/dispatch/TimeoutHandler.java
new file mode 100644
index 00000000000..7c86b02d368
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/TimeoutHandler.java
@@ -0,0 +1,11 @@
+package com.yahoo.search.dispatch;
+
+/**
+ * Computes next timeout
+ *
+ * @author baldersheim
+ */
+public interface TimeoutHandler {
+ long nextTimeout(int answeredNodes);
+ int reason();
+}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java
index a88046197e0..7fe2d2c2d9d 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
+import com.yahoo.concurrent.Timer;
import com.yahoo.document.GlobalId;
import com.yahoo.document.idstring.IdString;
import com.yahoo.prelude.fastsearch.FastHit;
@@ -23,6 +24,7 @@ import com.yahoo.test.ManualClock;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -33,6 +35,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.ToLongFunction;
import java.util.stream.StreamSupport;
import static com.yahoo.container.handler.Coverage.DEGRADED_BY_MATCH_PHASE;
@@ -347,7 +350,7 @@ public class InterleavedSearchInvokerTest {
.addAggregationResult(new MinAggregationResult().setMin(new IntegerResultNode(6)).setTag(3))));
invokers.add(new MockInvoker(0).setHits(List.of(new GroupingListHit(List.of(grouping2)))));
- InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(invokers, cluster, new Group(0, List.of()), Collections.emptySet());
+ InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(Timer.monotonic, invokers, cluster, new Group(0, List.of()), Collections.emptySet());
invoker.responseAvailable(invokers.get(0));
invoker.responseAvailable(invokers.get(1));
Result result = invoker.search(query, null);
@@ -360,7 +363,7 @@ public class InterleavedSearchInvokerTest {
List<SearchInvoker> invokers = new ArrayList<>();
invokers.add(createInvoker(a, 0));
invokers.add(createInvoker(b, 1));
- InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(invokers, cluster, group, Collections.emptySet());
+ InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(Timer.monotonic, invokers, cluster, group, Collections.emptySet());
invoker.responseAvailable(invokers.get(0));
invoker.responseAvailable(invokers.get(1));
return invoker;
@@ -411,12 +414,7 @@ public class InterleavedSearchInvokerTest {
invokers.add(new MockInvoker(i));
}
- return new InterleavedSearchInvoker(invokers, searchCluster, group,null) {
-
- @Override
- protected long currentTime() {
- return clock.millis();
- }
+ return new InterleavedSearchInvoker(Timer.wrap(clock), invokers, searchCluster, group,null) {
@Override
protected LinkedBlockingQueue<SearchInvoker> newQueue() {
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/ScheduledEventQueueTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/ScheduledEventQueueTestCase.java
index 4f937aa26ba..ad73aa92d7c 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/ScheduledEventQueueTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/ScheduledEventQueueTestCase.java
@@ -1,7 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus;
-import com.yahoo.concurrent.Timer;
+import com.yahoo.concurrent.ManualTimer;
import org.junit.Test;
import java.util.concurrent.RejectedExecutionException;
@@ -13,7 +13,7 @@ import static org.junit.Assert.fail;
public class ScheduledEventQueueTestCase {
- class TestTask implements Runnable {
+ static class TestTask implements Runnable {
public long timestamp = 0;
public void run() {
@@ -46,42 +46,34 @@ public class ScheduledEventQueueTestCase {
assertNull(queue.popTask());
}
- class TestTimer implements Timer {
- public long milliTime = 0;
-
- public long milliTime() {
- return milliTime;
- }
- }
-
@Test
public void testPushTaskWithTime() {
- TestTimer timer = new TestTimer();
+ ManualTimer timer = new ManualTimer();
ScheduledEventQueue queue = new ScheduledEventQueue(timer);
TestTask task = new TestTask();
queue.pushTask(task, 1000);
assertNull(queue.popTask());
- timer.milliTime = 1000;
+ timer.set(1000);
assertEquals(task, queue.popTask());
}
@Test
public void testTwoTasksWithSameTime() {
- TestTimer timer = new TestTimer();
+ ManualTimer timer = new ManualTimer();
ScheduledEventQueue queue = new ScheduledEventQueue(timer);
TestTask task1 = new TestTask();
queue.pushTask(task1, 1000);
TestTask task2 = new TestTask();
queue.pushTask(task2, 1000);
assertNull(queue.popTask());
- timer.milliTime = 1000;
+ timer.set(1000);
assertEquals(task1, queue.popTask());
assertEquals(task2, queue.popTask());
}
@Test
public void testThreeTasksWithDifferentTime() {
- TestTimer timer = new TestTimer();
+ ManualTimer timer = new ManualTimer();
ScheduledEventQueue queue = new ScheduledEventQueue(timer);
TestTask task1 = new TestTask();
queue.pushTask(task1, 1000);
@@ -91,17 +83,17 @@ public class ScheduledEventQueueTestCase {
queue.pushTask(task3);
assertEquals(task3, queue.popTask());
assertNull(queue.popTask());
- timer.milliTime = 1000;
+ timer.set(1000);
assertEquals(task2, queue.popTask());
assertEquals(task1, queue.popTask());
}
- class ClockSetterThread implements Runnable {
+ static class ClockSetterThread implements Runnable {
ScheduledEventQueue queue;
- TestTimer timer;
+ ManualTimer timer;
long newTime;
- public ClockSetterThread(ScheduledEventQueue queue, TestTimer timer, long newTime) {
+ public ClockSetterThread(ScheduledEventQueue queue, ManualTimer timer, long newTime) {
this.queue = queue;
this.timer = timer;
this.newTime = newTime;
@@ -114,14 +106,14 @@ public class ScheduledEventQueueTestCase {
}
} catch (InterruptedException e) {
}
- timer.milliTime = newTime;
+ timer.set(newTime);
queue.wakeTasks();
}
}
@Test
public void testPushAndWaitForTask() {
- TestTimer timer = new TestTimer();
+ ManualTimer timer = new ManualTimer();
ScheduledEventQueue queue = new ScheduledEventQueue(timer);
TestTask task = new TestTask();
queue.pushTask(task, 50);
@@ -131,7 +123,7 @@ public class ScheduledEventQueueTestCase {
assertEquals(50, timer.milliTime());
}
- class TaskPusherThread implements Runnable {
+ static class TaskPusherThread implements Runnable {
ScheduledEventQueue queue;
TestTask task;
@@ -162,7 +154,7 @@ public class ScheduledEventQueueTestCase {
@Test
public void testPushAndWaitMultiple() {
- TestTimer timer = new TestTimer();
+ ManualTimer timer = new ManualTimer();
ScheduledEventQueue queue = new ScheduledEventQueue(timer);
TestTask lastTask = new TestTask();
queue.pushTask(lastTask, 250);
@@ -188,11 +180,11 @@ public class ScheduledEventQueueTestCase {
}
}
- class ShutdownThread implements Runnable {
+ static class ShutdownThread implements Runnable {
ScheduledEventQueue queue;
- TestTimer timer;
+ ManualTimer timer;
- public ShutdownThread(ScheduledEventQueue queue, TestTimer timer) {
+ public ShutdownThread(ScheduledEventQueue queue, ManualTimer timer) {
this.queue = queue;
this.timer = timer;
}
@@ -205,14 +197,14 @@ public class ScheduledEventQueueTestCase {
} catch (InterruptedException e) {
}
queue.shutdown();
- timer.milliTime = 100;
+ timer.set(100);
queue.wakeTasks();
}
}
@Test
public void testShutdownInGetNext() {
- TestTimer timer = new TestTimer();
+ ManualTimer timer = new ManualTimer();
ScheduledEventQueue queue = new ScheduledEventQueue(timer);
TestTask task = new TestTask();
queue.pushTask(task, 100);
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/CustomTimer.java b/messagebus/src/test/java/com/yahoo/messagebus/CustomTimer.java
deleted file mode 100644
index 539897d6485..00000000000
--- a/messagebus/src/test/java/com/yahoo/messagebus/CustomTimer.java
+++ /dev/null
@@ -1,17 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus;
-
-import com.yahoo.concurrent.Timer;
-
-/**
- * @author <a href="mailto:thomasg@yahoo-inc.com">Thomas Gundersen</a>
- */
-class CustomTimer implements Timer {
-
- long millis = 0;
-
- @Override
- public long milliTime() {
- return millis;
- }
-}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java b/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java
index 8f9ed2323d7..8414f6588ea 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
+import com.yahoo.concurrent.ManualTimer;
import com.yahoo.messagebus.test.SimpleMessage;
import com.yahoo.messagebus.test.SimpleReply;
import org.junit.jupiter.api.Test;
@@ -46,7 +47,7 @@ public class DynamicThrottlePolicyTest {
{ // This setup is lucky with the artificial local maxima for latency, and gives good results. See below for counter-examples.
int workPerSuccess = 8;
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1)
.setWindowSizeIncrement(0.1)
.setResizeRate(100);
@@ -64,7 +65,7 @@ public class DynamicThrottlePolicyTest {
{ // This setup is not so lucky, and the artificial behaviour pushes it into overload.
int workPerSuccess = 5;
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1)
.setWindowSizeIncrement(0.1)
.setResizeRate(100);
@@ -80,7 +81,7 @@ public class DynamicThrottlePolicyTest {
{ // This setup is not so lucky either, and the artificial behaviour keeps it far below a good throughput.
int workPerSuccess = 4;
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1)
.setWindowSizeIncrement(0.1)
.setResizeRate(100);
@@ -98,7 +99,7 @@ public class DynamicThrottlePolicyTest {
@Test
void singlePolicySingleWorkerWithIncreasingParallelism() {
for (int exponent = 0; exponent < 4; exponent++) {
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
int scaleFactor = (int) Math.pow(10, exponent);
long operations = 3_000L * scaleFactor;
@@ -121,7 +122,7 @@ public class DynamicThrottlePolicyTest {
@Test
void singlePolicyIncreasingWorkersWithNoParallelism() {
for (int exponent = 0; exponent < 4; exponent++) {
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
int scaleFactor = (int) Math.pow(10, exponent);
long operations = 2_000L * scaleFactor;
@@ -156,7 +157,7 @@ public class DynamicThrottlePolicyTest {
int numberOfWorkers = 1 + (int) (10 * Math.random());
int maximumTasksPerWorker = 100_000;
int workerParallelism = 32;
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
DynamicThrottlePolicy policy1 = new DynamicThrottlePolicy(timer);
DynamicThrottlePolicy policy2 = new DynamicThrottlePolicy(timer).setWeight(0.5);
Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy1, policy2);
@@ -180,7 +181,7 @@ public class DynamicThrottlePolicyTest {
int numberOfWorkers = 6;
int maximumTasksPerWorker = 180 + (int) (120 * Math.random());
int workerParallelism = 60 + (int) (40 * Math.random());
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
int p = 10;
DynamicThrottlePolicy[] policies = IntStream.range(0, p)
.mapToObj(j -> new DynamicThrottlePolicy(timer)
@@ -213,7 +214,7 @@ public class DynamicThrottlePolicyTest {
}
private Summary run(long operations, int workPerSuccess, int numberOfWorkers, int maximumTasksPerWorker,
- int workerParallelism, CustomTimer timer, DynamicThrottlePolicy... policies) {
+ int workerParallelism, ManualTimer timer, DynamicThrottlePolicy... policies) {
System.err.printf("\n### Running %d operations of %d ticks each against %d workers with parallelism %d and queue size %d\n",
operations, workPerSuccess, numberOfWorkers, workerParallelism, maximumTasksPerWorker);
@@ -250,7 +251,7 @@ public class DynamicThrottlePolicyTest {
++ticks;
totalPending += resource.pending();
resource.tick();
- ++timer.millis;
+ timer.advance(1);
}
for (int i = 0; i < windows.length; i++)
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java
index 5e579f5d622..b4ca923c4ff 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
+import com.yahoo.concurrent.ManualTimer;
import com.yahoo.messagebus.test.SimpleMessage;
import org.junit.jupiter.api.Test;
@@ -11,7 +12,7 @@ public class RateThrottlingTestCase {
@Test
void testPending() {
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
RateThrottlingPolicy policy = new RateThrottlingPolicy(5.0, timer);
policy.setMaxPendingCount(200);
@@ -20,7 +21,7 @@ public class RateThrottlingTestCase {
}
public int getActualRate(double desiredRate) {
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
RateThrottlingPolicy policy = new RateThrottlingPolicy(desiredRate, timer);
int ok = 0;
@@ -28,7 +29,7 @@ public class RateThrottlingTestCase {
if (policy.canSend(new SimpleMessage("test"), 0)) {
ok++;
}
- timer.millis += 10;
+ timer.advance(10);
}
return ok;
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java
index 045dc1177db..2e50d561778 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
+import com.yahoo.concurrent.ManualTimer;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.network.rpc.test.TestServer;
@@ -14,7 +15,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
@@ -29,8 +30,8 @@ public class ThrottlerTestCase {
@BeforeEach
public void setUp() throws ListenFailedException {
RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME);
- table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session"));
- table.addRoute("test", Arrays.asList("dst"));
+ table.addHop("dst", "test/dst/session", List.of("test/dst/session"));
+ table.addRoute("test", List.of("dst"));
slobrok = new Slobrok();
src = new TestServer("test/src", table, slobrok, null);
dst = new TestServer("test/dst", table, slobrok, null);
@@ -127,7 +128,7 @@ public class ThrottlerTestCase {
@Test
void testDynamicWindowSize() {
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
policy.setWindowSizeIncrement(5)
@@ -151,7 +152,7 @@ public class ThrottlerTestCase {
@Test
void testIdleTimePeriod() {
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
policy.setWindowSizeIncrement(5)
@@ -162,15 +163,15 @@ public class ThrottlerTestCase {
assertTrue(windowSize >= 90 && windowSize <= 110);
Message msg = new SimpleMessage("foo");
- timer.millis += 30 * 1000;
+ timer.advance(30 * 1000);
assertTrue(policy.canSend(msg, 0));
assertTrue(windowSize >= 90 && windowSize <= 110);
- timer.millis += 60 * 1000 + 1;
+ timer.advance(60 * 1000 + 1);
assertTrue(policy.canSend(msg, 50));
assertEquals(55, policy.getMaxPendingCount());
- timer.millis += 60 * 1000 + 1;
+ timer.advance(60 * 1000 + 1);
assertTrue(policy.canSend(msg, 0));
assertEquals(5, policy.getMaxPendingCount());
@@ -178,7 +179,7 @@ public class ThrottlerTestCase {
@Test
void testMinWindowSize() {
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
policy.setWindowSizeIncrement(5)
@@ -191,7 +192,7 @@ public class ThrottlerTestCase {
@Test
void testMaxWindowSize() {
- CustomTimer timer = new CustomTimer();
+ ManualTimer timer = new ManualTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
policy.setWindowSizeIncrement(5);
@@ -202,7 +203,7 @@ public class ThrottlerTestCase {
assertTrue(windowSize >= 40 && windowSize <= 50);
}
- private int getWindowSize(DynamicThrottlePolicy policy, CustomTimer timer, int maxPending) {
+ private int getWindowSize(DynamicThrottlePolicy policy, ManualTimer timer, int maxPending) {
Message msg = new SimpleMessage("foo");
Reply reply = new SimpleReply("bar");
reply.setContext(1);
@@ -213,8 +214,8 @@ public class ThrottlerTestCase {
++numPending;
}
- long tripTime = (numPending < maxPending) ? 1000 : 1000 + (numPending - maxPending) * 1000;
- timer.millis += tripTime;
+ long tripTime = (numPending < maxPending) ? 1000L : 1000 + (numPending - maxPending) * 1000L;
+ timer.advance(tripTime);
while (--numPending >= 0) {
policy.processReply(reply);
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
index afeaa1304a1..9fb817ad12f 100755
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
@@ -6,6 +6,7 @@ import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.concurrent.Timer;
+import com.yahoo.concurrent.ManualTimer;
import com.yahoo.messagebus.network.rpc.test.TestServer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -46,7 +47,7 @@ public class TargetPoolTestCase {
// Necessary setup to be able to resolve targets.
RPCServiceAddress adr1 = registerServer();
- PoolTimer timer = new PoolTimer();
+ Timer timer = new ManualTimer();
RPCTargetPool pool1 = new RPCTargetPool(timer, 0.666, 1);
RPCTarget target1 = pool1.getTarget(orb, adr1);
@@ -80,7 +81,7 @@ public class TargetPoolTestCase {
RPCServiceAddress adr2 = registerServer();
RPCServiceAddress adr3 = registerServer();
- PoolTimer timer = new PoolTimer();
+ ManualTimer timer = new ManualTimer();
RPCTargetPool pool = new RPCTargetPool(timer, 0.666, 1);
// Assert that all connections expire.
@@ -96,7 +97,7 @@ public class TargetPoolTestCase {
pool.flushTargets(false);
assertEquals(3, pool.size());
}
- timer.millis += 999;
+ timer.advance(999);
pool.flushTargets(false);
assertEquals(0, pool.size());
@@ -108,7 +109,7 @@ public class TargetPoolTestCase {
assertNotNull(target = pool.getTarget(orb, adr3));
target.subRef();
assertEquals(3, pool.size());
- timer.millis += 444;
+ timer.advance(444);
pool.flushTargets(false);
assertEquals(3, pool.size());
assertNotNull(target = pool.getTarget(orb, adr2));
@@ -116,15 +117,15 @@ public class TargetPoolTestCase {
assertNotNull(target = pool.getTarget(orb, adr3));
target.subRef();
assertEquals(3, pool.size());
- timer.millis += 444;
+ timer.advance(444);
pool.flushTargets(false);
assertEquals(2, pool.size());
assertNotNull(target = pool.getTarget(orb, adr3));
target.subRef();
- timer.millis += 444;
+ timer.advance(444);
pool.flushTargets(false);
assertEquals(1, pool.size());
- timer.millis += 444;
+ timer.advance(444);
pool.flushTargets(false);
assertEquals(0, pool.size());
@@ -132,12 +133,12 @@ public class TargetPoolTestCase {
assertNotNull(target = pool.getTarget(orb, adr1));
assertEquals(1, pool.size());
for (int i = 0; i < 10; ++i) {
- timer.millis += 999;
+ timer.advance(999);
pool.flushTargets(false);
assertEquals(1, pool.size());
}
target.subRef();
- timer.millis += 999;
+ timer.advance(999);
pool.flushTargets(false);
assertEquals(0, pool.size());
}
@@ -147,13 +148,4 @@ public class TargetPoolTestCase {
return new RPCServiceAddress("foo/bar", servers.get(servers.size() - 1).mb.getConnectionSpec());
}
- private static class PoolTimer implements Timer {
- long millis = 0;
-
- @Override
- public long milliTime() {
- return millis;
- }
- }
-
}
diff --git a/vespaclient-java/src/test/java/com/yahoo/vespafeeder/BenchmarkProgressPrinterTest.java b/vespaclient-java/src/test/java/com/yahoo/vespafeeder/BenchmarkProgressPrinterTest.java
index 7268e892c7d..6eba29fe9cb 100644
--- a/vespaclient-java/src/test/java/com/yahoo/vespafeeder/BenchmarkProgressPrinterTest.java
+++ b/vespaclient-java/src/test/java/com/yahoo/vespafeeder/BenchmarkProgressPrinterTest.java
@@ -2,7 +2,7 @@
package com.yahoo.vespafeeder;
import com.yahoo.clientmetrics.RouteMetricSet;
-import com.yahoo.concurrent.Timer;
+import com.yahoo.concurrent.ManualTimer;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
import com.yahoo.messagebus.EmptyReply;
@@ -15,17 +15,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class BenchmarkProgressPrinterTest {
- class DummyTimer implements Timer {
- long ms;
-
- public long milliTime() { return ms; }
- }
-
@Test
void testSimple() {
ByteArrayOutputStream output = new ByteArrayOutputStream();
- DummyTimer timer = new DummyTimer();
- timer.ms = 0;
+ ManualTimer timer = new ManualTimer();
BenchmarkProgressPrinter printer = new BenchmarkProgressPrinter(timer, new PrintStream(output));
RouteMetricSet metrics = new RouteMetricSet("foobar", printer);
@@ -35,7 +28,7 @@ public class BenchmarkProgressPrinterTest {
metrics.addReply(reply);
}
- timer.ms = 1200;
+ timer.set(1200);
{
EmptyReply reply = new EmptyReply();
@@ -49,7 +42,7 @@ public class BenchmarkProgressPrinterTest {
metrics.addReply(reply);
}
- timer.ms = 2400;
+ timer.set(2400);
{
EmptyReply reply = new EmptyReply();
@@ -58,7 +51,7 @@ public class BenchmarkProgressPrinterTest {
metrics.addReply(reply);
}
- timer.ms = 62000;
+ timer.set(62000);
{
EmptyReply reply = new EmptyReply();
diff --git a/vespaclient-java/src/test/java/com/yahoo/vespafeeder/ProgressPrinterTest.java b/vespaclient-java/src/test/java/com/yahoo/vespafeeder/ProgressPrinterTest.java
index 495367ff4c3..2307e27b161 100644
--- a/vespaclient-java/src/test/java/com/yahoo/vespafeeder/ProgressPrinterTest.java
+++ b/vespaclient-java/src/test/java/com/yahoo/vespafeeder/ProgressPrinterTest.java
@@ -2,7 +2,7 @@
package com.yahoo.vespafeeder;
import com.yahoo.clientmetrics.RouteMetricSet;
-import com.yahoo.concurrent.Timer;
+import com.yahoo.concurrent.ManualTimer;
import com.yahoo.documentapi.messagebus.protocol.DocumentIgnoredReply;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
@@ -16,17 +16,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class ProgressPrinterTest {
- class DummyTimer implements Timer {
- long ms;
-
- public long milliTime() { return ms; }
- }
-
@Test
void testSimple() {
ByteArrayOutputStream output = new ByteArrayOutputStream();
- DummyTimer timer = new DummyTimer();
- timer.ms = 0;
+ ManualTimer timer = new ManualTimer();
ProgressPrinter printer = new ProgressPrinter(timer, new PrintStream(output));
RouteMetricSet metrics = new RouteMetricSet("foobar", printer);
@@ -36,7 +29,7 @@ public class ProgressPrinterTest {
metrics.addReply(reply);
}
- timer.ms = 1200;
+ timer.set(1200);
{
EmptyReply reply = new EmptyReply();
@@ -50,7 +43,7 @@ public class ProgressPrinterTest {
metrics.addReply(reply);
}
- timer.ms = 2400;
+ timer.set(2400);
{
DocumentIgnoredReply reply = new DocumentIgnoredReply();
@@ -65,7 +58,7 @@ public class ProgressPrinterTest {
metrics.addReply(reply);
}
- timer.ms = 62000;
+ timer.set(62000);
{
EmptyReply reply = new EmptyReply();
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/ManualTimer.java b/vespajlib/src/main/java/com/yahoo/concurrent/ManualTimer.java
new file mode 100644
index 00000000000..ffa6acd446a
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/ManualTimer.java
@@ -0,0 +1,18 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+/**
+ * Simple manual Timer for use in tests
+ * @author baldersheim
+ */
+public class ManualTimer implements Timer {
+
+ private long millis = 0;
+ public void set(long ms) { millis = ms; }
+ public void advance(long ms) { millis += ms; }
+
+ @Override
+ public long milliTime() {
+ return millis;
+ }
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/Timer.java b/vespajlib/src/main/java/com/yahoo/concurrent/Timer.java
index 1793e860af8..282524c0d54 100644
--- a/vespajlib/src/main/java/com/yahoo/concurrent/Timer.java
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/Timer.java
@@ -1,6 +1,9 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.concurrent;
+import java.time.Clock;
+import java.util.concurrent.TimeUnit;
+
/**
* This interface wraps access to some timer that can be used to measure elapsed time, in milliseconds. This
* abstraction allows for unit testing the behavior of time-based constructs.
@@ -16,5 +19,14 @@ public interface Timer {
* @return The current value of the timer, in milliseconds.
*/
long milliTime();
+ Timer monotonic = () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+ static Timer wrap(Clock original) {
+ return new Timer() {
+ private final Clock clock = original;
+ @Override
+ public long milliTime() {
+ return clock.millis();
+ }
+ }; }
}