summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-09-13 23:04:07 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2022-09-14 07:27:23 +0200
commitbb54881376421e09daa8287173e06458b0c1f45a (patch)
treeec65a19ce5ad918fa181c9d76b3664f3390d9a80
parent04c3414342c1cc296f8a56d4112f77b1a463cc70 (diff)
Factor out timeout and coverage handling to make the InterleavedSearchInvoker easier to understand and modify.
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/AdaptiveTimeoutHandler.java69
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CoverageAggregator.java97
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java144
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java5
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SimpleTimeoutHandler.java26
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/TimeoutHandler.java11
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java25
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/MonotonicTimer.java15
8 files changed, 262 insertions, 130 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/AdaptiveTimeoutHandler.java b/container-search/src/main/java/com/yahoo/search/dispatch/AdaptiveTimeoutHandler.java
new file mode 100644
index 00000000000..e725a152f09
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/AdaptiveTimeoutHandler.java
@@ -0,0 +1,69 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.concurrent.Timer;
+import com.yahoo.search.Query;
+import com.yahoo.vespa.config.search.DispatchConfig;
+
+import static com.yahoo.container.handler.Coverage.DEGRADED_BY_ADAPTIVE_TIMEOUT;
+
+/**
+ * Computes next timeout based on how many responses has been received so far
+ *
+ * @author baldersheim
+ */
+class AdaptiveTimeoutHandler implements TimeoutHandler {
+ private final double minimumCoverage;
+ private final int askedNodes;
+ private final int minimumResponses;
+ private final Query query;
+ private final Timer timer;
+ private final DispatchConfig config;
+
+ private long deadline;
+ private long adaptiveTimeoutMin;
+ private long adaptiveTimeoutMax;
+ boolean adaptiveTimeoutCalculated = false;
+
+ AdaptiveTimeoutHandler(Timer timer, DispatchConfig config, int askedNodes, Query query) {
+ minimumCoverage = config.minSearchCoverage();
+ this.config = config;
+ this.askedNodes = askedNodes;
+ this.query = query;
+ this.timer = timer;
+ minimumResponses = (int) Math.ceil(askedNodes * minimumCoverage / 100.0);
+ deadline = timer.milliTime() + query.getTimeLeft();
+ }
+
+ @Override
+ public long nextTimeout(int answeredNodes) {
+ if (askedNodes == answeredNodes) return query.getTimeLeft(); // All nodes have responded - done
+ if (answeredNodes < minimumResponses) return query.getTimeLeft(); // Minimum responses have not been received yet
+
+ if (!adaptiveTimeoutCalculated) {
+ // Recompute timeout when minimum responses have been received
+ long timeLeftMs = query.getTimeLeft();
+ adaptiveTimeoutMin = (long) (timeLeftMs * config.minWaitAfterCoverageFactor());
+ adaptiveTimeoutMax = (long) (timeLeftMs * config.maxWaitAfterCoverageFactor());
+ adaptiveTimeoutCalculated = true;
+ }
+ long now = timer.milliTime();
+ int pendingQueries = askedNodes - answeredNodes;
+ double missWidth = ((100.0 - minimumCoverage) * askedNodes) / 100.0 - 1.0;
+ double slopedWait = adaptiveTimeoutMin;
+ if (pendingQueries > 1 && missWidth > 0.0) {
+ slopedWait += ((adaptiveTimeoutMax - adaptiveTimeoutMin) * (pendingQueries - 1)) / missWidth;
+ }
+ long nextAdaptive = (long) slopedWait;
+ if (now + nextAdaptive >= deadline) {
+ return deadline - now;
+ }
+ deadline = now + nextAdaptive;
+
+ return nextAdaptive;
+ }
+
+ @Override
+ public int reason() {
+ return DEGRADED_BY_ADAPTIVE_TIMEOUT;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CoverageAggregator.java b/container-search/src/main/java/com/yahoo/search/dispatch/CoverageAggregator.java
new file mode 100644
index 00000000000..6d4b321d861
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CoverageAggregator.java
@@ -0,0 +1,97 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.search.result.Coverage;
+
+import static com.yahoo.container.handler.Coverage.*;
+
+/**
+ * Aggregates coverage as responses are added.
+ *
+ * @author baldersheim
+ */
+public class CoverageAggregator {
+ private final int askedNodes;
+ private int answeredNodes = 0;
+ private int answeredNodesParticipated = 0;
+ private int failedNodes = 0;
+ private long answeredDocs = 0;
+ private long answeredActiveDocs = 0;
+ private long answeredTargetActiveDocs = 0;
+ private boolean timedOut = false;
+ private boolean degradedByMatchPhase = false;
+ CoverageAggregator(int askedNodes) {
+ this.askedNodes = askedNodes;
+ }
+ CoverageAggregator(CoverageAggregator rhs) {
+ askedNodes = rhs.askedNodes;
+ answeredNodes = rhs.answeredNodes;
+ answeredNodesParticipated = rhs.answeredNodesParticipated;
+ failedNodes = rhs.failedNodes;
+ answeredDocs = rhs.answeredDocs;
+ answeredActiveDocs = rhs.answeredActiveDocs;
+ answeredTargetActiveDocs = rhs.answeredTargetActiveDocs;
+ timedOut = rhs.timedOut;
+ degradedByMatchPhase = rhs.degradedByMatchPhase;
+ }
+ void add(Coverage source) {
+ answeredDocs += source.getDocs();
+ answeredActiveDocs += source.getActive();
+ answeredTargetActiveDocs += source.getTargetActive();
+ answeredNodesParticipated += source.getNodes();
+ answeredNodes++;
+ degradedByMatchPhase |= source.isDegradedByMatchPhase();
+ timedOut |= source.isDegradedByTimeout();
+ }
+ public int getAskedNodes() {
+ return askedNodes;
+ }
+ public int getAnswerdNodes() {
+ return answeredNodes;
+ }
+ public boolean hasNoAnswers() { return answeredNodes == 0; }
+ public void setTimedOut() { timedOut = true; }
+ public void setFailedNodes(int failedNodes) {
+ this.failedNodes += failedNodes;
+ }
+
+ public Coverage createCoverage(TimeoutHandler timeoutHandler) {
+ Coverage coverage = new Coverage(answeredDocs, answeredActiveDocs, answeredNodesParticipated, 1);
+ coverage.setNodesTried(askedNodes);
+ coverage.setTargetActive(answeredTargetActiveDocs);
+ int degradedReason = 0;
+ if (timedOut) {
+ degradedReason |= timeoutHandler.reason();
+ }
+ if (degradedByMatchPhase) {
+ degradedReason |= DEGRADED_BY_MATCH_PHASE;
+ }
+ coverage.setDegradedReason(degradedReason);
+ return coverage;
+ }
+ public CoverageAggregator adjustDegradedCoverage(int searchableCopies, TimeoutHandler timeoutHandler) {
+ int askedAndFailed = askedNodes + failedNodes;
+ if (askedAndFailed == answeredNodesParticipated) {
+ return this;
+ }
+ int notAnswered = askedAndFailed - answeredNodesParticipated;
+
+ if ((timeoutHandler.reason() == DEGRADED_BY_ADAPTIVE_TIMEOUT) && answeredNodesParticipated > 0) {
+ CoverageAggregator clone = new CoverageAggregator(this);
+ clone.answeredActiveDocs += (notAnswered * answeredActiveDocs / answeredNodesParticipated);
+ clone.answeredTargetActiveDocs += (notAnswered * answeredTargetActiveDocs / answeredNodesParticipated);
+ return clone;
+ } else {
+ if (askedAndFailed > answeredNodesParticipated) {
+ int missingNodes = notAnswered - (searchableCopies - 1);
+ if (answeredNodesParticipated > 0) {
+ CoverageAggregator clone = new CoverageAggregator(this);
+ clone.answeredActiveDocs += (missingNodes * answeredActiveDocs / answeredNodesParticipated);
+ clone.answeredTargetActiveDocs += (missingNodes * answeredTargetActiveDocs / answeredNodesParticipated);
+ clone.timedOut = true;
+ return clone;
+ }
+ }
+ }
+ return this;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
index 8c92e8b0270..5dace0bbff8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
@@ -1,12 +1,12 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
+import com.yahoo.concurrent.Timer;
import com.yahoo.prelude.fastsearch.GroupingListHit;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
-import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.search.searchchain.Execution;
@@ -25,10 +25,6 @@ import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
-import static com.yahoo.container.handler.Coverage.DEGRADED_BY_ADAPTIVE_TIMEOUT;
-import static com.yahoo.container.handler.Coverage.DEGRADED_BY_MATCH_PHASE;
-import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT;
-
/**
* InterleavedSearchInvoker uses multiple {@link SearchInvoker} objects to interface with content
* nodes in parallel. Operationally it first sends requests to all contained invokers and then
@@ -40,38 +36,35 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private static final Logger log = Logger.getLogger(InterleavedSearchInvoker.class.getName());
+ private final Timer timer;
private final Set<SearchInvoker> invokers;
private final SearchCluster searchCluster;
private final Group group;
private final LinkedBlockingQueue<SearchInvoker> availableForProcessing;
private final Set<Integer> alreadyFailedNodes;
+ private final CoverageAggregator coverageAggregator;
private Query query;
- private boolean adaptiveTimeoutCalculated = false;
- private long adaptiveTimeoutMin = 0;
- private long adaptiveTimeoutMax = 0;
- private long deadline = 0;
-
- private long answeredDocs = 0;
- private long answeredActiveDocs = 0;
- private long answeredTargetActiveDocs = 0;
- private int askedNodes = 0;
- private int answeredNodes = 0;
- private int answeredNodesParticipated = 0;
- private boolean timedOut = false;
- private boolean degradedByMatchPhase = false;
-
- public InterleavedSearchInvoker(Collection<SearchInvoker> invokers,
+ TimeoutHandler timeoutHandler;
+ public InterleavedSearchInvoker(Timer timer, Collection<SearchInvoker> invokers,
SearchCluster searchCluster,
Group group,
Set<Integer> alreadyFailedNodes) {
super(Optional.empty());
+ this.timer = timer;
this.invokers = Collections.newSetFromMap(new IdentityHashMap<>());
this.invokers.addAll(invokers);
this.searchCluster = searchCluster;
this.group = group;
this.availableForProcessing = newQueue();
this.alreadyFailedNodes = alreadyFailedNodes;
+ coverageAggregator = new CoverageAggregator(invokers.size());
+ }
+
+ TimeoutHandler create(DispatchConfig config, int askedNodes, Query query) {
+ return (config.minSearchCoverage() < 100.0D)
+ ? new AdaptiveTimeoutHandler(timer, config, askedNodes, query)
+ : new SimpleTimeoutHandler(query);
}
/**
@@ -83,7 +76,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
protected Object sendSearchRequest(Query query, Object unusedContext) throws IOException {
this.query = query;
invokers.forEach(invoker -> invoker.setMonitor(this));
- deadline = currentTime() + query.getTimeLeft();
int originalHits = query.getHits();
int originalOffset = query.getOffset();
@@ -101,8 +93,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
Object context = null;
for (SearchInvoker invoker : invokers) {
context = invoker.sendSearchRequest(query, context);
- askedNodes++;
}
+ timeoutHandler = create(searchCluster.dispatchConfig(), invokers.size(), query);
query.setHits(originalHits);
query.setOffset(originalOffset);
@@ -119,14 +111,15 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
while (!invokers.isEmpty() && nextTimeout >= 0) {
SearchInvoker invoker = availableForProcessing.poll(nextTimeout, TimeUnit.MILLISECONDS);
if (invoker == null) {
- log.fine(() -> "Search timed out with " + askedNodes + " requests made, " + answeredNodes + " responses received");
+ log.fine(() -> "Search timed out with " + coverageAggregator.getAskedNodes() + " requests made, " +
+ coverageAggregator.getAnswerdNodes() + " responses received");
break;
} else {
InvokerResult toMerge = invoker.getSearchResult(execution);
merged = mergeResult(result.getResult(), toMerge, merged, groupingResultAggregator);
ejectInvoker(invoker);
}
- nextTimeout = nextTimeout();
+ nextTimeout = timeoutHandler.nextTimeout(coverageAggregator.getAnswerdNodes());
}
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for search results", e);
@@ -134,7 +127,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
groupingResultAggregator.toAggregatedHit().ifPresent(h -> result.getResult().hits().add(h));
insertNetworkErrors(result.getResult());
- result.getResult().setCoverage(createCoverage());
+ CoverageAggregator adjusted = coverageAggregator.adjustDegradedCoverage((int)searchCluster.dispatchConfig().searchableCopies(), timeoutHandler);
+ result.getResult().setCoverage(adjusted.createCoverage(timeoutHandler));
int needed = query.getOffset() + query.getHits();
for (int index = query.getOffset(); (index < merged.size()) && (index < needed); index++) {
@@ -146,7 +140,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private void insertNetworkErrors(Result result) {
// Network errors will be reported as errors only when all nodes fail, otherwise they are just traced
- boolean asErrors = answeredNodes == 0;
+ boolean asErrors = coverageAggregator.hasNoAnswers();
if (!invokers.isEmpty()) {
String keys = invokers.stream().map(SearchInvoker::distributionKey).map(dk -> dk.map(i -> i.toString()).orElse("(unspecified)"))
@@ -158,7 +152,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
} else {
query.trace("Backend communication timeout on nodes with distribution-keys: " + keys, 2);
}
- timedOut = true;
+ coverageAggregator.setTimedOut();
}
if (alreadyFailedNodes != null) {
var message = "Connection failure on nodes with distribution-keys: "
@@ -168,51 +162,13 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
} else {
query.trace(message, 2);
}
- int failed = alreadyFailedNodes.size();
- askedNodes += failed;
- answeredNodes += failed;
- }
- }
-
- private long nextTimeout() {
- DispatchConfig config = searchCluster.dispatchConfig();
- double minimumCoverage = config.minSearchCoverage();
-
- if (askedNodes == answeredNodes || minimumCoverage >= 100.0) {
- return query.getTimeLeft();
+ coverageAggregator.setFailedNodes(alreadyFailedNodes.size());
}
- int minimumResponses = (int) Math.ceil(askedNodes * minimumCoverage / 100.0);
-
- if (answeredNodes < minimumResponses) {
- return query.getTimeLeft();
- }
-
- long timeLeft = query.getTimeLeft();
- if (!adaptiveTimeoutCalculated) {
- adaptiveTimeoutMin = (long) (timeLeft * config.minWaitAfterCoverageFactor());
- adaptiveTimeoutMax = (long) (timeLeft * config.maxWaitAfterCoverageFactor());
- adaptiveTimeoutCalculated = true;
- }
-
- long now = currentTime();
- int pendingQueries = askedNodes - answeredNodes;
- double missWidth = ((100.0 - config.minSearchCoverage()) * askedNodes) / 100.0 - 1.0;
- double slopedWait = adaptiveTimeoutMin;
- if (pendingQueries > 1 && missWidth > 0.0) {
- slopedWait += ((adaptiveTimeoutMax - adaptiveTimeoutMin) * (pendingQueries - 1)) / missWidth;
- }
- long nextAdaptive = (long) slopedWait;
- if (now + nextAdaptive >= deadline) {
- return deadline - now;
- }
- deadline = now + nextAdaptive;
-
- return nextAdaptive;
}
private List<LeanHit> mergeResult(Result result, InvokerResult partialResult, List<LeanHit> current,
GroupingResultAggregator groupingResultAggregator) {
- collectCoverage(partialResult.getResult().getCoverage(true));
+ coverageAggregator.add(partialResult.getResult().getCoverage(true));
result.mergeWith(partialResult.getResult());
List<Hit> partialNonLean = partialResult.getResult().hits().asUnorderedHits();
@@ -265,55 +221,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
return merged;
}
- private void collectCoverage(Coverage source) {
- answeredDocs += source.getDocs();
- answeredActiveDocs += source.getActive();
- answeredTargetActiveDocs += source.getTargetActive();
- answeredNodesParticipated += source.getNodes();
- answeredNodes++;
- degradedByMatchPhase |= source.isDegradedByMatchPhase();
- timedOut |= source.isDegradedByTimeout();
- }
-
- private Coverage createCoverage() {
- adjustDegradedCoverage();
-
- Coverage coverage = new Coverage(answeredDocs, answeredActiveDocs, answeredNodesParticipated, 1);
- coverage.setNodesTried(askedNodes);
- coverage.setTargetActive(answeredTargetActiveDocs);
- int degradedReason = 0;
- if (timedOut) {
- degradedReason |= (adaptiveTimeoutCalculated ? DEGRADED_BY_ADAPTIVE_TIMEOUT : DEGRADED_BY_TIMEOUT);
- }
- if (degradedByMatchPhase) {
- degradedReason |= DEGRADED_BY_MATCH_PHASE;
- }
- coverage.setDegradedReason(degradedReason);
- return coverage;
- }
-
- private void adjustDegradedCoverage() {
- if (askedNodes == answeredNodesParticipated) {
- return;
- }
- int notAnswered = askedNodes - answeredNodesParticipated;
-
- if (adaptiveTimeoutCalculated && answeredNodesParticipated > 0) {
- answeredActiveDocs += (notAnswered * answeredActiveDocs / answeredNodesParticipated);
- answeredTargetActiveDocs += (notAnswered * answeredTargetActiveDocs / answeredNodesParticipated);
- } else {
- if (askedNodes > answeredNodesParticipated) {
- int searchableCopies = (int) searchCluster.dispatchConfig().searchableCopies();
- int missingNodes = notAnswered - (searchableCopies - 1);
- if (answeredNodesParticipated > 0) {
- answeredActiveDocs += (missingNodes * answeredActiveDocs / answeredNodesParticipated);
- answeredTargetActiveDocs += (missingNodes * answeredTargetActiveDocs / answeredNodesParticipated);
- timedOut = true;
- }
- }
- }
- }
-
private void ejectInvoker(SearchInvoker invoker) {
invokers.remove(invoker);
invoker.release();
@@ -340,11 +247,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
}
// For overriding in tests
- protected long currentTime() {
- return System.currentTimeMillis();
- }
-
- // For overriding in tests
protected LinkedBlockingQueue<SearchInvoker> newQueue() {
return new LinkedBlockingQueue<>();
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
index eb379a51ed4..b0505fc5517 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
@@ -1,6 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
+import com.yahoo.concurrent.MonotonicTimer;
+import com.yahoo.concurrent.Timer;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
@@ -22,6 +24,7 @@ import java.util.Set;
public abstract class InvokerFactory {
protected final SearchCluster searchCluster;
+ private static final Timer timer = new MonotonicTimer();
public InvokerFactory(SearchCluster searchCluster) {
this.searchCluster = searchCluster;
@@ -89,7 +92,7 @@ public abstract class InvokerFactory {
if (invokers.size() == 1 && failed == null) {
return Optional.of(invokers.get(0));
} else {
- return Optional.of(new InterleavedSearchInvoker(invokers, searchCluster, group, failed));
+ return Optional.of(new InterleavedSearchInvoker(timer, invokers, searchCluster, group, failed));
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SimpleTimeoutHandler.java b/container-search/src/main/java/com/yahoo/search/dispatch/SimpleTimeoutHandler.java
new file mode 100644
index 00000000000..8732cee7652
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SimpleTimeoutHandler.java
@@ -0,0 +1,26 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.search.Query;
+
+import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT;
+
+/**
+ * Computes the timeout based solely on the query timeout
+ *
+ * @author baldersheim
+ */
+public class SimpleTimeoutHandler implements TimeoutHandler {
+ private final Query query;
+ SimpleTimeoutHandler(Query query) {
+ this.query = query;
+ }
+ @Override
+ public long nextTimeout(int answeredNodes) {
+ return query.getTimeLeft();
+ }
+
+ @Override
+ public int reason() {
+ return DEGRADED_BY_TIMEOUT;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/TimeoutHandler.java b/container-search/src/main/java/com/yahoo/search/dispatch/TimeoutHandler.java
new file mode 100644
index 00000000000..7c86b02d368
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/TimeoutHandler.java
@@ -0,0 +1,11 @@
+package com.yahoo.search.dispatch;
+
+/**
+ * Computes next timeout
+ *
+ * @author baldersheim
+ */
+public interface TimeoutHandler {
+ long nextTimeout(int answeredNodes);
+ int reason();
+}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java
index a88046197e0..b935e07cf4d 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java
@@ -1,6 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
+import com.yahoo.concurrent.MonotonicTimer;
+import com.yahoo.concurrent.Timer;
import com.yahoo.document.GlobalId;
import com.yahoo.document.idstring.IdString;
import com.yahoo.prelude.fastsearch.FastHit;
@@ -23,6 +25,7 @@ import com.yahoo.test.ManualClock;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -33,6 +36,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.ToLongFunction;
import java.util.stream.StreamSupport;
import static com.yahoo.container.handler.Coverage.DEGRADED_BY_MATCH_PHASE;
@@ -347,7 +351,7 @@ public class InterleavedSearchInvokerTest {
.addAggregationResult(new MinAggregationResult().setMin(new IntegerResultNode(6)).setTag(3))));
invokers.add(new MockInvoker(0).setHits(List.of(new GroupingListHit(List.of(grouping2)))));
- InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(invokers, cluster, new Group(0, List.of()), Collections.emptySet());
+ InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(new MonotonicTimer(), invokers, cluster, new Group(0, List.of()), Collections.emptySet());
invoker.responseAvailable(invokers.get(0));
invoker.responseAvailable(invokers.get(1));
Result result = invoker.search(query, null);
@@ -360,7 +364,7 @@ public class InterleavedSearchInvokerTest {
List<SearchInvoker> invokers = new ArrayList<>();
invokers.add(createInvoker(a, 0));
invokers.add(createInvoker(b, 1));
- InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(invokers, cluster, group, Collections.emptySet());
+ InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(new MonotonicTimer(), invokers, cluster, group, Collections.emptySet());
invoker.responseAvailable(invokers.get(0));
invoker.responseAvailable(invokers.get(1));
return invoker;
@@ -406,17 +410,22 @@ public class InterleavedSearchInvokerTest {
assertTrue(cov.isDegradedByTimeout());
}
+ private static class ClockAsTimer implements Timer {
+ private final Clock clock;
+ ClockAsTimer(Clock clock) { this.clock = clock; }
+
+ @Override
+ public long milliTime() {
+ return clock.millis();
+ }
+ }
+
private InterleavedSearchInvoker createInterleavedInvoker(SearchCluster searchCluster, Group group, int numInvokers) {
for (int i = 0; i < numInvokers; i++) {
invokers.add(new MockInvoker(i));
}
- return new InterleavedSearchInvoker(invokers, searchCluster, group,null) {
-
- @Override
- protected long currentTime() {
- return clock.millis();
- }
+ return new InterleavedSearchInvoker(new ClockAsTimer(clock), invokers, searchCluster, group,null) {
@Override
protected LinkedBlockingQueue<SearchInvoker> newQueue() {
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/MonotonicTimer.java b/vespajlib/src/main/java/com/yahoo/concurrent/MonotonicTimer.java
new file mode 100644
index 00000000000..03cad7b37a6
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/MonotonicTimer.java
@@ -0,0 +1,15 @@
+package com.yahoo.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implements the Timer interface by using System.nanoTime.
+ *
+ * @author baldersheim
+ */
+public class MonotonicTimer implements Timer {
+ @Override
+ public long milliTime() {
+ return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+ }
+}