diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java | 137 |
1 files changed, 123 insertions, 14 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java index bd47c0525ab..9d1b3565db8 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java @@ -1,9 +1,12 @@ package com.yahoo.search.dispatch; import com.google.common.annotations.Beta; +import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; +import com.yahoo.container.handler.VipStatus; +import com.yahoo.net.HostName; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; import com.yahoo.search.result.ErrorMessage; @@ -19,6 +22,7 @@ import com.yahoo.prelude.fastsearch.FS4ResourcePool; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -47,19 +51,34 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { private final ImmutableMap<Integer, Group> groups; private final ImmutableMultimap<String, Node> nodesByHost; private final ClusterMonitor<Node> clusterMonitor; + private final VipStatus vipStatus; + + /** + * A search node on this local machine having the entire corpus, which we therefore + * should prefer to dispatch directly to, or empty if there is no such local search node. + * If there is one, we also maintain the VIP status of this container based on the availability + * of the corpus on this local node (up + has coverage), such that this node is taken out of rotation + * if it only queries this cluster when the local node cannot be used, to avoid unnecessary + * cross-node network traffic. 
+ */ + private final Optional<Node> directDispatchTarget; // Only needed until query requests are moved to rpc private final FS4ResourcePool fs4ResourcePool; - public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) { - this(dispatchConfig.minActivedocsPercentage(), toNodes(dispatchConfig), fs4ResourcePool); + public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, + int containerClusterSize, VipStatus vipStatus) { + this(dispatchConfig.minActivedocsPercentage(), toNodes(dispatchConfig), fs4ResourcePool, + containerClusterSize, vipStatus); } - public SearchCluster(double minActivedocsCoverage, List<Node> nodes, FS4ResourcePool fs4ResourcePool) { + public SearchCluster(double minActivedocsCoverage, List<Node> nodes, FS4ResourcePool fs4ResourcePool, + int containerClusterSize, VipStatus vipStatus) { this.minActivedocsCoveragePercentage = minActivedocsCoverage; - size = nodes.size(); + this.size = nodes.size(); this.fs4ResourcePool = fs4ResourcePool; - + this.vipStatus = vipStatus; + // Create groups ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>(); for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) @@ -77,8 +96,39 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { clusterMonitor = new ClusterMonitor<>(this); for (Node node : nodes) clusterMonitor.add(node, true); + + this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize, + nodesByHost, groups); } - + + private static Optional<Node> findDirectDispatchTarget(String selfHostname, + int searchClusterSize, + int containerClusterSize, + ImmutableMultimap<String, Node>nodesByHost, + ImmutableMap<Integer, Group> groups) { + // A search node in the search cluster in question is configured on the same host as the currently running container. 
+ // It has all the data <==> No other nodes in the search cluster have the same group id as this node. + // That local search node responds. + // The container cluster we're running in has at least as many nodes as the search cluster to be searched. + ImmutableCollection<Node> localSearchNodes = nodesByHost.get(selfHostname); + // Only use direct dispatch if we have exactly 1 search node on the same machine: + if (localSearchNodes.size() != 1) return Optional.empty(); + + SearchCluster.Node localSearchNode = localSearchNodes.iterator().next(); + SearchCluster.Group localSearchGroup = groups.get(localSearchNode.group()); + + // Only use direct dispatch if the local search node has the entire corpus + if (localSearchGroup.nodes().size() != 1) return Optional.empty(); + + // Only use direct dispatch if this container cluster has at least as many nodes as the search cluster + // to avoid load skew/preserve fanout in the case where a subset of the search nodes are also containers. + // This disregards the case where the search and container clusters are partially overlapping. + // Such configurations produce skewed load in any case. + if (containerClusterSize < searchClusterSize) return Optional.empty(); + + return Optional.of(localSearchNode); + } + private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) { ImmutableList.Builder<Node> nodesBuilder = new ImmutableList.Builder<>(); for (DispatchConfig.Node node : dispatchConfig.node()) @@ -98,13 +148,63 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { */ public ImmutableMultimap<String, Node> nodesByHost() { return nodesByHost; } + /** + * Returns the recipient we should dispatch queries directly to (bypassing fdispatch), + * or empty if we should not dispatch directly. + */ + public Optional<Node> directDispatchTarget() { + if ( ! 
directDispatchTarget.isPresent()) return Optional.empty(); + + // Only use direct dispatch if the local group has sufficient coverage + SearchCluster.Group localSearchGroup = groups.get(directDispatchTarget.get().group()); + if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); + + // Only use direct dispatch if the local search node is up + if ( ! directDispatchTarget.get().isWorking()) return Optional.empty(); + + return directDispatchTarget; + } + /** Used by the cluster monitor to manage node status */ @Override - public void working(Node node) { node.setWorking(true); } + public void working(Node node) { + node.setWorking(true); + + if (usesDirectDispatchTo(node)) + vipStatus.addToRotation(this); + } /** Used by the cluster monitor to manage node status */ @Override - public void failed(Node node) { node.setWorking(false); } + public void failed(Node node) { + node.setWorking(false); + + // Take ourselves out if we usually dispatch only to our own host + if (usesDirectDispatchTo(node)) + vipStatus.removeFromRotation(this); + } + + private void updateSufficientCoverage(Group group, boolean sufficientCoverage) { + // update VIP status if we direct dispatch to this group and coverage status changed + if (usesDirectDispatchTo(group) && sufficientCoverage != group.hasSufficientCoverage()) { + if (sufficientCoverage) + vipStatus.addToRotation(this); + else + vipStatus.removeFromRotation(this); + } + + group.setHasSufficientCoverage(sufficientCoverage); + } + + private boolean usesDirectDispatchTo(Node node) { + if ( ! directDispatchTarget.isPresent()) return false; + return directDispatchTarget.get().equals(node); + } + + private boolean usesDirectDispatchTo(Group group) { + if ( ! 
directDispatchTarget.isPresent()) return false; + return directDispatchTarget.get().group() == group.id(); + } /** Used by the cluster monitor to manage node status */ @Override @@ -132,7 +232,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { for (Group group : groups.values()) group.aggregateActiveDocuments(); if (groups.size() == 1) { - groups.values().iterator().next().setHasSufficientCoverage(true); // by definition + updateSufficientCoverage(groups.values().iterator().next(), true); // by definition } else { for (Group currentGroup : groups.values()) { long sumOfAactiveDocumentsInOtherGroups = 0; @@ -141,10 +241,10 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { sumOfAactiveDocumentsInOtherGroups += otherGroup.getActiveDocuments(); long averageDocumentsInOtherGroups = sumOfAactiveDocumentsInOtherGroups / (groups.size() - 1); if (averageDocumentsInOtherGroups == 0) - currentGroup.setHasSufficientCoverage(true); // no information about any group; assume coverage + updateSufficientCoverage(currentGroup, true); // no information about any group; assume coverage else - currentGroup.setHasSufficientCoverage( - 100 * (double) currentGroup.getActiveDocuments() / averageDocumentsInOtherGroups > minActivedocsCoveragePercentage); + updateSufficientCoverage(currentGroup, + 100 * (double) currentGroup.getActiveDocuments() / averageDocumentsInOtherGroups > minActivedocsCoveragePercentage); } } } @@ -200,7 +300,7 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { this.nodes = ImmutableList.copyOf(nodes); } - /** Returns the id of this group */ + /** Returns the unique identity of this group */ public int id() { return id; } /** Returns the nodes in this group as an immutable list */ @@ -218,7 +318,6 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { hasSufficientCoverage.lazySet(sufficientCoverage); } - void aggregateActiveDocuments() { long activeDocumentsInGroup = 0; for 
(Node node : nodes) @@ -235,6 +334,16 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { @Override public String toString() { return "search group " + id; } + @Override + public int hashCode() { return id; } + + @Override + public boolean equals(Object other) { + if (other == this) return true; + if (!(other instanceof Group)) return false; + return ((Group) other).id == this.id; + } + } /** A node in a search cluster. This class is multithread safe. */ |