summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java137
1 files changed, 123 insertions, 14 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
index bd47c0525ab..9d1b3565db8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
@@ -1,9 +1,12 @@
package com.yahoo.search.dispatch;
import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
+import com.yahoo.container.handler.VipStatus;
+import com.yahoo.net.HostName;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
import com.yahoo.search.result.ErrorMessage;
@@ -19,6 +22,7 @@ import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -47,19 +51,34 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
private final ImmutableMap<Integer, Group> groups;
private final ImmutableMultimap<String, Node> nodesByHost;
private final ClusterMonitor<Node> clusterMonitor;
+ private final VipStatus vipStatus;
+
+ /**
+ * A search node on this local machine having the entire corpus, which we therefore
+ * should prefer to dispatch directly to, or empty if there is no such local search node.
+ * If there is one, we also maintain the VIP status of this container based on the availability
+ * of the corpus on this local node (up + has coverage), such that this node is taken out of rotation
+ * if it only queries this cluster when the local node cannot be used, to avoid unnecessary
+ * cross-node network traffic.
+ */
+ private final Optional<Node> directDispatchTarget;
// Only needed until query requests are moved to rpc
private final FS4ResourcePool fs4ResourcePool;
- public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) {
- this(dispatchConfig.minActivedocsPercentage(), toNodes(dispatchConfig), fs4ResourcePool);
+ public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool,
+ int containerClusterSize, VipStatus vipStatus) {
+ this(dispatchConfig.minActivedocsPercentage(), toNodes(dispatchConfig), fs4ResourcePool,
+ containerClusterSize, vipStatus);
}
- public SearchCluster(double minActivedocsCoverage, List<Node> nodes, FS4ResourcePool fs4ResourcePool) {
+ public SearchCluster(double minActivedocsCoverage, List<Node> nodes, FS4ResourcePool fs4ResourcePool,
+ int containerClusterSize, VipStatus vipStatus) {
this.minActivedocsCoveragePercentage = minActivedocsCoverage;
- size = nodes.size();
+ this.size = nodes.size();
this.fs4ResourcePool = fs4ResourcePool;
-
+ this.vipStatus = vipStatus;
+
// Create groups
ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>();
for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet())
@@ -77,8 +96,39 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
clusterMonitor = new ClusterMonitor<>(this);
for (Node node : nodes)
clusterMonitor.add(node, true);
+
+ this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize,
+ nodesByHost, groups);
}
-
+
+ private static Optional<Node> findDirectDispatchTarget(String selfHostname,
+ int searchClusterSize,
+ int containerClusterSize,
+ ImmutableMultimap<String, Node>nodesByHost,
+ ImmutableMap<Integer, Group> groups) {
+ // A search node in the search cluster in question is configured on the same host as the currently running container.
+ // It has all the data <==> No other nodes in the search cluster have the same group id as this node.
+ // That local search node responds.
+ // The search cluster to be searched has at least as many nodes as the container cluster we're running in.
+ ImmutableCollection<Node> localSearchNodes = nodesByHost.get(selfHostname);
+ // Only use direct dispatch if we have exactly 1 search node on the same machine:
+ if (localSearchNodes.size() != 1) return Optional.empty();
+
+ SearchCluster.Node localSearchNode = localSearchNodes.iterator().next();
+ SearchCluster.Group localSearchGroup = groups.get(localSearchNode.group());
+
+ // Only use direct dispatch if the local search node has the entire corpus
+ if (localSearchGroup.nodes().size() != 1) return Optional.empty();
+
+ // Only use direct dispatch if this container cluster has at least as many nodes as the search cluster
+ // to avoid load skew/preserve fanout in the case where a subset of the search nodes are also containers.
+ // This disregards the case where the search and container clusters are partially overlapping.
+ // Such configurations produce skewed load in any case.
+ if (containerClusterSize < searchClusterSize) return Optional.empty();
+
+ return Optional.of(localSearchNode);
+ }
+
private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) {
ImmutableList.Builder<Node> nodesBuilder = new ImmutableList.Builder<>();
for (DispatchConfig.Node node : dispatchConfig.node())
@@ -98,13 +148,63 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
*/
public ImmutableMultimap<String, Node> nodesByHost() { return nodesByHost; }
+ /**
+ * Returns the recipient we should dispatch queries directly to (bypassing fdispatch),
+ * or empty if we should not dispatch directly.
+ */
+ public Optional<Node> directDispatchTarget() {
+ if ( ! directDispatchTarget.isPresent()) return Optional.empty();
+
+ // Only use direct dispatch if the local group has sufficient coverage
+ SearchCluster.Group localSearchGroup = groups.get(directDispatchTarget.get().group());
+ if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();
+
+ // Only use direct dispatch if the local search node is up
+ if ( ! directDispatchTarget.get().isWorking()) return Optional.empty();
+
+ return directDispatchTarget;
+ }
+
/** Used by the cluster monitor to manage node status */
@Override
- public void working(Node node) { node.setWorking(true); }
+ public void working(Node node) {
+ node.setWorking(true);
+
+ if (usesDirectDispatchTo(node))
+ vipStatus.addToRotation(this);
+ }
/** Used by the cluster monitor to manage node status */
@Override
- public void failed(Node node) { node.setWorking(false); }
+ public void failed(Node node) {
+ node.setWorking(false);
+
+ // Take ourselves out if we usually dispatch only to our own host
+ if (usesDirectDispatchTo(node))
+ vipStatus.removeFromRotation(this);
+ }
+
+ private void updateSufficientCoverage(Group group, boolean sufficientCoverage) {
+ // update VIP status if we direct dispatch to this group and coverage status changed
+ if (usesDirectDispatchTo(group) && sufficientCoverage != group.hasSufficientCoverage()) {
+ if (sufficientCoverage)
+ vipStatus.addToRotation(this);
+ else
+ vipStatus.removeFromRotation(this);
+ }
+
+ group.setHasSufficientCoverage(sufficientCoverage);
+ }
+
+ private boolean usesDirectDispatchTo(Node node) {
+ if ( ! directDispatchTarget.isPresent()) return false;
+ return directDispatchTarget.get().equals(node);
+ }
+
+ private boolean usesDirectDispatchTo(Group group) {
+ if ( ! directDispatchTarget.isPresent()) return false;
+ return directDispatchTarget.get().group() == group.id();
+ }
/** Used by the cluster monitor to manage node status */
@Override
@@ -132,7 +232,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
for (Group group : groups.values())
group.aggregateActiveDocuments();
if (groups.size() == 1) {
- groups.values().iterator().next().setHasSufficientCoverage(true); // by definition
+ updateSufficientCoverage(groups.values().iterator().next(), true); // by definition
} else {
for (Group currentGroup : groups.values()) {
long sumOfAactiveDocumentsInOtherGroups = 0;
@@ -141,10 +241,10 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
sumOfAactiveDocumentsInOtherGroups += otherGroup.getActiveDocuments();
long averageDocumentsInOtherGroups = sumOfAactiveDocumentsInOtherGroups / (groups.size() - 1);
if (averageDocumentsInOtherGroups == 0)
- currentGroup.setHasSufficientCoverage(true); // no information about any group; assume coverage
+ updateSufficientCoverage(currentGroup, true); // no information about any group; assume coverage
else
- currentGroup.setHasSufficientCoverage(
- 100 * (double) currentGroup.getActiveDocuments() / averageDocumentsInOtherGroups > minActivedocsCoveragePercentage);
+ updateSufficientCoverage(currentGroup,
+ 100 * (double) currentGroup.getActiveDocuments() / averageDocumentsInOtherGroups > minActivedocsCoveragePercentage);
}
}
}
@@ -200,7 +300,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
this.nodes = ImmutableList.copyOf(nodes);
}
- /** Returns the id of this group */
+ /** Returns the unique identity of this group */
public int id() { return id; }
/** Returns the nodes in this group as an immutable list */
@@ -218,7 +318,6 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
hasSufficientCoverage.lazySet(sufficientCoverage);
}
-
void aggregateActiveDocuments() {
long activeDocumentsInGroup = 0;
for (Node node : nodes)
@@ -235,6 +334,16 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> {
@Override
public String toString() { return "search group " + id; }
+ @Override
+ public int hashCode() { return id; }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) return true;
+ if (!(other instanceof Group)) return false;
+ return ((Group) other).id == this.id;
+ }
+
}
/** A node in a search cluster. This class is multithread safe. */