diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2018-11-23 15:20:13 +0100 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2018-11-23 15:20:13 +0100 |
commit | 231e8c1f4996672b1c1b21a109af90c95ec455df (patch) | |
tree | c2c94c6d3d80815151ad4ce3df8ffa34fdf9c92b | |
parent | 54f02674ba2edc2ddc7bf84714b6aca84f3282f9 (diff) |
Adaptive timeout support in java dispatch
17 files changed, 507 insertions, 85 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java index a1b3a0cd9c0..b3b530448fc 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/search/IndexedSearchCluster.java @@ -422,6 +422,15 @@ public class IndexedSearchCluster extends SearchCluster } builder.maxNodesDownPerGroup(rootDispatch.getMaxNodesDownPerFixedRow()); builder.useMultilevelDispatch(useMultilevelDispatchSetup()); + builder.searchableCopies(rootDispatch.getSearchableCopies()); + if (searchCoverage != null) { + if (searchCoverage.getMinimum() != null) + builder.minSearchCoverage(searchCoverage.getMinimum()); + if (searchCoverage.getMinWaitAfterCoverageFactor() != null) + builder.minWaitAfterCoverageFactor(searchCoverage.getMinWaitAfterCoverageFactor()); + if (searchCoverage.getMaxWaitAfterCoverageFactor() != null) + builder.maxWaitAfterCoverageFactor(searchCoverage.getMaxWaitAfterCoverageFactor()); + } } @Override diff --git a/configdefinitions/src/vespa/dispatch.def b/configdefinitions/src/vespa/dispatch.def index dce6098ee9b..50989c3ef74 100644 --- a/configdefinitions/src/vespa/dispatch.def +++ b/configdefinitions/src/vespa/dispatch.def @@ -19,6 +19,18 @@ distributionPolicy enum { ROUNDROBIN, ADAPTIVE } default=ROUNDROBIN # Is multi-level dispatch configured for this cluster useMultilevelDispatch bool default=false +# Number of document copies +searchableCopies long default=1 + +# Minimum search coverage required before returning the results of a query +minSearchCoverage double default=100 + +# Minimum wait time for full coverage after minimum coverage is achieved, factored based on time left at minimum coverage +minWaitAfterCoverageFactor double default=0 + +# Maximum wait time for full coverage after minimum coverage is achieved, factored based on time left at minimum coverage +maxWaitAfterCoverageFactor double default=1 + # The unique key of a search node node[].key int diff --git a/container-core/src/main/java/com/yahoo/container/handler/Coverage.java b/container-core/src/main/java/com/yahoo/container/handler/Coverage.java index 4a937068d81..84cc0734e7c 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/Coverage.java +++ b/container-core/src/main/java/com/yahoo/container/handler/Coverage.java @@ -28,9 +28,9 @@ public class Coverage { EXPLICITLY_FULL, EXPLICITLY_INCOMPLETE, DOCUMENT_COUNT; } - private final static int DEGRADED_BY_MATCH_PHASE = 1; - private final static int DEGRADED_BY_TIMEOUT = 2; - private final static int DEGRADED_BY_ADAPTIVE_TIMEOUT = 4; + public final static int DEGRADED_BY_MATCH_PHASE = 1; + public final static int DEGRADED_BY_TIMEOUT = 2; + public final static int DEGRADED_BY_ADAPTIVE_TIMEOUT = 4; /** * Build an invalid instance to initiate manually. diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java index de4d9c9fe8b..f40550f1f70 100644 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Channel.java @@ -1,6 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.fs4.mplex; +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.Packet; +import com.yahoo.search.Query; +import com.yahoo.search.dispatch.ResponseMonitor; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -9,12 +16,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; -import com.yahoo.concurrent.SystemTimer; -import com.yahoo.fs4.BasicPacket; -import com.yahoo.fs4.ChannelTimeoutException; -import com.yahoo.fs4.Packet; -import com.yahoo.search.Query; - /** * This class is used to represent a "channel" in the FS4 protocol. * A channel represents a session between a client and the fdispatch. @@ -34,6 +35,7 @@ public class FS4Channel { volatile private BlockingQueue<BasicPacket> responseQueue; private Query query; private boolean isPingChannel = false; + private ResponseMonitor<FS4Channel> monitor; /** for unit testing. do not use */ protected FS4Channel () { @@ -197,6 +199,9 @@ public class FS4Channel { throws InterruptedException, InvalidChannelException { ensureValidQ().put(packet); + if(monitor != null) { + monitor.responseAvailable(this); + } } /** @@ -241,4 +246,7 @@ public class FS4Channel { return "fs4 channel " + channelId + (isValid() ? " [valid]" : " [invalid]"); } + public void setResponseMonitor(ResponseMonitor<FS4Channel> monitor) { + this.monitor = monitor; + } } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java index f5d082635ab..8fa8bdb66bf 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java @@ -50,7 +50,7 @@ public class FS4InvokerFactory { public SearchInvoker getSearchInvoker(Query query, Node node) { Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); - return new FS4SearchInvoker(searcher, query, backend.openChannel(), node); + return new FS4SearchInvoker(searcher, query, backend.openChannel(), Optional.of(node)); } /** @@ -70,14 +70,14 @@ public class FS4InvokerFactory { * list is invalid and the remaining coverage is not sufficient */ public Optional<SearchInvoker> getSearchInvoker(Query query, int groupId, List<Node> nodes, boolean acceptIncompleteCoverage) { - Map<Integer, SearchInvoker> invokers = new HashMap<>(); + List<SearchInvoker> invokers = new ArrayList<>(nodes.size()); Set<Integer> failed = null; for (Node node : nodes) { boolean nodeAdded = false; if (node.isWorking()) { Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); if (backend.probeConnection()) { - invokers.put(node.key(), new FS4SearchInvoker(searcher, query, backend.openChannel(), node)); + invokers.add(node.key(), new FS4SearchInvoker(searcher, query, backend.openChannel(), Optional.of(node))); nodeAdded = true; } } @@ -99,7 +99,7 @@ public class FS4InvokerFactory { } if (!searchCluster.isPartialGroupCoverageSufficient(groupId, success)) { if (acceptIncompleteCoverage) { - createCoverageErrorInvoker(invokers, nodes, failed); + invokers.add(createCoverageErrorInvoker(nodes, failed)); } else { return Optional.empty(); } @@ -107,13 +107,13 @@ public class FS4InvokerFactory { } if (invokers.size() == 1) { - return Optional.of(invokers.values().iterator().next()); + return Optional.of(invokers.get(0)); } else { - return Optional.of(new InterleavedSearchInvoker(invokers)); + return Optional.of(new InterleavedSearchInvoker(invokers, searchCluster)); } } - private void createCoverageErrorInvoker(Map<Integer, SearchInvoker> invokers, List<Node> nodes, Set<Integer> failed) { + private SearchInvoker createCoverageErrorInvoker(List<Node> nodes, Set<Integer> failed) { long activeDocuments = 0; StringBuilder down = new StringBuilder("Connection failure on nodes with distribution-keys: "); Integer key = null; @@ -129,7 +129,8 @@ public class FS4InvokerFactory { } } Coverage coverage = new Coverage(0, activeDocuments, 0); - invokers.put(key, new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError(down.toString()), coverage)); + coverage.setNodesTried(1); + return new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError(down.toString()), coverage); } public FillInvoker getFillInvoker(Query query, Node node) { diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java index 98676890bdf..da32cfc4fda 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -6,14 +6,13 @@ import com.yahoo.fs4.ChannelTimeoutException; import com.yahoo.fs4.Packet; import com.yahoo.fs4.QueryPacket; import com.yahoo.fs4.QueryResultPacket; -import com.yahoo.fs4.mplex.Backend; import com.yahoo.fs4.mplex.FS4Channel; import com.yahoo.fs4.mplex.InvalidChannelException; import com.yahoo.search.Query; import com.yahoo.search.Result; +import com.yahoo.search.dispatch.ResponseMonitor; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; import java.io.IOException; @@ -30,29 +29,21 @@ import static java.util.Arrays.asList; * * @author ollivir */ -public class FS4SearchInvoker extends SearchInvoker { +public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor<FS4Channel> { private final VespaBackEndSearcher searcher; private FS4Channel channel; - private final Optional<Node> node; private ErrorMessage pendingSearchError = null; private Query query = null; private QueryPacket queryPacket = null; - public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4Channel channel, Node node) { + public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4Channel channel, Optional<Node> node) { + super(node); this.searcher = searcher; - this.node = Optional.of(node); this.channel = channel; channel.setQuery(query); - } - - // fdispatch code path - public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, Backend backend) { - this.searcher = searcher; - this.node = Optional.empty(); - this.channel = backend.openChannel(); - channel.setQuery(query); + channel.setResponseMonitor(this); } @Override @@ -68,6 +59,8 @@ public class FS4SearchInvoker extends SearchInvoker { this.query = query; this.queryPacket = queryPacket; + channel.setResponseMonitor(this); + try { boolean couldSend = channel.sendPacket(queryPacket); if (!couldSend) { @@ -115,7 +108,7 @@ public class FS4SearchInvoker extends SearchInvoker { searcher.addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result); - searcher.addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey, node.map(Node::key)); + searcher.addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey, distributionKey()); Packet[] packets; CacheControl cacheControl = searcher.getCacheControl(); PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query); @@ -130,7 +123,7 @@ public class FS4SearchInvoker extends SearchInvoker { } else { packets = new Packet[1]; packets[0] = resultPacket; - cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, node.map(Node::key)); + cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, distributionKey()); } } return asList(result); @@ -138,10 +131,7 @@ public class FS4SearchInvoker extends SearchInvoker { private List<Result> errorResult(ErrorMessage errorMessage) { Result error = new Result(query, errorMessage); - node.ifPresent(n -> { - Coverage coverage = new Coverage(0, n.getActiveDocuments(), 0); - error.setCoverage(coverage); - }); + getErrorCoverage().ifPresent(error::setCoverage); return Arrays.asList(error); } @@ -164,4 +154,9 @@ public class FS4SearchInvoker extends SearchInvoker { private boolean isLoggingFine() { return getLogger().isLoggable(Level.FINE); } + + @Override + public void responseAvailable(FS4Channel from) { + responseAvailable(); + } } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index a98c34295ee..209f6faefa0 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -222,7 +222,7 @@ public class FastSearcher extends VespaBackEndSearcher { if(direct.isPresent()) { return fs4InvokerFactory.getSearchInvoker(query, direct.get()); } - return new FS4SearchInvoker(this, query, dispatchBackend); + return new FS4SearchInvoker(this, query, dispatchBackend.openChannel(), Optional.empty()); } /** @@ -284,6 +284,7 @@ public class FastSearcher extends VespaBackEndSearcher { result.hits().addAll(partialResult.hits().asUnorderedHits()); } if (finalCoverage != null) { + adjustCoverageDegradedReason(finalCoverage); result.setCoverage(finalCoverage); } @@ -301,6 +302,18 @@ public class FastSearcher extends VespaBackEndSearcher { return result; } + private void adjustCoverageDegradedReason(Coverage coverage) { + int asked = coverage.getNodesTried(); + int answered = coverage.getNodes(); + if (asked > answered) { + int searchableCopies = (int) dispatcher.searchCluster().dispatchConfig().searchableCopies(); + int missingNodes = (asked - answered) - (searchableCopies - 1); + if (missingNodes > 0) { + coverage.setDegradedReason(com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT); + } + } + } + private static @NonNull Optional<String> quotedSummaryClass(String summaryClass) { return Optional.of(summaryClass == null ? "[null]" : quote(summaryClass)); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 0382f47457e..1ca64be7924 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -20,7 +20,6 @@ import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Arrays; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; @@ -134,7 +133,7 @@ public class Dispatcher extends AbstractComponent { int max = Integer.min(searchCluster.orderedGroups().size(), MAX_GROUP_SELECTION_ATTEMPTS); Set<Integer> rejected = null; for (int i = 0; i < max; i++) { - Optional<Group> groupInCluster = loadBalancer.takeGroupForQuery(rejected); + Optional<Group> groupInCluster = loadBalancer.takeGroup(rejected); if (!groupInCluster.isPresent()) { // No groups available break; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java index 9ff43df87cf..83647b332e6 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java @@ -5,12 +5,25 @@ import com.yahoo.fs4.QueryPacket; import com.yahoo.prelude.fastsearch.CacheKey; import com.yahoo.search.Query; import com.yahoo.search.Result; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.vespa.config.search.DispatchConfig; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; -import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static com.yahoo.container.handler.Coverage.DEGRADED_BY_ADAPTIVE_TIMEOUT; +import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT; /** * InterleavedSearchInvoker uses multiple {@link SearchInvoker} objects to interface with content @@ -19,11 +32,25 @@ import java.util.Map; * * @author ollivir */ -public class InterleavedSearchInvoker extends SearchInvoker { - private final Collection<SearchInvoker> invokers; +public class InterleavedSearchInvoker extends SearchInvoker implements ResponseMonitor<SearchInvoker> { + private static final Logger log = Logger.getLogger(InterleavedSearchInvoker.class.getName()); + + private final Set<SearchInvoker> invokers; + private final SearchCluster searchCluster; + private final LinkedBlockingQueue<SearchInvoker> availableForProcessing; + private Query query; + + private boolean adaptiveTimeoutCalculated = false; + private long adaptiveTimeoutMin = 0; + private long adaptiveTimeoutMax = 0; + private long deadline = 0; - public InterleavedSearchInvoker(Map<Integer, SearchInvoker> invokers) { - this.invokers = new ArrayList<>(invokers.values()); + public InterleavedSearchInvoker(Collection<SearchInvoker> invokers, SearchCluster searchCluster) { + super(Optional.empty()); + this.invokers = Collections.newSetFromMap(new IdentityHashMap<>()); + this.invokers.addAll(invokers); + this.searchCluster = searchCluster; + this.availableForProcessing = newQueue(); } /** @@ -33,27 +60,109 @@ public class InterleavedSearchInvoker extends SearchInvoker { */ @Override protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { + this.query = query; + invokers.forEach(invoker -> invoker.setMonitor(this)); + deadline = currentTime() + query.getTimeLeft(); + int originalHits = query.getHits(); int originalOffset = query.getOffset(); query.setHits(query.getHits() + query.getOffset()); query.setOffset(0); + for (SearchInvoker invoker : invokers) { invoker.sendSearchRequest(query, null); } + query.setHits(originalHits); query.setOffset(originalOffset); } @Override protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException { + int requests = invokers.size(); + int responses = 0; List<Result> results = new ArrayList<>(); - for (SearchInvoker invoker : invokers) { - results.addAll(invoker.getSearchResults(cacheKey)); + long nextTimeout = query.getTimeLeft(); + try { + while (!invokers.isEmpty() && nextTimeout >= 0) { + SearchInvoker invoker = availableForProcessing.poll(nextTimeout, TimeUnit.MILLISECONDS); + if (invoker == null) { + if (log.isLoggable(Level.FINE)) { + log.fine("Search timed out with " + requests + " requests made, " + responses + " responses received"); + } + break; + } else { + invokers.remove(invoker); + results.addAll(invoker.getSearchResults(cacheKey)); + responses++; + } + nextTimeout = nextTimeout(requests, responses); + } + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting for search results", e); } + + insertTimeoutErrors(results); return results; } + private void insertTimeoutErrors(List<Result> results) { + int degradedReason = adaptiveTimeoutCalculated ? DEGRADED_BY_ADAPTIVE_TIMEOUT : DEGRADED_BY_TIMEOUT; + + for (SearchInvoker invoker : invokers) { + Optional<Integer> dk = invoker.distributionKey(); + String message; + if (dk.isPresent()) { + message = "Backend communication timeout on node with distribution-key " + dk.get(); + } else { + message = "Backend communication timeout"; + } + Result error = new Result(query, ErrorMessage.createBackendCommunicationError(message)); + invoker.getErrorCoverage().ifPresent(coverage -> { + coverage.setDegradedReason(degradedReason); + error.setCoverage(coverage); + }); + results.add(error); + } + } + + private long nextTimeout(int requests, int responses) { + DispatchConfig config = searchCluster.dispatchConfig(); + double minimumCoverage = config.minSearchCoverage(); + + if (requests == responses || minimumCoverage >= 100.0) { + return query.getTimeLeft(); + } + int minimumResponses = (int) (requests * minimumCoverage / 100.0); + + if (responses < minimumResponses) { + return query.getTimeLeft(); + } + + long timeLeft = query.getTimeLeft(); + if (!adaptiveTimeoutCalculated) { + adaptiveTimeoutMin = (long) (timeLeft * config.minWaitAfterCoverageFactor()); + adaptiveTimeoutMax = (long) (timeLeft * config.maxWaitAfterCoverageFactor()); + adaptiveTimeoutCalculated = true; + } + + long now = currentTime(); + int pendingQueries = requests - responses; + double missWidth = ((100.0 - config.minSearchCoverage()) * requests) / 100.0 - 1.0; + double slopedWait = adaptiveTimeoutMin; + if (pendingQueries > 1 && missWidth > 0.0) { + slopedWait += ((adaptiveTimeoutMax - adaptiveTimeoutMin) * (pendingQueries - 1)) / missWidth; + } + long nextAdaptive = (long) slopedWait; + if (now + nextAdaptive >= deadline) { + return deadline - now; + } + deadline = now + nextAdaptive; + + return nextAdaptive; + } + @Override protected void release() { if (!invokers.isEmpty()) { @@ -61,4 +170,26 @@ public class InterleavedSearchInvoker extends SearchInvoker { invokers.clear(); } } + + @Override + public void responseAvailable(SearchInvoker from) { + if (availableForProcessing != null) { + availableForProcessing.add(from); + } + } + + @Override + protected void setMonitor(ResponseMonitor<SearchInvoker> monitor) { + // never to be called + } + + // For overriding in tests + protected long currentTime() { + return System.currentTimeMillis(); + } + + // For overriding in tests + protected LinkedBlockingQueue<SearchInvoker> newQueue() { + return new LinkedBlockingQueue<>(); + } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index 222ae6a4ea0..df6384cf81c 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -1,7 +1,6 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; -import com.yahoo.search.Query; import com.yahoo.search.dispatch.searchcluster.Group; import com.yahoo.search.dispatch.searchcluster.SearchCluster; @@ -45,13 +44,13 @@ public class LoadBalancer { } /** - * Select and allocate the search cluster group which is to be used for the provided query. Callers <b>must</b> call + * Select and allocate the search cluster group which is to be used for the next search query. Callers <b>must</b> call * {@link #releaseGroup} symmetrically for each taken allocation. * * @param rejectedGroups if not null, the load balancer will only return groups with IDs not in the set * @return The node group to target, or <i>empty</i> if the internal dispatch logic cannot be used */ - public Optional<Group> takeGroupForQuery(Set<Integer> rejectedGroups) { + public Optional<Group> takeGroup(Set<Integer> rejectedGroups) { if (scoreboard == null) { return Optional.empty(); } @@ -60,7 +59,7 @@ public class LoadBalancer { } /** - * Release an allocation given by {@link #takeGroupForQuery}. The release must be done exactly once for each allocation. + * Release an allocation given by {@link #takeGroup}. The release must be done exactly once for each allocation. * * @param group * previously allocated group diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/ResponseMonitor.java b/container-search/src/main/java/com/yahoo/search/dispatch/ResponseMonitor.java new file mode 100644 index 00000000000..c2e81d43677 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/ResponseMonitor.java @@ -0,0 +1,13 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +/** + * Classes implementing ResponseMonitor can be informed by monitored objects + * that a response is available for processing. The responseAvailable method + * must be thread-safe. + * + * @author ollivir + */ +public interface ResponseMonitor<T> { + void responseAvailable(T from); +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java index d5c505aa31b..01da3c20745 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchErrorInvoker.java @@ -11,6 +11,7 @@ import com.yahoo.search.result.ErrorMessage; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Optional; /** * A search invoker that will immediately produce an error that occurred during @@ -23,8 +24,10 @@ public class SearchErrorInvoker extends SearchInvoker { private final ErrorMessage message; private Query query; private final Coverage coverage; + private ResponseMonitor<SearchInvoker> monitor; public SearchErrorInvoker(ErrorMessage message, Coverage coverage) { + super(Optional.empty()); this.message = message; this.coverage = coverage; } @@ -36,6 +39,9 @@ public class SearchErrorInvoker extends SearchInvoker { @Override protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { this.query = query; + if(monitor != null) { + monitor.responseAvailable(this); + } } @Override @@ -52,4 +58,8 @@ public class SearchErrorInvoker extends SearchInvoker { // nothing to do } + @Override + protected void setMonitor(ResponseMonitor<SearchInvoker> monitor) { + this.monitor = monitor; + } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java index 53e09823f32..2691b32d631 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java @@ -5,9 +5,12 @@ import com.yahoo.fs4.QueryPacket; import com.yahoo.prelude.fastsearch.CacheKey; import com.yahoo.search.Query; import com.yahoo.search.Result; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.result.Coverage; import java.io.IOException; import java.util.List; +import java.util.Optional; /** * SearchInvoker encapsulates an allocated connection for running a single search query. @@ -16,6 +19,13 @@ import java.util.List; * @author ollivir */ public abstract class SearchInvoker extends CloseableInvoker { + private final Optional<Node> node; + private ResponseMonitor<SearchInvoker> monitor; + + protected SearchInvoker(Optional<Node> node) { + this.node = node; + } + /** * Retrieve the hits for the given {@link Query}. The invoker may return more than one result, in which case the caller is responsible * for merging the results. If multiple results are returned and the search query had a hit offset other than zero, that offset is @@ -29,4 +39,26 @@ public abstract class SearchInvoker extends CloseableInvoker { protected abstract void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException; protected abstract List<Result> getSearchResults(CacheKey cacheKey) throws IOException; + + protected void setMonitor(ResponseMonitor<SearchInvoker> monitor) { + this.monitor = monitor; + } + + protected void responseAvailable() { + if(monitor != null) { + monitor.responseAvailable(this); + } + } + + protected Optional<Integer> distributionKey() { + return node.map(Node::key); + } + + protected Optional<Coverage> getErrorCoverage() { + if(node.isPresent()) { + return Optional.of(new Coverage(0, node.get().getActiveDocuments(), 0)); + } else { + return Optional.empty(); + } + } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index b8d76906f70..8e278f78d7a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -36,10 +36,7 @@ public class SearchCluster implements NodeManager<Node> { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); - /** The min active docs a group must have to be considered up, as a % of the average active docs of the other groups */ - private final double minActivedocsCoveragePercentage; - private final double minGroupCoverage; - private final int maxNodesDownPerGroup; + private final DispatchConfig dispatchConfig; private final int size; private final String clusterId; private final ImmutableMap<Integer, Group> groups; @@ -62,20 +59,14 @@ public class SearchCluster implements NodeManager<Node> { private final FS4ResourcePool fs4ResourcePool; public SearchCluster(String clusterId, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { - this(clusterId, dispatchConfig.minActivedocsPercentage(), dispatchConfig.minGroupCoverage(), dispatchConfig.maxNodesDownPerGroup(), - toNodes(dispatchConfig), fs4ResourcePool, containerClusterSize, vipStatus); - } - - public SearchCluster(String clusterId, double minActivedocsCoverage, double minGroupCoverage, int maxNodesDownPerGroup, List<Node> nodes, FS4ResourcePool fs4ResourcePool, - int containerClusterSize, VipStatus vipStatus) { this.clusterId = clusterId; - this.minActivedocsCoveragePercentage = minActivedocsCoverage; - this.minGroupCoverage = minGroupCoverage; - this.maxNodesDownPerGroup = maxNodesDownPerGroup; - this.size = nodes.size(); + this.dispatchConfig = dispatchConfig; + this.size = dispatchConfig.node().size(); this.fs4ResourcePool = fs4ResourcePool; this.vipStatus = vipStatus; + List<Node> nodes = toNodes(dispatchConfig); + // Create groups ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>(); for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) { @@ -143,6 +134,10 @@ public class SearchCluster implements NodeManager<Node> { return nodesBuilder.build(); } + public DispatchConfig dispatchConfig() { + return dispatchConfig; + } + /** Returns the number of nodes in this cluster (across all groups) */ public int size() { return size; } @@ -286,7 +281,7 @@ public class SearchCluster implements NodeManager<Node> { if (averageDocumentsInOtherGroups > 0) { double coverage = 100.0 * (double) activeDocuments / averageDocumentsInOtherGroups; - sufficientCoverage = coverage >= minActivedocsCoveragePercentage; + sufficientCoverage = coverage >= dispatchConfig.minActivedocsPercentage(); } if (sufficientCoverage) { sufficientCoverage = isGroupNodeCoverageSufficient(nodes); @@ -302,7 +297,8 @@ public class SearchCluster implements NodeManager<Node> { } } int numNodes = nodes.size(); - int nodesAllowedDown = maxNodesDownPerGroup + (int) (((double) numNodes * (100.0 - minGroupCoverage)) / 100.0); + int nodesAllowedDown = dispatchConfig.maxNodesDownPerGroup() + + (int) (((double) numNodes * (100.0 - dispatchConfig.minGroupCoverage())) / 100.0); return nodesUp + nodesAllowedDown >= numNodes; } @@ -325,7 +321,7 @@ public class SearchCluster implements NodeManager<Node> { */ public boolean isPartialGroupCoverageSufficient(int groupId, List<Node> nodes) { if (orderedGroups.size() == 1) { - return nodes.size() >= groupSize() - maxNodesDownPerGroup; + return nodes.size() >= groupSize() - dispatchConfig.maxNodesDownPerGroup(); } long sumOfActiveDocuments = 0; int otherGroups = 0; diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java new file mode 100644 index 00000000000..69458f25f93 --- /dev/null +++ b/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java @@ -0,0 +1,180 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.fs4.QueryPacket; +import com.yahoo.prelude.fastsearch.CacheKey; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.test.ManualClock; +import org.junit.Test; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static com.yahoo.search.dispatch.MockSearchCluster.createDispatchConfig; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author ollivir + */ +public class InterleavedSearchInvokerTest { + private ManualClock clock = new ManualClock(Instant.now()); + private Query query = new TestQuery(); + private LinkedList<Event> expectedEvents = new LinkedList<>(); + private List<SearchInvoker> invokers = new ArrayList<>(); + + @Test + public void requireThatAdaptiveTimeoutsAreNotUsedWithFullCoverageRequirement() throws IOException { + SearchCluster cluster = new MockSearchCluster("!", createDispatchConfig(100.0), 1, 3); + SearchInvoker invoker = createInterleavedInvoker(cluster, 3); + + expectedEvents.add(new Event(5000, 100, 0)); + expectedEvents.add(new Event(4900, 100, 1)); + expectedEvents.add(new Event(4800, 100, 2)); + + invoker.search(query, null, null); + + assertTrue("All test scenario events processed", expectedEvents.isEmpty()); + } + + @Test + public void requireThatTimeoutsAreNotMarkedAsAdaptive() throws IOException { + SearchCluster cluster = new MockSearchCluster("!", createDispatchConfig(100.0), 1, 3); + SearchInvoker invoker = createInterleavedInvoker(cluster, 3); + + expectedEvents.add(new Event(5000, 300, 0)); + expectedEvents.add(new Event(4700, 300, 1)); + expectedEvents.add(null); + + List<Result> results = invoker.search(query, null, null); + + assertTrue("All test scenario events processed", expectedEvents.isEmpty()); + assertNotNull("Last invoker is marked as an error", results.get(2).hits().getErrorHit()); + assertTrue("Timed out invoker is a normal timeout", results.get(2).getCoverage(false).isDegradedByTimeout()); + } + + @Test + public void requireThatAdaptiveTimeoutDecreasesTimeoutWhenCoverageIsReached() throws IOException { + SearchCluster cluster = new MockSearchCluster("!", createDispatchConfig(50.0), 1, 4); + SearchInvoker invoker = createInterleavedInvoker(cluster, 4); + + expectedEvents.add(new Event(5000, 100, 0)); + expectedEvents.add(new Event(4900, 100, 1)); + expectedEvents.add(new Event(2400, 100, 2)); + expectedEvents.add(new Event(0, 0, null)); + + List<Result> results = invoker.search(query, null, null); + + assertTrue("All test scenario events processed", expectedEvents.isEmpty()); + assertNotNull("Last invoker is marked as an error", results.get(3).hits().getErrorHit()); + assertTrue("Timed out invoker is an adaptive timeout", results.get(3).getCoverage(false).isDegradedByAdapativeTimeout()); + } + + private InterleavedSearchInvoker createInterleavedInvoker(SearchCluster searchCluster, int numInvokers) { + for (int i = 0; i < numInvokers; i++) { + invokers.add(new TestInvoker()); + } + + return new InterleavedSearchInvoker(invokers, searchCluster) { + @Override + protected long currentTime() { + return clock.millis(); + } + + @Override + protected LinkedBlockingQueue<SearchInvoker> newQueue() { + return new LinkedBlockingQueue<SearchInvoker>() { + @Override + public SearchInvoker poll(long timeout, TimeUnit timeUnit) throws InterruptedException { + assertFalse(expectedEvents.isEmpty()); + Event ev = expectedEvents.removeFirst(); + if (ev == null) { + return null; + } else { + return ev.process(query, timeout); + } + } + }; + } + }; + } + + private class Event { + Long expectedTimeout; + long delay; + Integer invokerIndex; + + public Event(Integer expectedTimeout, int delay, Integer invokerIndex) { + this.expectedTimeout = (long) expectedTimeout; + this.delay = delay; + this.invokerIndex = invokerIndex; + } + + public SearchInvoker process(Query query, long currentTimeout) { + if (expectedTimeout != null) { + assertEquals("Expecting timeout to be " + expectedTimeout, (long) expectedTimeout, currentTimeout); + } + clock.advance(Duration.ofMillis(delay)); + if (query.getTimeLeft() < 0) { + fail("Test sequence ran out of time window"); + } + if (invokerIndex == null) { + return null; + } else { + return invokers.get(invokerIndex); + } + } + } + + private class TestInvoker extends SearchInvoker { + protected TestInvoker() { + super(Optional.of(new Node(42, "?", 0, 0))); + } + + @Override + protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { + } + + @Override + protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException { + return Collections.singletonList(new Result(query)); + } + + @Override + protected void release() { + } + } + + public class TestQuery extends Query { + private long start = clock.millis(); + + public TestQuery() { + super(); + setTimeout(5000); + } + + @Override + public long getStartTime() { + return start; + } + + @Override + public long getDurationTime() { + return clock.millis() - start; + } + } +} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java index 38a753360d8..c056423a9c4 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java @@ -7,9 +7,9 @@ import com.yahoo.search.dispatch.searchcluster.SearchCluster; import junit.framework.AssertionFailedError; import org.junit.Test; -import java.util.Arrays; import java.util.Optional; +import static com.yahoo.search.dispatch.MockSearchCluster.createDispatchConfig; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -22,10 +22,10 @@ public class LoadBalancerTest { @Test public void requreThatLoadBalancerServesSingleNodeSetups() { Node n1 = new Node(0, "test-node1", 0, 0); - SearchCluster cluster = new SearchCluster("a", 88.0, 99.0, 0, Arrays.asList(n1), null, 1, null); + SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1), null, 1, null); LoadBalancer lb = new LoadBalancer(cluster, true); - Optional<Group> grp = lb.takeGroupForQuery(null); + Optional<Group> grp = lb.takeGroup(null); Group group = grp.orElseGet(() -> { throw new AssertionFailedError("Expected a SearchCluster.Group"); }); @@ -36,10 +36,10 @@ public class LoadBalancerTest { public void requreThatLoadBalancerServesMultiGroupSetups() { Node n1 = new Node(0, "test-node1", 0, 0); Node n2 = new Node(1, "test-node2", 1, 1); - SearchCluster cluster = new SearchCluster("a", 88.0, 99.0, 0, Arrays.asList(n1, n2), null, 1, null); + SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, 1, null); LoadBalancer lb = new LoadBalancer(cluster, true); - Optional<Group> grp = lb.takeGroupForQuery(null); + Optional<Group> grp = lb.takeGroup(null); Group group = grp.orElseGet(() -> { throw new AssertionFailedError("Expected a SearchCluster.Group"); }); @@ -52,10 +52,10 @@ public class LoadBalancerTest { Node n2 = new Node(1, "test-node2", 1, 0); Node n3 = new Node(0, "test-node3", 0, 1); Node n4 = new Node(1, "test-node4", 1, 1); - SearchCluster cluster = new SearchCluster("a", 88.0, 99.0, 0, Arrays.asList(n1, n2, n3, n4), null, 2, null); + SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2, n3, n4), null, 2, null); LoadBalancer lb = new LoadBalancer(cluster, true); - Optional<Group> grp = lb.takeGroupForQuery(null); + Optional<Group> grp = lb.takeGroup(null); assertThat(grp.isPresent(), is(true)); } @@ -63,18 +63,18 @@ public class LoadBalancerTest { public void requreThatLoadBalancerReturnsDifferentGroups() { Node n1 = new Node(0, "test-node1", 0, 0); Node n2 = new Node(1, "test-node2", 1, 1); - SearchCluster cluster = new SearchCluster("a", 88.0, 99.0, 0, Arrays.asList(n1, n2), null, 1, null); + SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, 1, null); LoadBalancer lb = new LoadBalancer(cluster, true); // get first group - Optional<Group> grp = lb.takeGroupForQuery(null); + Optional<Group> grp = lb.takeGroup(null); Group group = grp.get(); int id1 = group.id(); // release allocation lb.releaseGroup(group); // get second group - grp = lb.takeGroupForQuery(null); + grp = lb.takeGroup(null); group = grp.get(); assertThat(group.id(), not(equalTo(id1))); } @@ -83,16 +83,16 @@ public class LoadBalancerTest { public void requreThatLoadBalancerReturnsGroupWithShortestQueue() { Node n1 = new Node(0, "test-node1", 0, 0); Node n2 = new Node(1, "test-node2", 1, 1); - SearchCluster cluster = new SearchCluster("a", 88.0, 99.0, 0, Arrays.asList(n1, n2), null, 1, null); + SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, 1, null); LoadBalancer lb = new LoadBalancer(cluster, true); // get first group - Optional<Group> grp = lb.takeGroupForQuery(null); + Optional<Group> grp = lb.takeGroup(null); Group group = grp.get(); int id1 = group.id(); // get second group - grp = lb.takeGroupForQuery(null); + grp = lb.takeGroup(null); group = grp.get(); int id2 = group.id(); assertThat(id2, not(equalTo(id1))); @@ -100,7 +100,7 @@ public class LoadBalancerTest { lb.releaseGroup(group); // get third group - grp = lb.takeGroupForQuery(null); + grp = lb.takeGroup(null); group = grp.get(); assertThat(group.id(), equalTo(id2)); } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java index fc505097472..f7b92419b52 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java @@ -6,9 +6,9 @@ import com.google.common.collect.ImmutableMultimap; import com.yahoo.search.dispatch.searchcluster.Group; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.vespa.config.search.DispatchConfig; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Optional; @@ -22,7 +22,11 @@ public class MockSearchCluster extends SearchCluster { private final ImmutableMultimap<String, Node> nodesByHost; public MockSearchCluster(String clusterId, int groups, int nodesPerGroup) { - super(clusterId, 100, 100, 0, Collections.emptyList(), null, 1, null); + this(clusterId, createDispatchConfig(), groups, nodesPerGroup); + } + + public MockSearchCluster(String clusterId, DispatchConfig dispatchConfig, int groups, int nodesPerGroup) { + super(clusterId, dispatchConfig, null, 1, null); ImmutableMap.Builder<Integer, Group> groupBuilder = ImmutableMap.builder(); ImmutableMultimap.Builder<String, Node> hostBuilder = ImmutableMultimap.builder(); @@ -58,7 +62,7 @@ public class MockSearchCluster extends SearchCluster { } public Optional<Group> group(int n) { - if(n < numGroups) { + if (n < numGroups) { return Optional.of(groups.get(n)); } else { return Optional.empty(); @@ -80,4 +84,24 @@ public class MockSearchCluster extends SearchCluster { public void failed(Node node) { node.setWorking(false); } + + public static DispatchConfig createDispatchConfig(Node... nodes) { + return createDispatchConfig(100.0, nodes); + } + + public static DispatchConfig createDispatchConfig(double minSearchCoverage, Node... nodes) { + DispatchConfig.Builder builder = new DispatchConfig.Builder(); + builder.minActivedocsPercentage(88.0); + builder.minGroupCoverage(99.0); + builder.maxNodesDownPerGroup(0); + builder.minSearchCoverage(minSearchCoverage); + if(minSearchCoverage < 100.0) { + builder.minWaitAfterCoverageFactor(0); + builder.maxWaitAfterCoverageFactor(0.5); + } + for (Node n : nodes) { + builder.node(new DispatchConfig.Node.Builder().key(n.key()).host(n.hostname()).port(n.fs4port()).group(n.group())); + } + return new DispatchConfig(builder); + } } |