summary | refs | log | tree | commit | diff | stats
path: root/container-search/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java')
-rw-r--r-- container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java 6
-rw-r--r-- container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java 12
-rw-r--r-- container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java 9
-rw-r--r-- container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java 4
-rw-r--r-- container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java 45
-rw-r--r-- container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java 25
6 files changed, 50 insertions, 51 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 9b92a78a7c9..af731e3ade0 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -186,8 +186,8 @@ public class Dispatcher extends AbstractComponent {
if (nodes.isEmpty()) return Optional.empty();
query.trace(false, 2, "Dispatching with search path ", searchPath);
- return invokerFactory.createSearchInvoker(searcher, query,
- OptionalInt.empty(),
+ return invokerFactory.createSearchInvoker(searcher,
+ query,
nodes,
true,
maxHitsPerNode);
@@ -203,7 +203,6 @@ public class Dispatcher extends AbstractComponent {
query.trace(false, 2, "Dispatching to ", node);
return invokerFactory.createSearchInvoker(searcher,
query,
- OptionalInt.empty(),
List.of(node),
true,
maxHitsPerNode)
@@ -222,7 +221,6 @@ public class Dispatcher extends AbstractComponent {
boolean acceptIncompleteCoverage = (i == max - 1);
Optional<SearchInvoker> invoker = invokerFactory.createSearchInvoker(searcher,
query,
- OptionalInt.of(group.id()),
group.nodes(),
acceptIncompleteCoverage,
maxHitsPerNode);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
index adf7368faa2..fb04c8299e9 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
@@ -3,6 +3,7 @@ package com.yahoo.search.dispatch;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
@@ -41,9 +42,9 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private final Set<SearchInvoker> invokers;
private final SearchCluster searchCluster;
+ private final Group group;
private final LinkedBlockingQueue<SearchInvoker> availableForProcessing;
private final Set<Integer> alreadyFailedNodes;
- private final boolean isContentWellBalanced;
private Query query;
private boolean adaptiveTimeoutCalculated = false;
@@ -60,14 +61,17 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private boolean timedOut = false;
private boolean degradedByMatchPhase = false;
- public InterleavedSearchInvoker(Collection<SearchInvoker> invokers, boolean isContentWellBalanced, SearchCluster searchCluster, Set<Integer> alreadyFailedNodes) {
+ public InterleavedSearchInvoker(Collection<SearchInvoker> invokers,
+ SearchCluster searchCluster,
+ Group group,
+ Set<Integer> alreadyFailedNodes) {
super(Optional.empty());
this.invokers = Collections.newSetFromMap(new IdentityHashMap<>());
this.invokers.addAll(invokers);
this.searchCluster = searchCluster;
+ this.group = group;
this.availableForProcessing = newQueue();
this.alreadyFailedNodes = alreadyFailedNodes;
- this.isContentWellBalanced = isContentWellBalanced;
}
/**
@@ -85,7 +89,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
int originalOffset = query.getOffset();
int neededHits = originalHits + originalOffset;
int q = neededHits;
- if (isContentWellBalanced) {
+ if (group.isBalanced() && !group.isSparse()) {
Double topkProbabilityOverrride = query.properties().getDouble(Dispatcher.topKProbability);
q = (topkProbabilityOverrride != null)
? searchCluster.estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride)
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
index 1de274ce6cf..e602afadcfb 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
@@ -4,6 +4,7 @@ package com.yahoo.search.dispatch;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Coverage;
@@ -13,7 +14,6 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
-import java.util.OptionalInt;
import java.util.Set;
/**
@@ -39,8 +39,7 @@ public abstract class InvokerFactory {
*
* @param searcher the searcher processing the query
* @param query the search query being processed
- * @param groupId the id of the node group to which the nodes belong
- * @param nodes pre-selected list of content nodes
+ * @param nodes pre-selected list of content nodes, all in a group or a subset of a group
* @param acceptIncompleteCoverage if some of the nodes are unavailable and this parameter is
* false, verify that the remaining set of nodes has sufficient coverage
* @return the invoker or empty if some node in the
@@ -48,10 +47,10 @@ public abstract class InvokerFactory {
*/
Optional<SearchInvoker> createSearchInvoker(VespaBackEndSearcher searcher,
Query query,
- OptionalInt groupId,
List<Node> nodes,
boolean acceptIncompleteCoverage,
int maxHits) {
+ Group group = searchCluster.group(nodes.get(0).group()).get(); // Nodes must be of the same group
List<SearchInvoker> invokers = new ArrayList<>(nodes.size());
Set<Integer> failed = null;
for (Node node : nodes) {
@@ -90,7 +89,7 @@ public abstract class InvokerFactory {
if (invokers.size() == 1 && failed == null) {
return Optional.of(invokers.get(0));
} else {
- return Optional.of(new InterleavedSearchInvoker(invokers, searchCluster.isGroupWellBalanced(groupId), searchCluster, failed));
+ return Optional.of(new InterleavedSearchInvoker(invokers, searchCluster, group, failed));
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java
index f6480f80c01..b29c3297aea 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchPath.java
@@ -41,7 +41,7 @@ public class SearchPath {
if (sp.isPresent()) {
return sp.get().mapToNodes(cluster);
} else {
- return Collections.emptyList();
+ return List.of();
}
}
@@ -75,7 +75,7 @@ public class SearchPath {
private List<Node> mapToNodes(SearchCluster cluster) {
if (cluster.groups().isEmpty()) {
- return Collections.emptyList();
+ return List.of();
}
Group selectedGroup = selectGroup(cluster);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
index 7faad9d51cc..727fb64faef 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
@@ -16,16 +16,17 @@ import java.util.logging.Logger;
*/
public class Group {
+ private static final Logger log = Logger.getLogger(Group.class.getName());
+ private final static double maxContentSkew = 0.10; // If documents on a node is more than 10% off from the average the group is unbalanced
+ private final static int minDocsPerNodeToRequireLowSkew = 100;
+
private final int id;
private final ImmutableList<Node> nodes;
-
private final AtomicBoolean hasSufficientCoverage = new AtomicBoolean(true);
private final AtomicBoolean hasFullCoverage = new AtomicBoolean(true);
private final AtomicLong activeDocuments = new AtomicLong(0);
private final AtomicBoolean isBlockingWrites = new AtomicBoolean(false);
- private final AtomicBoolean isContentWellBalanced = new AtomicBoolean(true);
- private final static double MAX_UNBALANCE = 0.10; // If documents on a node is more than 10% off from the average the group is unbalanced
- private static final Logger log = Logger.getLogger(Group.class.getName());
+ private final AtomicBoolean isBalanced = new AtomicBoolean(true);
public Group(int id, List<Node> nodes) {
this.id = id;
@@ -60,37 +61,43 @@ public class Group {
return (int) nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).count();
}
- void aggregateNodeValues() {
+ public void aggregateNodeValues() {
long activeDocs = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum();
activeDocuments.set(activeDocs);
isBlockingWrites.set(nodes.stream().anyMatch(Node::isBlockingWrites));
int numWorkingNodes = workingNodes();
if (numWorkingNodes > 0) {
long average = activeDocs / numWorkingNodes;
- long deviation = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum();
- boolean isDeviationSmall = deviation <= maxUnbalance(activeDocs);
- if ((!isContentWellBalanced.get() || isDeviationSmall != isContentWellBalanced.get()) && (activeDocs > 0)) {
- log.info("Content is " + (isDeviationSmall ? "" : "not ") + "well balanced. Current deviation = " + deviation*100/activeDocs + " %" +
- ". activeDocs = " + activeDocs + ", deviation = " + deviation + ", average = " + average);
- isContentWellBalanced.set(isDeviationSmall);
+ long skew = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum();
+ boolean balanced = skew <= activeDocs * maxContentSkew;
+ if (!isBalanced.get() || balanced != isBalanced.get()) {
+ if (!isSparse())
+ log.info("Content is " + (balanced ? "" : "not ") + "well balanced. Current deviation = " +
+ skew * 100 / activeDocs + " %. activeDocs = " + activeDocs + ", skew = " + skew +
+ ", average = " + average);
+ isBalanced.set(balanced);
}
} else {
- isContentWellBalanced.set(true);
+ isBalanced.set(true);
}
}
- double maxUnbalance(long activeDocs) {
- return Math.max(1, activeDocs * MAX_UNBALANCE);
- }
-
/** Returns the active documents on this group. If unknown, 0 is returned. */
- long getActiveDocuments() { return activeDocuments.get(); }
+ long activeDocuments() { return activeDocuments.get(); }
/** Returns whether any node in this group is currently blocking write operations */
public boolean isBlockingWrites() { return isBlockingWrites.get(); }
- public boolean isContentWellBalanced() { return isContentWellBalanced.get(); }
- public boolean isFullCoverageStatusChanged(boolean hasFullCoverageNow) {
+ /** Returns whether the nodes in the group have about the same number of documents */
+ public boolean isBalanced() { return isBalanced.get(); }
+
+ /** Returns whether this group has too few documents per node to expect it to be balanced */
+ public boolean isSparse() {
+ if (nodes.isEmpty()) return false;
+ return activeDocuments.get() / nodes.size() < minDocsPerNodeToRequireLowSkew;
+ }
+
+ public boolean fullCoverageStatusChanged(boolean hasFullCoverageNow) {
boolean previousState = hasFullCoverage.getAndSet(hasFullCoverageNow);
return previousState != hasFullCoverageNow;
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 9ae25518969..54d5dfc91af 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -14,13 +14,10 @@ import com.yahoo.search.cluster.NodeManager;
import com.yahoo.search.dispatch.TopKEstimator;
import com.yahoo.vespa.config.search.DispatchConfig;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.OptionalInt;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -311,9 +308,9 @@ public class SearchCluster implements NodeManager<Node> {
// With just one group sufficient coverage may not be the same as full coverage, as the
// group will always be marked sufficient for use.
updateSufficientCoverage(group, true);
- boolean sufficientCoverage = isGroupCoverageSufficient(group.getActiveDocuments(),
- group.getActiveDocuments());
- trackGroupCoverageChanges(group, sufficientCoverage, group.getActiveDocuments());
+ boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(),
+ group.activeDocuments());
+ trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments());
}
private void pingIterationCompletedMultipleGroups() {
@@ -321,7 +318,7 @@ public class SearchCluster implements NodeManager<Node> {
long medianDocuments = medianDocumentsPerGroup();
boolean anyGroupsSufficientCoverage = false;
for (Group group : orderedGroups()) {
- boolean sufficientCoverage = isGroupCoverageSufficient(group.getActiveDocuments(),
+ boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(),
medianDocuments);
anyGroupsSufficientCoverage = anyGroupsSufficientCoverage || sufficientCoverage;
updateSufficientCoverage(group, sufficientCoverage);
@@ -331,7 +328,7 @@ public class SearchCluster implements NodeManager<Node> {
private long medianDocumentsPerGroup() {
if (orderedGroups().isEmpty()) return 0;
- var activeDocuments = orderedGroups().stream().map(Group::getActiveDocuments).collect(Collectors.toList());
+ var activeDocuments = orderedGroups().stream().map(Group::activeDocuments).collect(Collectors.toList());
return (long)Quantiles.median().compute(activeDocuments);
}
@@ -357,12 +354,6 @@ public class SearchCluster implements NodeManager<Node> {
return true;
}
- public boolean isGroupWellBalanced(OptionalInt groupId) {
- if (groupId.isEmpty()) return false;
- Group group = groups().get(groupId.getAsInt());
- return (group != null) && group.isContentWellBalanced();
- }
-
/**
* Calculate whether a subset of nodes in a group has enough coverage
*/
@@ -375,12 +366,12 @@ public class SearchCluster implements NodeManager<Node> {
private void trackGroupCoverageChanges(Group group, boolean fullCoverage, long medianDocuments) {
if ( ! hasInformationAboutAllNodes()) return; // Be silent until we know what we are talking about.
- boolean changed = group.isFullCoverageStatusChanged(fullCoverage);
+ boolean changed = group.fullCoverageStatusChanged(fullCoverage);
if (changed || (!fullCoverage && System.currentTimeMillis() > nextLogTime)) {
nextLogTime = System.currentTimeMillis() + 30 * 1000;
if (fullCoverage) {
log.info("Cluster " + clusterId + ": " + group + " has full coverage. " +
- "Active documents: " + group.getActiveDocuments() + "/" + medianDocuments + ", " +
+ "Active documents: " + group.activeDocuments() + "/" + medianDocuments + ", " +
"working nodes: " + group.workingNodes() + "/" + group.nodes().size());
} else {
StringBuilder unresponsive = new StringBuilder();
@@ -389,7 +380,7 @@ public class SearchCluster implements NodeManager<Node> {
unresponsive.append('\n').append(node);
}
log.warning("Cluster " + clusterId + ": " + group + " has reduced coverage: " +
- "Active documents: " + group.getActiveDocuments() + "/" + medianDocuments + ", " +
+ "Active documents: " + group.activeDocuments() + "/" + medianDocuments + ", " +
"working nodes: " + group.workingNodes() + "/" + group.nodes().size() +
", unresponsive nodes: " + (unresponsive.toString().isEmpty() ? " none" : unresponsive));
}