diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java | 343 |
1 file changed, 343 insertions, 0 deletions
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.searchcluster;

import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.net.HostName;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * A model of a search cluster we might want to dispatch queries to.
 *
 * <p>Tracks the cluster's nodes and groups, monitors node health via a
 * {@link ClusterMonitor}, aggregates active-document counts per group to
 * decide which groups have sufficient coverage, and (when a full-corpus
 * search node runs on this host) manages this container's VIP status.</p>
 *
 * @author bratseth
 */
public class SearchCluster implements NodeManager<Node> {

    private static final Logger log = Logger.getLogger(SearchCluster.class.getName());

    /** The min active docs a group must have to be considered up, as a % of the average active docs of the other groups */
    private final double minActivedocsCoveragePercentage;
    /** The min % of nodes that must be up for a group to be considered to have coverage */
    private final double minGroupCoverage;
    /** Absolute number of down nodes tolerated per group, in addition to the minGroupCoverage allowance */
    private final int maxNodesDownPerGroup;
    /** Total number of nodes across all groups */
    private final int size;
    private final ImmutableMap<Integer, Group> groups;
    private final ImmutableMultimap<String, Node> nodesByHost;
    /** Groups in the order their first node appeared in the config */
    private final ImmutableList<Group> orderedGroups;
    private final ClusterMonitor<Node> clusterMonitor;
    private final VipStatus vipStatus;

    /**
     * A search node on this local machine having the entire corpus, which we therefore
     * should prefer to dispatch directly to, or empty if there is no such local search node.
     * If there is one, we also maintain the VIP status of this container based on the availability
     * of the corpus on this local node (up + has coverage), such that this node is taken out of rotation
     * if it only queries this cluster when the local node cannot be used, to avoid unnecessary
     * cross-node network traffic.
     */
    private final Optional<Node> directDispatchTarget;

    // Only needed until query requests are moved to rpc
    private final FS4ResourcePool fs4ResourcePool;

    public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) {
        this(dispatchConfig.minActivedocsPercentage(), dispatchConfig.minGroupCoverage(), dispatchConfig.maxNodesDownPerGroup(),
             toNodes(dispatchConfig), fs4ResourcePool, containerClusterSize, vipStatus);
    }

    public SearchCluster(double minActivedocsCoverage, double minGroupCoverage, int maxNodesDownPerGroup, List<Node> nodes,
                         FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) {
        this.minActivedocsCoveragePercentage = minActivedocsCoverage;
        this.minGroupCoverage = minGroupCoverage;
        this.maxNodesDownPerGroup = maxNodesDownPerGroup;
        this.size = nodes.size();
        this.fs4ResourcePool = fs4ResourcePool;
        this.vipStatus = vipStatus;

        // Create groups, keyed by group id
        ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>();
        for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) {
            Group g = new Group(group.getKey(), group.getValue());
            groupsBuilder.put(group.getKey(), g);
        }
        this.groups = groupsBuilder.build();

        // Remember groups in the order their first node was introduced
        // (LinkedHashMap keeps insertion order; duplicate group ids overwrite in place)
        LinkedHashMap<Integer, Group> groupIntroductionOrder = new LinkedHashMap<>();
        // Fix: was groups.get(node.group) — field access instead of the accessor used everywhere else
        nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group())));
        this.orderedGroups = ImmutableList.<Group>builder().addAll(groupIntroductionOrder.values()).build();

        // Index nodes by host: one host may run several nodes (on different ports), hence a multimap
        ImmutableMultimap.Builder<String, Node> nodesByHostBuilder = new ImmutableMultimap.Builder<>();
        for (Node node : nodes)
            nodesByHostBuilder.put(node.hostname(), node);
        this.nodesByHost = nodesByHostBuilder.build();

        this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize,
                                                             nodesByHost, groups);

        // Set up monitoring of the fs4 interface of the nodes
        // We can switch to monitoring the rpc interface instead when we move the query phase to rpc
        this.clusterMonitor = new ClusterMonitor<>(this);
        for (Node node : nodes) {
            // cluster monitor will only call working() when the
            // node transitions from down to up, so we need to
            // register the initial (working) state here:
            working(node);
            clusterMonitor.add(node, true);
        }
    }

    /**
     * Returns the single local search node we may dispatch directly to, or empty if direct
     * dispatch is not applicable. Conditions (all must hold):
     * <ul>
     *   <li>exactly one search node of this cluster is configured on this host,</li>
     *   <li>that node's group has exactly one node, i.e. the node holds the entire corpus,</li>
     *   <li>the container cluster is at least as large as the search cluster, to avoid load
     *       skew when a subset of the search nodes are also containers. (Partially overlapping
     *       search and container clusters produce skewed load in any case, so that case is
     *       disregarded.)</li>
     * </ul>
     */
    private static Optional<Node> findDirectDispatchTarget(String selfHostname,
                                                           int searchClusterSize,
                                                           int containerClusterSize,
                                                           ImmutableMultimap<String, Node> nodesByHost,
                                                           ImmutableMap<Integer, Group> groups) {
        // Only use direct dispatch if we have exactly 1 search node on the same machine:
        ImmutableCollection<Node> localSearchNodes = nodesByHost.get(selfHostname);
        if (localSearchNodes.size() != 1) return Optional.empty();

        Node localSearchNode = localSearchNodes.iterator().next();
        Group localSearchGroup = groups.get(localSearchNode.group());

        // Only use direct dispatch if the local search node has the entire corpus
        if (localSearchGroup.nodes().size() != 1) return Optional.empty();

        // Only use direct dispatch if this container cluster has at least as many nodes as the search cluster
        if (containerClusterSize < searchClusterSize) return Optional.empty();

        return Optional.of(localSearchNode);
    }

    /** Converts the config-model node list into our immutable {@link Node} list */
    private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) {
        ImmutableList.Builder<Node> nodesBuilder = new ImmutableList.Builder<>();
        for (DispatchConfig.Node node : dispatchConfig.node())
            nodesBuilder.add(new Node(node.key(), node.host(), node.fs4port(), node.group()));
        return nodesBuilder.build();
    }

    /** Returns the number of nodes in this cluster (across all groups) */
    public int size() { return size; }

    /** Returns the groups of this cluster as an immutable map indexed by group id */
    public ImmutableMap<Integer, Group> groups() { return groups; }

    /** Returns the groups of this cluster as an immutable list in introduction order */
    public ImmutableList<Group> orderedGroups() { return orderedGroups; }

    /** Returns the n'th (zero-indexed) group in the cluster if possible */
    public Optional<Group> group(int n) {
        if (orderedGroups.size() > n) {
            return Optional.of(orderedGroups.get(n));
        } else {
            return Optional.empty();
        }
    }

    /** Returns the number of nodes per group - size()/groups.size() */
    public int groupSize() {
        if (groups.size() == 0) return size();
        return size() / groups.size();
    }

    /**
     * Returns the nodes of this cluster as an immutable map indexed by host.
     * One host may contain multiple nodes (on different ports), so this is a multi-map.
     */
    public ImmutableMultimap<String, Node> nodesByHost() { return nodesByHost; }

    /**
     * Returns the recipient we should dispatch queries directly to (bypassing fdispatch),
     * or empty if we should not dispatch directly.
     */
    public Optional<Node> directDispatchTarget() {
        if ( ! directDispatchTarget.isPresent()) return Optional.empty();

        // Only use direct dispatch if the local group has sufficient coverage
        Group localSearchGroup = groups.get(directDispatchTarget.get().group());
        if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();

        // Only use direct dispatch if the local search node is up
        if ( ! directDispatchTarget.get().isWorking()) return Optional.empty();

        return directDispatchTarget;
    }

    /** Used by the cluster monitor to manage node status */
    @Override
    public void working(Node node) {
        node.setWorking(true);

        // Put ourselves back in rotation if the node we dispatch directly to came back
        if (usesDirectDispatchTo(node))
            vipStatus.addToRotation(this);
    }

    /** Used by the cluster monitor to manage node status */
    @Override
    public void failed(Node node) {
        node.setWorking(false);

        // Take ourselves out if we usually dispatch only to our own host
        if (usesDirectDispatchTo(node))
            vipStatus.removeFromRotation(this);
    }

    /**
     * Records the given group's coverage state, and updates this container's VIP status
     * if we direct dispatch to that group and its coverage status changed.
     */
    private void updateSufficientCoverage(Group group, boolean sufficientCoverage) {
        if (usesDirectDispatchTo(group) && sufficientCoverage != group.hasSufficientCoverage()) {
            if (sufficientCoverage) {
                vipStatus.addToRotation(this);
            } else {
                vipStatus.removeFromRotation(this);
            }
        }
        group.setHasSufficientCoverage(sufficientCoverage);
    }

    private boolean usesDirectDispatchTo(Node node) {
        if ( ! directDispatchTarget.isPresent()) return false;
        return directDispatchTarget.get().equals(node);
    }

    private boolean usesDirectDispatchTo(Group group) {
        if ( ! directDispatchTarget.isPresent()) return false;
        return directDispatchTarget.get().group() == group.id();
    }

    /** Used by the cluster monitor to manage node status */
    @Override
    public void ping(Node node, Executor executor) {
        Pinger pinger = new Pinger(node, clusterMonitor, fs4ResourcePool);
        FutureTask<Pong> futurePong = new FutureTask<>(pinger);
        executor.execute(futurePong);
        Pong pong = getPong(futurePong, node);
        futurePong.cancel(true); // no-op if already completed; interrupts a still-running ping

        if (pong.badResponse())
            clusterMonitor.failed(node, pong.getError(0));
        else
            clusterMonitor.responded(node);
    }

    /**
     * Update statistics after a round of issuing pings.
     * Note that this doesn't wait for pings to return, so it will typically accumulate data from
     * last rounds pinging, or potentially (although unlikely) some combination of new and old data.
     */
    @Override
    public void pingIterationCompleted() {
        int numGroups = orderedGroups.size();
        if (numGroups == 1) {
            Group group = groups.values().iterator().next();
            group.aggregateActiveDocuments();
            updateSufficientCoverage(group, true); // by definition
            return;
        }

        // Update active documents per group and use it to decide if the group should be active

        long[] activeDocumentsInGroup = new long[numGroups];
        long sumOfActiveDocuments = 0;
        for (int i = 0; i < numGroups; i++) {
            Group group = orderedGroups.get(i);
            group.aggregateActiveDocuments();
            activeDocumentsInGroup[i] = group.getActiveDocuments();
            sumOfActiveDocuments += activeDocumentsInGroup[i];
        }

        for (int i = 0; i < numGroups; i++) {
            Group group = orderedGroups.get(i);
            long activeDocuments = activeDocumentsInGroup[i];
            // Compare each group against the average of the *other* groups (numGroups > 1 here)
            long averageDocumentsInOtherGroups = (sumOfActiveDocuments - activeDocuments) / (numGroups - 1);
            boolean sufficientCoverage = isGroupCoverageSufficient(group.nodes(), activeDocuments, averageDocumentsInOtherGroups);
            updateSufficientCoverage(group, sufficientCoverage);
        }
    }

    /**
     * A group has sufficient coverage when its active document count is at least
     * minActivedocsCoveragePercentage of the other groups' average, and enough of
     * its nodes are up.
     */
    private boolean isGroupCoverageSufficient(List<Node> nodes, long activeDocuments, long averageDocumentsInOtherGroups) {
        boolean sufficientCoverage = true;

        if (averageDocumentsInOtherGroups > 0) {
            double coverage = 100.0 * (double) activeDocuments / averageDocumentsInOtherGroups;
            sufficientCoverage = coverage >= minActivedocsCoveragePercentage;
        }
        if (sufficientCoverage) {
            sufficientCoverage = isGroupNodeCoverageSufficient(nodes);
        }
        return sufficientCoverage;
    }

    /** Returns whether enough nodes in the group are up: down nodes within the allowance of maxNodesDownPerGroup plus the minGroupCoverage percentage */
    private boolean isGroupNodeCoverageSufficient(List<Node> nodes) {
        int nodesUp = 0;
        for (Node node : nodes) {
            if (node.isWorking()) {
                nodesUp++;
            }
        }
        int numNodes = nodes.size();
        int nodesAllowedDown = maxNodesDownPerGroup + (int) (((double) numNodes * (100.0 - minGroupCoverage)) / 100.0);
        return nodesUp + nodesAllowedDown >= numNodes;
    }

    /**
     * Waits for the ping result, bounded by the monitor's fail limit.
     * Never throws: any failure (interrupt, execution error, timeout) is converted
     * to a Pong carrying an error, which the caller reports as a failed node.
     */
    private Pong getPong(FutureTask<Pong> futurePong, Node node) {
        try {
            return futurePong.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.log(Level.WARNING, "Exception pinging " + node, e);
            return new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node));
        } catch (ExecutionException e) {
            log.log(Level.WARNING, "Exception pinging " + node, e);
            return new Pong(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node));
        } catch (TimeoutException e) {
            return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out"));
        }
    }

    /**
     * Calculate whether a subset of nodes in a group has enough coverage
     */
    public boolean isPartialGroupCoverageSufficient(int groupId, List<Node> nodes) {
        if (orderedGroups.size() == 1) {
            return true;
        }
        long sumOfActiveDocuments = 0;
        int otherGroups = 0;
        for (Group g : orderedGroups) {
            if (g.id() != groupId) {
                sumOfActiveDocuments += g.getActiveDocuments();
                otherGroups++;
            }
        }
        long activeDocuments = 0;
        for (Node n : nodes) {
            activeDocuments += n.getActiveDocuments();
        }
        long averageDocumentsInOtherGroups = sumOfActiveDocuments / otherGroups;
        return isGroupCoverageSufficient(nodes, activeDocuments, averageDocumentsInOtherGroups);
    }
}