diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java | 131 |
1 files changed, 53 insertions, 78 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 36d7e7a85a9..f8c4627473d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -1,10 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.searchcluster; -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMultimap; import com.google.common.math.Quantiles; import com.yahoo.container.handler.VipStatus; import com.yahoo.net.HostName; @@ -32,11 +29,10 @@ public class SearchCluster implements NodeManager<Node> { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); private final DispatchConfig dispatchConfig; - private final int size; private final String clusterId; - private final ImmutableMap<Integer, Group> groups; - private final ImmutableMultimap<String, Node> nodesByHost; - private final ImmutableList<Group> orderedGroups; + private final Map<Integer, Group> groups; + private final List<Group> orderedGroups; + private final List<Node> nodes; private final VipStatus vipStatus; private final PingFactory pingFactory; private final TopKEstimator hitEstimator; @@ -60,8 +56,7 @@ public class SearchCluster implements NodeManager<Node> { this.vipStatus = vipStatus; this.pingFactory = pingFactory; - List<Node> nodes = toNodes(dispatchConfig); - this.size = nodes.size(); + this.nodes = toNodes(dispatchConfig); // Create groups ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>(); @@ -72,16 +67,10 @@ public class SearchCluster implements NodeManager<Node> { this.groups = groupsBuilder.build(); LinkedHashMap<Integer, Group> groupIntroductionOrder = new LinkedHashMap<>(); nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group()))); - this.orderedGroups = ImmutableList.<Group>builder().addAll(groupIntroductionOrder.values()).build(); + this.orderedGroups = List.copyOf(groupIntroductionOrder.values()); - // Index nodes by host - ImmutableMultimap.Builder<String, Node> nodesByHostBuilder = new ImmutableMultimap.Builder<>(); - for (Node node : nodes) - nodesByHostBuilder.put(node.hostname(), node); - this.nodesByHost = nodesByHostBuilder.build(); hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR); - - this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodesByHost, groups); + this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups); } @Override @@ -95,13 +84,15 @@ public class SearchCluster implements NodeManager<Node> { } private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname, - ImmutableMultimap<String, Node> nodesByHost, - ImmutableMap<Integer, Group> groups) { + List<Node> nodes, + Map<Integer, Group> groups) { // A search node in the search cluster in question is configured on the same host as the currently running container. // It has all the data <==> No other nodes in the search cluster have the same group id as this node. // That local search node responds. // The search cluster to be searched has at least as many nodes as the container cluster we're running in. - ImmutableCollection<Node> localSearchNodes = nodesByHost.get(selfHostname); + List<Node> localSearchNodes = nodes.stream() + .filter(node -> node.hostname().equals(selfHostname)) + .collect(Collectors.toList()); // Only use direct dispatch if we have exactly 1 search node on the same machine: if (localSearchNodes.size() != 1) return Optional.empty(); @@ -114,25 +105,24 @@ public class SearchCluster implements NodeManager<Node> { return Optional.of(localSearchNode); } - private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) { - ImmutableList.Builder<Node> nodesBuilder = new ImmutableList.Builder<>(); - for (DispatchConfig.Node node : dispatchConfig.node()) - nodesBuilder.add(new Node(node.key(), node.host(), node.group())); - return nodesBuilder.build(); + private static List<Node> toNodes(DispatchConfig dispatchConfig) { + return dispatchConfig.node().stream() + .map(n -> new Node(n.key(), n.host(), n.group())) + .collect(Collectors.toUnmodifiableList()); } public DispatchConfig dispatchConfig() { return dispatchConfig; } - /** Returns the number of nodes in this cluster (across all groups) */ - public int size() { return size; } + /** Returns an immutable list of all nodes in this. */ + public List<Node> nodes() { return nodes; } /** Returns the groups of this cluster as an immutable map indexed by group id */ - public ImmutableMap<Integer, Group> groups() { return groups; } + public Map<Integer, Group> groups() { return groups; } /** Returns the groups of this cluster as an immutable list in introduction order */ - public ImmutableList<Group> orderedGroups() { return orderedGroups; } + public List<Group> orderedGroups() { return orderedGroups; } /** Returns the n'th (zero-indexed) group in the cluster if possible */ public Optional<Group> group(int n) { @@ -143,23 +133,12 @@ public class SearchCluster implements NodeManager<Node> { } } - /** - * Returns the wanted number of nodes per group - size()/groups.size(). - * The actual node count for a given group may differ due to node retirements. - */ - public int wantedGroupSize() { - if (groups().size() == 0) return size(); - return size() / groups().size(); + public boolean allGroupsHaveSize1() { + return nodes.size() == groups.size(); } public int groupsWithSufficientCoverage() { - int covered = 0; - for (Group g : orderedGroups()) { - if (g.hasSufficientCoverage()) { - covered++; - } - } - return covered; + return (int)groups.values().stream().filter(g -> g.hasSufficientCoverage()).count(); } /** @@ -210,8 +189,8 @@ public class SearchCluster implements NodeManager<Node> { } else if (usesLocalCorpusIn(node)) { // follow the status of this node // Do not take this out of rotation if we're a combined cluster of size 1, - // as that can't be helpful, and leads to a deadlock where this node is never taken back in servic e - if (nodeIsWorking || size() > 1) + // as that can't be helpful, and leads to a deadlock where this node is never set back in service + if (nodeIsWorking || nodes.size() > 1) setInRotationOnlyIf(nodeIsWorking); } } @@ -240,11 +219,11 @@ public class SearchCluster implements NodeManager<Node> { } public boolean hasInformationAboutAllNodes() { - return nodesByHost.values().stream().allMatch(node -> node.isWorking() != null); + return nodes.stream().allMatch(node -> node.isWorking() != null); } private boolean hasWorkingNodes() { - return nodesByHost.values().stream().anyMatch(node -> node.isWorking() != Boolean.FALSE ); + return nodes.stream().anyMatch(node -> node.isWorking() != Boolean.FALSE ); } private boolean usesLocalCorpusIn(Node node) { @@ -255,31 +234,6 @@ public class SearchCluster implements NodeManager<Node> { return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id(); } - private static class PongCallback implements PongHandler { - - private final ClusterMonitor<Node> clusterMonitor; - private final Node node; - - PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) { - this.node = node; - this.clusterMonitor = clusterMonitor; - } - - @Override - public void handle(Pong pong) { - if (pong.badResponse()) { - clusterMonitor.failed(node, pong.error().get()); - } else { - if (pong.activeDocuments().isPresent()) { - node.setActiveDocuments(pong.activeDocuments().get()); - node.setBlockingWrites(pong.isBlockingWrites()); - } - clusterMonitor.responded(node); - } - } - - } - /** Used by the cluster monitor to manage node status */ @Override public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) { @@ -293,19 +247,15 @@ public class SearchCluster implements NodeManager<Node> { // With just one group sufficient coverage may not be the same as full coverage, as the // group will always be marked sufficient for use. updateSufficientCoverage(group, true); - boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), - group.activeDocuments()); + boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), group.activeDocuments()); trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments()); } private void pingIterationCompletedMultipleGroups() { orderedGroups().forEach(Group::aggregateNodeValues); long medianDocuments = medianDocumentsPerGroup(); - boolean anyGroupsSufficientCoverage = false; for (Group group : orderedGroups()) { - boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), - medianDocuments); - anyGroupsSufficientCoverage = anyGroupsSufficientCoverage || sufficientCoverage; + boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), medianDocuments); updateSufficientCoverage(group, sufficientCoverage); trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments); } @@ -372,4 +322,29 @@ public class SearchCluster implements NodeManager<Node> { } } + private static class PongCallback implements PongHandler { + + private final ClusterMonitor<Node> clusterMonitor; + private final Node node; + + PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) { + this.node = node; + this.clusterMonitor = clusterMonitor; + } + + @Override + public void handle(Pong pong) { + if (pong.badResponse()) { + clusterMonitor.failed(node, pong.error().get()); + } else { + if (pong.activeDocuments().isPresent()) { + node.setActiveDocuments(pong.activeDocuments().get()); + node.setBlockingWrites(pong.isBlockingWrites()); + } + clusterMonitor.responded(node); + } + } + + } + } |