diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-09-13 23:04:07 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-09-14 07:27:23 +0200 |
commit | bb54881376421e09daa8287173e06458b0c1f45a (patch) | |
tree | ec65a19ce5ad918fa181c9d76b3664f3390d9a80 /container-search/src/main/java/com/yahoo/search | |
parent | 04c3414342c1cc296f8a56d4112f77b1a463cc70 (diff) |
Factor out timeout and coverage handling to make the InterleavedSearchInvoker easier to understand and modify.
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search')
6 files changed, 230 insertions, 122 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/AdaptiveTimeoutHandler.java b/container-search/src/main/java/com/yahoo/search/dispatch/AdaptiveTimeoutHandler.java new file mode 100644 index 00000000000..e725a152f09 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/AdaptiveTimeoutHandler.java @@ -0,0 +1,69 @@ +package com.yahoo.search.dispatch; + +import com.yahoo.concurrent.Timer; +import com.yahoo.search.Query; +import com.yahoo.vespa.config.search.DispatchConfig; + +import static com.yahoo.container.handler.Coverage.DEGRADED_BY_ADAPTIVE_TIMEOUT; + +/** + * Computes next timeout based on how many responses has been received so far + * + * @author baldersheim + */ +class AdaptiveTimeoutHandler implements TimeoutHandler { + private final double minimumCoverage; + private final int askedNodes; + private final int minimumResponses; + private final Query query; + private final Timer timer; + private final DispatchConfig config; + + private long deadline; + private long adaptiveTimeoutMin; + private long adaptiveTimeoutMax; + boolean adaptiveTimeoutCalculated = false; + + AdaptiveTimeoutHandler(Timer timer, DispatchConfig config, int askedNodes, Query query) { + minimumCoverage = config.minSearchCoverage(); + this.config = config; + this.askedNodes = askedNodes; + this.query = query; + this.timer = timer; + minimumResponses = (int) Math.ceil(askedNodes * minimumCoverage / 100.0); + deadline = timer.milliTime() + query.getTimeLeft(); + } + + @Override + public long nextTimeout(int answeredNodes) { + if (askedNodes == answeredNodes) return query.getTimeLeft(); // All nodes have responded - done + if (answeredNodes < minimumResponses) return query.getTimeLeft(); // Minimum responses have not been received yet + + if (!adaptiveTimeoutCalculated) { + // Recompute timeout when minimum responses have been received + long timeLeftMs = query.getTimeLeft(); + adaptiveTimeoutMin = (long) (timeLeftMs * config.minWaitAfterCoverageFactor()); + adaptiveTimeoutMax = (long) (timeLeftMs * config.maxWaitAfterCoverageFactor()); + adaptiveTimeoutCalculated = true; + } + long now = timer.milliTime(); + int pendingQueries = askedNodes - answeredNodes; + double missWidth = ((100.0 - minimumCoverage) * askedNodes) / 100.0 - 1.0; + double slopedWait = adaptiveTimeoutMin; + if (pendingQueries > 1 && missWidth > 0.0) { + slopedWait += ((adaptiveTimeoutMax - adaptiveTimeoutMin) * (pendingQueries - 1)) / missWidth; + } + long nextAdaptive = (long) slopedWait; + if (now + nextAdaptive >= deadline) { + return deadline - now; + } + deadline = now + nextAdaptive; + + return nextAdaptive; + } + + @Override + public int reason() { + return DEGRADED_BY_ADAPTIVE_TIMEOUT; + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CoverageAggregator.java b/container-search/src/main/java/com/yahoo/search/dispatch/CoverageAggregator.java new file mode 100644 index 00000000000..6d4b321d861 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CoverageAggregator.java @@ -0,0 +1,97 @@ +package com.yahoo.search.dispatch; + +import com.yahoo.search.result.Coverage; + +import static com.yahoo.container.handler.Coverage.*; + +/** + * Aggregates coverage as responses are added. + * + * @author baldersheim + */ +public class CoverageAggregator { + private final int askedNodes; + private int answeredNodes = 0; + private int answeredNodesParticipated = 0; + private int failedNodes = 0; + private long answeredDocs = 0; + private long answeredActiveDocs = 0; + private long answeredTargetActiveDocs = 0; + private boolean timedOut = false; + private boolean degradedByMatchPhase = false; + CoverageAggregator(int askedNodes) { + this.askedNodes = askedNodes; + } + CoverageAggregator(CoverageAggregator rhs) { + askedNodes = rhs.askedNodes; + answeredNodes = rhs.answeredNodes; + answeredNodesParticipated = rhs.answeredNodesParticipated; + failedNodes = rhs.failedNodes; + answeredDocs = rhs.answeredDocs; + answeredActiveDocs = rhs.answeredActiveDocs; + answeredTargetActiveDocs = rhs.answeredTargetActiveDocs; + timedOut = rhs.timedOut; + degradedByMatchPhase = rhs.degradedByMatchPhase; + } + void add(Coverage source) { + answeredDocs += source.getDocs(); + answeredActiveDocs += source.getActive(); + answeredTargetActiveDocs += source.getTargetActive(); + answeredNodesParticipated += source.getNodes(); + answeredNodes++; + degradedByMatchPhase |= source.isDegradedByMatchPhase(); + timedOut |= source.isDegradedByTimeout(); + } + public int getAskedNodes() { + return askedNodes; + } + public int getAnswerdNodes() { + return answeredNodes; + } + public boolean hasNoAnswers() { return answeredNodes == 0; } + public void setTimedOut() { timedOut = true; } + public void setFailedNodes(int failedNodes) { + this.failedNodes += failedNodes; + } + + public Coverage createCoverage(TimeoutHandler timeoutHandler) { + Coverage coverage = new Coverage(answeredDocs, answeredActiveDocs, answeredNodesParticipated, 1); + coverage.setNodesTried(askedNodes); + coverage.setTargetActive(answeredTargetActiveDocs); + int degradedReason = 0; + if (timedOut) { + degradedReason |= timeoutHandler.reason(); + } + if (degradedByMatchPhase) { + degradedReason |= DEGRADED_BY_MATCH_PHASE; + } + coverage.setDegradedReason(degradedReason); + return coverage; + } + public CoverageAggregator adjustDegradedCoverage(int searchableCopies, TimeoutHandler timeoutHandler) { + int askedAndFailed = askedNodes + failedNodes; + if (askedAndFailed == answeredNodesParticipated) { + return this; + } + int notAnswered = askedAndFailed - answeredNodesParticipated; + + if ((timeoutHandler.reason() == DEGRADED_BY_ADAPTIVE_TIMEOUT) && answeredNodesParticipated > 0) { + CoverageAggregator clone = new CoverageAggregator(this); + clone.answeredActiveDocs += (notAnswered * answeredActiveDocs / answeredNodesParticipated); + clone.answeredTargetActiveDocs += (notAnswered * answeredTargetActiveDocs / answeredNodesParticipated); + return clone; + } else { + if (askedAndFailed > answeredNodesParticipated) { + int missingNodes = notAnswered - (searchableCopies - 1); + if (answeredNodesParticipated > 0) { + CoverageAggregator clone = new CoverageAggregator(this); + clone.answeredActiveDocs += (missingNodes * answeredActiveDocs / answeredNodesParticipated); + clone.answeredTargetActiveDocs += (missingNodes * answeredTargetActiveDocs / answeredNodesParticipated); + clone.timedOut = true; + return clone; + } + } + } + return this; + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java index 8c92e8b0270..5dace0bbff8 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java @@ -1,12 +1,12 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; +import com.yahoo.concurrent.Timer; import com.yahoo.prelude.fastsearch.GroupingListHit; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.Group; import com.yahoo.search.dispatch.searchcluster.SearchCluster; -import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.result.Hit; import com.yahoo.search.searchchain.Execution; @@ -25,10 +25,6 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; -import static com.yahoo.container.handler.Coverage.DEGRADED_BY_ADAPTIVE_TIMEOUT; -import static com.yahoo.container.handler.Coverage.DEGRADED_BY_MATCH_PHASE; -import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT; - /** * InterleavedSearchInvoker uses multiple {@link SearchInvoker} objects to interface with content * nodes in parallel. Operationally it first sends requests to all contained invokers and then @@ -40,38 +36,35 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM private static final Logger log = Logger.getLogger(InterleavedSearchInvoker.class.getName()); + private final Timer timer; private final Set<SearchInvoker> invokers; private final SearchCluster searchCluster; private final Group group; private final LinkedBlockingQueue<SearchInvoker> availableForProcessing; private final Set<Integer> alreadyFailedNodes; + private final CoverageAggregator coverageAggregator; private Query query; - private boolean adaptiveTimeoutCalculated = false; - private long adaptiveTimeoutMin = 0; - private long adaptiveTimeoutMax = 0; - private long deadline = 0; - - private long answeredDocs = 0; - private long answeredActiveDocs = 0; - private long answeredTargetActiveDocs = 0; - private int askedNodes = 0; - private int answeredNodes = 0; - private int answeredNodesParticipated = 0; - private boolean timedOut = false; - private boolean degradedByMatchPhase = false; - - public InterleavedSearchInvoker(Collection<SearchInvoker> invokers, + TimeoutHandler timeoutHandler; + public InterleavedSearchInvoker(Timer timer, Collection<SearchInvoker> invokers, SearchCluster searchCluster, Group group, Set<Integer> alreadyFailedNodes) { super(Optional.empty()); + this.timer = timer; this.invokers = Collections.newSetFromMap(new IdentityHashMap<>()); this.invokers.addAll(invokers); this.searchCluster = searchCluster; this.group = group; this.availableForProcessing = newQueue(); this.alreadyFailedNodes = alreadyFailedNodes; + coverageAggregator = new CoverageAggregator(invokers.size()); + } + + TimeoutHandler create(DispatchConfig config, int askedNodes, Query query) { + return (config.minSearchCoverage() < 100.0D) + ? new AdaptiveTimeoutHandler(timer, config, askedNodes, query) + : new SimpleTimeoutHandler(query); } /** @@ -83,7 +76,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM protected Object sendSearchRequest(Query query, Object unusedContext) throws IOException { this.query = query; invokers.forEach(invoker -> invoker.setMonitor(this)); - deadline = currentTime() + query.getTimeLeft(); int originalHits = query.getHits(); int originalOffset = query.getOffset(); @@ -101,8 +93,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM Object context = null; for (SearchInvoker invoker : invokers) { context = invoker.sendSearchRequest(query, context); - askedNodes++; } + timeoutHandler = create(searchCluster.dispatchConfig(), invokers.size(), query); query.setHits(originalHits); query.setOffset(originalOffset); @@ -119,14 +111,15 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM while (!invokers.isEmpty() && nextTimeout >= 0) { SearchInvoker invoker = availableForProcessing.poll(nextTimeout, TimeUnit.MILLISECONDS); if (invoker == null) { - log.fine(() -> "Search timed out with " + askedNodes + " requests made, " + answeredNodes + " responses received"); + log.fine(() -> "Search timed out with " + coverageAggregator.getAskedNodes() + " requests made, " + + coverageAggregator.getAnswerdNodes() + " responses received"); break; } else { InvokerResult toMerge = invoker.getSearchResult(execution); merged = mergeResult(result.getResult(), toMerge, merged, groupingResultAggregator); ejectInvoker(invoker); } - nextTimeout = nextTimeout(); + nextTimeout = timeoutHandler.nextTimeout(coverageAggregator.getAnswerdNodes()); } } catch (InterruptedException e) { throw new RuntimeException("Interrupted while waiting for search results", e); @@ -134,7 +127,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM groupingResultAggregator.toAggregatedHit().ifPresent(h -> result.getResult().hits().add(h)); insertNetworkErrors(result.getResult()); - result.getResult().setCoverage(createCoverage()); + CoverageAggregator adjusted = coverageAggregator.adjustDegradedCoverage((int)searchCluster.dispatchConfig().searchableCopies(), timeoutHandler); + result.getResult().setCoverage(adjusted.createCoverage(timeoutHandler)); int needed = query.getOffset() + query.getHits(); for (int index = query.getOffset(); (index < merged.size()) && (index < needed); index++) { @@ -146,7 +140,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM private void insertNetworkErrors(Result result) { // Network errors will be reported as errors only when all nodes fail, otherwise they are just traced - boolean asErrors = answeredNodes == 0; + boolean asErrors = coverageAggregator.hasNoAnswers(); if (!invokers.isEmpty()) { String keys = invokers.stream().map(SearchInvoker::distributionKey).map(dk -> dk.map(i -> i.toString()).orElse("(unspecified)")) @@ -158,7 +152,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM } else { query.trace("Backend communication timeout on nodes with distribution-keys: " + keys, 2); } - timedOut = true; + coverageAggregator.setTimedOut(); } if (alreadyFailedNodes != null) { var message = "Connection failure on nodes with distribution-keys: " @@ -168,51 +162,13 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM } else { query.trace(message, 2); } - int failed = alreadyFailedNodes.size(); - askedNodes += failed; - answeredNodes += failed; - } - } - - private long nextTimeout() { - DispatchConfig config = searchCluster.dispatchConfig(); - double minimumCoverage = config.minSearchCoverage(); - - if (askedNodes == answeredNodes || minimumCoverage >= 100.0) { - return query.getTimeLeft(); + coverageAggregator.setFailedNodes(alreadyFailedNodes.size()); } - int minimumResponses = (int) Math.ceil(askedNodes * minimumCoverage / 100.0); - - if (answeredNodes < minimumResponses) { - return query.getTimeLeft(); - } - - long timeLeft = query.getTimeLeft(); - if (!adaptiveTimeoutCalculated) { - adaptiveTimeoutMin = (long) (timeLeft * config.minWaitAfterCoverageFactor()); - adaptiveTimeoutMax = (long) (timeLeft * config.maxWaitAfterCoverageFactor()); - adaptiveTimeoutCalculated = true; - } - - long now = currentTime(); - int pendingQueries = askedNodes - answeredNodes; - double missWidth = ((100.0 - config.minSearchCoverage()) * askedNodes) / 100.0 - 1.0; - double slopedWait = adaptiveTimeoutMin; - if (pendingQueries > 1 && missWidth > 0.0) { - slopedWait += ((adaptiveTimeoutMax - adaptiveTimeoutMin) * (pendingQueries - 1)) / missWidth; - } - long nextAdaptive = (long) slopedWait; - if (now + nextAdaptive >= deadline) { - return deadline - now; - } - deadline = now + nextAdaptive; - - return nextAdaptive; } private List<LeanHit> mergeResult(Result result, InvokerResult partialResult, List<LeanHit> current, GroupingResultAggregator groupingResultAggregator) { - collectCoverage(partialResult.getResult().getCoverage(true)); + coverageAggregator.add(partialResult.getResult().getCoverage(true)); result.mergeWith(partialResult.getResult()); List<Hit> partialNonLean = partialResult.getResult().hits().asUnorderedHits(); @@ -265,55 +221,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM return merged; } - private void collectCoverage(Coverage source) { - answeredDocs += source.getDocs(); - answeredActiveDocs += source.getActive(); - answeredTargetActiveDocs += source.getTargetActive(); - answeredNodesParticipated += source.getNodes(); - answeredNodes++; - degradedByMatchPhase |= source.isDegradedByMatchPhase(); - timedOut |= source.isDegradedByTimeout(); - } - - private Coverage createCoverage() { - adjustDegradedCoverage(); - - Coverage coverage = new Coverage(answeredDocs, answeredActiveDocs, answeredNodesParticipated, 1); - coverage.setNodesTried(askedNodes); - coverage.setTargetActive(answeredTargetActiveDocs); - int degradedReason = 0; - if (timedOut) { - degradedReason |= (adaptiveTimeoutCalculated ? DEGRADED_BY_ADAPTIVE_TIMEOUT : DEGRADED_BY_TIMEOUT); - } - if (degradedByMatchPhase) { - degradedReason |= DEGRADED_BY_MATCH_PHASE; - } - coverage.setDegradedReason(degradedReason); - return coverage; - } - - private void adjustDegradedCoverage() { - if (askedNodes == answeredNodesParticipated) { - return; - } - int notAnswered = askedNodes - answeredNodesParticipated; - - if (adaptiveTimeoutCalculated && answeredNodesParticipated > 0) { - answeredActiveDocs += (notAnswered * answeredActiveDocs / answeredNodesParticipated); - answeredTargetActiveDocs += (notAnswered * answeredTargetActiveDocs / answeredNodesParticipated); - } else { - if (askedNodes > answeredNodesParticipated) { - int searchableCopies = (int) searchCluster.dispatchConfig().searchableCopies(); - int missingNodes = notAnswered - (searchableCopies - 1); - if (answeredNodesParticipated > 0) { - answeredActiveDocs += (missingNodes * answeredActiveDocs / answeredNodesParticipated); - answeredTargetActiveDocs += (missingNodes * answeredTargetActiveDocs / answeredNodesParticipated); - timedOut = true; - } - } - } - } - private void ejectInvoker(SearchInvoker invoker) { invokers.remove(invoker); invoker.release(); @@ -340,11 +247,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM } // For overriding in tests - protected long currentTime() { - return System.currentTimeMillis(); - } - - // For overriding in tests protected LinkedBlockingQueue<SearchInvoker> newQueue() { return new LinkedBlockingQueue<>(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java index eb379a51ed4..b0505fc5517 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; +import com.yahoo.concurrent.MonotonicTimer; +import com.yahoo.concurrent.Timer; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; import com.yahoo.search.Result; @@ -22,6 +24,7 @@ import java.util.Set; public abstract class InvokerFactory { protected final SearchCluster searchCluster; + private static final Timer timer = new MonotonicTimer(); public InvokerFactory(SearchCluster searchCluster) { this.searchCluster = searchCluster; @@ -89,7 +92,7 @@ public abstract class InvokerFactory { if (invokers.size() == 1 && failed == null) { return Optional.of(invokers.get(0)); } else { - return Optional.of(new InterleavedSearchInvoker(invokers, searchCluster, group, failed)); + return Optional.of(new InterleavedSearchInvoker(timer, invokers, searchCluster, group, failed)); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SimpleTimeoutHandler.java b/container-search/src/main/java/com/yahoo/search/dispatch/SimpleTimeoutHandler.java new file mode 100644 index 00000000000..8732cee7652 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SimpleTimeoutHandler.java @@ -0,0 +1,26 @@ +package com.yahoo.search.dispatch; + +import com.yahoo.search.Query; + +import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT; + +/** + * Computes the timeout based solely on the query timeout + * + * @author baldersheim + */ +public class SimpleTimeoutHandler implements TimeoutHandler { + private final Query query; + SimpleTimeoutHandler(Query query) { + this.query = query; + } + @Override + public long nextTimeout(int answeredNodes) { + return query.getTimeLeft(); + } + + @Override + public int reason() { + return DEGRADED_BY_TIMEOUT; + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/TimeoutHandler.java b/container-search/src/main/java/com/yahoo/search/dispatch/TimeoutHandler.java new file mode 100644 index 00000000000..7c86b02d368 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/TimeoutHandler.java @@ -0,0 +1,11 @@ +package com.yahoo.search.dispatch; + +/** + * Computes next timeout + * + * @author baldersheim + */ +public interface TimeoutHandler { + long nextTimeout(int answeredNodes); + int reason(); +} |