diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java | 109 |
1 files changed, 50 insertions, 59 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index ca2fce0b32b..bedfa965229 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -8,13 +8,13 @@ import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; -import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; -import java.util.LinkedHashMap; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.Executor; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -24,7 +24,7 @@ import java.util.stream.Collectors; * * @author bratseth */ -public class SearchCluster implements NodeManager<Node> { +public class SearchCluster implements NodeManager<Node>, GroupList { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); @@ -32,9 +32,7 @@ public class SearchCluster implements NodeManager<Node> { private final String clusterId; private final VipStatus vipStatus; private final PingFactory pingFactory; - private final Map<Integer, Group> groups; - private final List<Group> orderedGroups; - private final List<Node> nodes; + private final GroupList groups; private long nextLogTime = 0; /** @@ -47,55 +45,48 @@ public class SearchCluster implements NodeManager<Node> { */ private final Node localCorpusDispatchTarget; - public SearchCluster(String clusterId, DispatchConfig dispatchConfig, + public SearchCluster(String clusterId, double minActivedocsPercentage, DispatchNodesConfig nodesConfig, VipStatus vipStatus, PingFactory pingFactory) { + this(clusterId, minActivedocsPercentage, toNodes(nodesConfig), vipStatus, pingFactory); + } + public SearchCluster(String clusterId, double minActivedocsPercentage, List<Node> nodes, + VipStatus vipStatus, PingFactory pingFactory) { + this(clusterId, minActivedocsPercentage, toGroups(nodes), vipStatus, pingFactory); + } + public SearchCluster(String clusterId, double minActivedocsPercentage, GroupList groups, + VipStatus vipStatus, PingFactory pingFactory) { this.clusterId = clusterId; - this.minActivedocsPercentage = dispatchConfig.minActivedocsPercentage(); + this.minActivedocsPercentage = minActivedocsPercentage; this.vipStatus = vipStatus; this.pingFactory = pingFactory; - - this.nodes = toNodes(nodesConfig); - - // Create groups - ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>(); - for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) { - Group g = new Group(group.getKey(), group.getValue()); - groupsBuilder.put(group.getKey(), g); - } - this.groups = groupsBuilder.build(); - LinkedHashMap<Integer, Group> groupIntroductionOrder = new LinkedHashMap<>(); - nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group()))); - this.orderedGroups = List.copyOf(groupIntroductionOrder.values()); - - this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups); + this.groups = groups; + this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), groups); } @Override public String name() { return clusterId; } public void addMonitoring(ClusterMonitor<Node> clusterMonitor) { - for (var group : orderedGroups()) { + for (var group : groups()) { for (var node : group.nodes()) clusterMonitor.add(node, true); } } - private static Node findLocalCorpusDispatchTarget(String selfHostname, - List<Node> nodes, - Map<Integer, Group> groups) { + private static Node findLocalCorpusDispatchTarget(String selfHostname, GroupList groups) { // A search node in the search cluster in question is configured on the same host as the currently running container. // It has all the data <==> No other nodes in the search cluster have the same group id as this node. // That local search node responds. // The search cluster to be searched has at least as many nodes as the container cluster we're running in. - List<Node> localSearchNodes = nodes.stream() + List<Node> localSearchNodes = groups.groups().stream().flatMap(g -> g.nodes().stream()) .filter(node -> node.hostname().equals(selfHostname)) .toList(); // Only use direct dispatch if we have exactly 1 search node on the same machine: if (localSearchNodes.size() != 1) return null; Node localSearchNode = localSearchNodes.iterator().next(); - Group localSearchGroup = groups.get(localSearchNode.group()); + Group localSearchGroup = groups.group(localSearchNode.group()); // Only use direct dispatch if the local search node has the entire corpus if (localSearchGroup.nodes().size() != 1) return null; @@ -109,27 +100,30 @@ public class SearchCluster implements NodeManager<Node> { .toList(); } - /** Returns the groups of this cluster as an immutable map indexed by group id */ - public Map<Integer, Group> groups() { return groups; } - - /** Returns the groups of this cluster as an immutable list in introduction order */ - public List<Group> orderedGroups() { return orderedGroups; } - - /** Returns the n'th (zero-indexed) group in the cluster if possible */ - public Optional<Group> group(int n) { - if (orderedGroups().size() > n) { - return Optional.of(orderedGroups().get(n)); - } else { - return Optional.empty(); + private static GroupList toGroups(Collection<Node> nodes) { + ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>(); + for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) { + Group g = new Group(group.getKey(), group.getValue()); + groupsBuilder.put(group.getKey(), g); } + return new GroupListImpl(groupsBuilder.build()); } - public boolean allGroupsHaveSize1() { - return nodes.size() == groups.size(); + @Override + public Group group(int id) { + return groups.group(id); } + @Override + public Set<Integer> groupKeys() { return groups.groupKeys(); } + @Override + public Collection<Group> groups() { return groups.groups(); } + @Override + public int numGroups() { return groups.numGroups(); } + + public boolean allGroupsHaveSize1() { return groups().stream().allMatch(g -> g.nodes().size() == 1); } public int groupsWithSufficientCoverage() { - return (int)groups.values().stream().filter(g -> g.hasSufficientCoverage()).count(); + return (int)groups().stream().filter(Group::hasSufficientCoverage).count(); } /** @@ -140,7 +134,7 @@ public class SearchCluster implements NodeManager<Node> { if ( localCorpusDispatchTarget == null) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage - Group localSearchGroup = groups().get(localCorpusDispatchTarget.group()); + Group localSearchGroup = group(localCorpusDispatchTarget.group()); if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); // Only use direct dispatch if the local search node is not down @@ -181,7 +175,7 @@ public class SearchCluster implements NodeManager<Node> { else if (usesLocalCorpusIn(node)) { // follow the status of this node // Do not take this out of rotation if we're a combined cluster of size 1, // as that can't be helpful, and leads to a deadlock where this node is never set back in service - if (nodeIsWorking || nodes.size() > 1) + if (nodeIsWorking || groups().stream().map(Group::nodes).count() > 1) setInRotationOnlyIf(nodeIsWorking); } } @@ -203,11 +197,11 @@ public class SearchCluster implements NodeManager<Node> { } public boolean hasInformationAboutAllNodes() { - return nodes.stream().allMatch(node -> node.isWorking() != null); + return groups().stream().allMatch(g -> g.nodes().stream().allMatch(node -> node.isWorking() != null)); } private boolean hasWorkingNodes() { - return nodes.stream().anyMatch(node -> node.isWorking() != Boolean.FALSE ); + return groups().stream().anyMatch(g -> g.nodes().stream().anyMatch(node -> node.isWorking() != Boolean.FALSE)); } private boolean usesLocalCorpusIn(Node node) { @@ -226,7 +220,7 @@ public class SearchCluster implements NodeManager<Node> { } private void pingIterationCompletedSingleGroup() { - Group group = groups().values().iterator().next(); + Group group = groups().iterator().next(); group.aggregateNodeValues(); // With just one group sufficient coverage may not be the same as full coverage, as the // group will always be marked sufficient for use. @@ -236,9 +230,9 @@ public class SearchCluster implements NodeManager<Node> { } private void pingIterationCompletedMultipleGroups() { - orderedGroups().forEach(Group::aggregateNodeValues); + groups().forEach(Group::aggregateNodeValues); long medianDocuments = medianDocumentsPerGroup(); - for (Group group : orderedGroups()) { + for (Group group : groups()) { boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), medianDocuments); updateSufficientCoverage(group, sufficientCoverage); trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments); @@ -246,8 +240,8 @@ public class SearchCluster implements NodeManager<Node> { } private long medianDocumentsPerGroup() { - if (orderedGroups().isEmpty()) return 0; - var activeDocuments = orderedGroups().stream().map(Group::activeDocuments).collect(Collectors.toList()); + if (isEmpty()) return 0; + var activeDocuments = groups().stream().map(Group::activeDocuments).collect(Collectors.toList()); return (long)Quantiles.median().compute(activeDocuments); } @@ -258,8 +252,7 @@ public class SearchCluster implements NodeManager<Node> { */ @Override public void pingIterationCompleted() { - int numGroups = orderedGroups().size(); - if (numGroups == 1) { + if (numGroups() == 1) { pingIterationCompletedSingleGroup(); } else { pingIterationCompletedMultipleGroups(); @@ -268,16 +261,14 @@ public class SearchCluster implements NodeManager<Node> { private boolean isGroupCoverageSufficient(long activeDocuments, long medianDocuments) { double documentCoverage = 100.0 * (double) activeDocuments / medianDocuments; - if (medianDocuments > 0 && documentCoverage < minActivedocsPercentage) - return false; - return true; + return ! (medianDocuments > 0 && documentCoverage < minActivedocsPercentage); } /** * Calculate whether a subset of nodes in a group has enough coverage */ public boolean isPartialGroupCoverageSufficient(List<Node> nodes) { - if (orderedGroups().size() == 1) + if (numGroups() == 1) return true; long activeDocuments = nodes.stream().mapToLong(Node::getActiveDocuments).sum(); return isGroupCoverageSufficient(activeDocuments, medianDocumentsPerGroup()); |