diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-08 09:41:10 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-08 14:43:42 +0000 |
commit | 3f3021757fbfa2c1002c0bf3055a734a374fc8bd (patch) | |
tree | 56680ae3fdd437b141ece35a3be89f6b91699da9 /container-search/src/main/java/com/yahoo | |
parent | 6d60d82bebd617880b2737d9f549b8402a1e1f24 (diff) |
Revert "Revert "Disable topk optimisation on dispatch when content distribution is se…""
Diffstat (limited to 'container-search/src/main/java/com/yahoo')
4 files changed, 40 insertions, 22 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java index d8fb7b46440..c60e1bf39cb 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java @@ -43,6 +43,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM private final SearchCluster searchCluster; private final LinkedBlockingQueue<SearchInvoker> availableForProcessing; private final Set<Integer> alreadyFailedNodes; + private final boolean isContentWellBalanced; private Query query; private boolean adaptiveTimeoutCalculated = false; @@ -59,13 +60,14 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM private boolean timedOut = false; private boolean degradedByMatchPhase = false; - public InterleavedSearchInvoker(Collection<SearchInvoker> invokers, SearchCluster searchCluster, Set<Integer> alreadyFailedNodes) { + public InterleavedSearchInvoker(Collection<SearchInvoker> invokers, boolean isContentWellBalanced, SearchCluster searchCluster, Set<Integer> alreadyFailedNodes) { super(Optional.empty()); this.invokers = Collections.newSetFromMap(new IdentityHashMap<>()); this.invokers.addAll(invokers); this.searchCluster = searchCluster; this.availableForProcessing = newQueue(); this.alreadyFailedNodes = alreadyFailedNodes; + this.isContentWellBalanced = isContentWellBalanced; } /** @@ -82,10 +84,13 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM int originalHits = query.getHits(); int originalOffset = query.getOffset(); int neededHits = originalHits + originalOffset; - Double topkProbabilityOverrride = query.properties().getDouble(Dispatcher.topKProbability); - int q = (topkProbabilityOverrride != null) - ? searchCluster.estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride) - : searchCluster.estimateHitsToFetch(neededHits, invokers.size()); + int q = neededHits; + if (isContentWellBalanced) { + Double topkProbabilityOverrride = query.properties().getDouble(Dispatcher.topKProbability); + q = (topkProbabilityOverrride != null) + ? searchCluster.estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride) + : searchCluster.estimateHitsToFetch(neededHits, invokers.size()); + } query.setHits(q); query.setOffset(0); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java index 03160e6c9c7..f65e0e43757 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java @@ -46,12 +46,12 @@ public abstract class InvokerFactory { * @return the invoker or empty if some node in the * list is invalid and the remaining coverage is not sufficient */ - public Optional<SearchInvoker> createSearchInvoker(VespaBackEndSearcher searcher, - Query query, - OptionalInt groupId, - List<Node> nodes, - boolean acceptIncompleteCoverage, - int maxHits) { + Optional<SearchInvoker> createSearchInvoker(VespaBackEndSearcher searcher, + Query query, + OptionalInt groupId, + List<Node> nodes, + boolean acceptIncompleteCoverage, + int maxHits) { List<SearchInvoker> invokers = new ArrayList<>(nodes.size()); Set<Integer> failed = null; for (Node node : nodes) { @@ -90,7 +90,7 @@ public abstract class InvokerFactory { if (invokers.size() == 1 && failed == null) { return Optional.of(invokers.get(0)); } else { - return Optional.of(new InterleavedSearchInvoker(invokers, searchCluster, failed)); + return Optional.of(new InterleavedSearchInvoker(invokers, searchCluster.isGroupWellBalanced(groupId), searchCluster, failed)); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index ec616a18e09..23255fff330 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -22,6 +22,8 @@ public class Group { private final AtomicBoolean hasFullCoverage = new AtomicBoolean(true); private final AtomicLong activeDocuments = new AtomicLong(0); private final AtomicBoolean isBlockingWrites = new AtomicBoolean(false); + private final AtomicBoolean isContentWellBalanced = new AtomicBoolean(true); + private final static double MAX_UNBALANCE = 0.10; // If documents on a node is more than 10% off from the average the group is unbalanced public Group(int id, List<Node> nodes) { this.id = id; @@ -53,25 +55,30 @@ public class Group { } public int workingNodes() { - int nodesUp = 0; - for (Node node : nodes) { - if (node.isWorking() == Boolean.TRUE) { - nodesUp++; - } - } - return nodesUp; + return (int) nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).count(); } void aggregateNodeValues() { - activeDocuments.set(nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum()); - isBlockingWrites.set(nodes.stream().anyMatch(node -> node.isBlockingWrites())); + long activeDocs = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum(); + activeDocuments.set(activeDocs); + isBlockingWrites.set(nodes.stream().anyMatch(Node::isBlockingWrites)); + int numWorkingNodes = workingNodes(); + if (numWorkingNodes > 0) { + long average = activeDocs / numWorkingNodes; + long deviation = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum(); + isContentWellBalanced.set(deviation <= (activeDocs * MAX_UNBALANCE)); + } else { + isContentWellBalanced.set(true); + } + } - /** Returns the active documents on this node. If unknown, 0 is returned. */ + /** Returns the active documents on this group. If unknown, 0 is returned. */ long getActiveDocuments() { return activeDocuments.get(); } /** Returns whether any node in this group is currently blocking write operations */ public boolean isBlockingWrites() { return isBlockingWrites.get(); } + public boolean isContentWellBalanced() { return isContentWellBalanced.get(); } public boolean isFullCoverageStatusChanged(boolean hasFullCoverageNow) { boolean previousState = hasFullCoverage.getAndSet(hasFullCoverageNow); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 2f62b07ac04..56165ec150b 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -368,6 +368,12 @@ public class SearchCluster implements NodeManager<Node> { return workingNodes + nodesAllowedDown >= nodesInGroup; } + public boolean isGroupWellBalanced(OptionalInt groupId) { + if (groupId.isEmpty()) return false; + Group group = groups().get(groupId); + return (group != null) && group.isContentWellBalanced(); + } + /** * Calculate whether a subset of nodes in a group has enough coverage */ |