diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-03-09 13:45:19 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-09 13:45:19 +0100 |
commit | bd0f51961308733184b8bfb40c49ddbb9dbe7119 (patch) | |
tree | 5ccd1f1ab084dfa67790f9d7fb6740130e091993 /container-search | |
parent | 7dec6d6a1ed9a93fd0368d6477a74817996e2e78 (diff) | |
parent | f035afd9460684334860eac91698bc45a6558fbd (diff) |
Merge pull request #21611 from vespa-engine/bratseth/dispatch
Bratseth/dispatch
Diffstat (limited to 'container-search')
5 files changed, 80 insertions, 122 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index 7f411bbc80f..27a45753bb5 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -81,7 +81,7 @@ public class FastSearcher extends VespaBackEndSearcher { @Override public Result doSearch2(Query query, Execution execution) { - if (dispatcher.searchCluster().wantedGroupSize() == 1) + if (dispatcher.searchCluster().allGroupsHaveSize1()) forceSinglePassGrouping(query); try (SearchInvoker invoker = getSearchInvoker(query)) { Result result = invoker.search(query, execution); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index 7f4d8fc4739..4c0bcb38d15 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -12,13 +12,14 @@ import java.util.Set; import java.util.logging.Logger; /** - * LoadBalancer determines which group of content nodes should be accessed next for each search query when the internal java dispatcher is - * used. + * LoadBalancer determines which group of content nodes should be accessed next for each search query when the + * internal java dispatcher is used. + * + * The implementation here is a simplistic least queries in flight + round-robin load balancer * * @author ollivir */ public class LoadBalancer { - // The implementation here is a simplistic least queries in flight + round-robin load balancer private static final Logger log = Logger.getLogger(LoadBalancer.class.getName()); @@ -84,6 +85,7 @@ public class LoadBalancer { } static class GroupStatus { + private final Group group; private int allocations = 0; private long queries = 0; @@ -174,24 +176,10 @@ public class LoadBalancer { * @return the better of the two */ private static GroupStatus betterGroup(GroupStatus first, GroupStatus second) { - if (second == null) { - return first; - } - if (first == null) { - return second; - } - - // different coverage - if (first.group.hasSufficientCoverage() != second.group.hasSufficientCoverage()) { - if (!first.group.hasSufficientCoverage()) { - // first doesn't have coverage, second does - return second; - } else { - // second doesn't have coverage, first does - return first; - } - } - + if (second == null) return first; + if (first == null) return second; + if (first.group.hasSufficientCoverage() != second.group.hasSufficientCoverage()) + return first.group.hasSufficientCoverage() ? first : second; return first; } @@ -246,11 +234,8 @@ public class LoadBalancer { public Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups) { double needle = random.nextDouble(); Optional<GroupStatus> gs = selectGroup(needle, true, rejectedGroups); - if (gs.isPresent()) { - return gs; - } - // fallback - any coverage better than none - return selectGroup(needle, false, rejectedGroups); + if (gs.isPresent()) return gs; + return selectGroup(needle, false, rejectedGroups); // any coverage better than none } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index e99c1d5ad32..8f6e1b1ee2f 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -17,7 +17,9 @@ import java.util.logging.Logger; public class Group { private static final Logger log = Logger.getLogger(Group.class.getName()); - private final static double maxContentSkew = 0.10; // If documents on a node is more than 10% off from the average the group is unbalanced + + // If documents on a node is more than 10% off from the average the group is unbalanced + private final static double maxContentSkew = 0.10; private final static int minDocsPerNodeToRequireLowSkew = 100; private final int id; @@ -41,8 +43,7 @@ public class Group { /** * Returns the unique identity of this group. - * NOTE: This is a contiguous index from 0, NOT necessarily the group id assigned - * by the user or node repo. + * NOTE: This is a contiguous index from 0, NOT necessarily the group id assigned by the user or node repo. */ public int id() { return id; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 36d7e7a85a9..f8c4627473d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -1,10 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.searchcluster; -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMultimap; import com.google.common.math.Quantiles; import com.yahoo.container.handler.VipStatus; import com.yahoo.net.HostName; @@ -32,11 +29,10 @@ public class SearchCluster implements NodeManager<Node> { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); private final DispatchConfig dispatchConfig; - private final int size; private final String clusterId; - private final ImmutableMap<Integer, Group> groups; - private final ImmutableMultimap<String, Node> nodesByHost; - private final ImmutableList<Group> orderedGroups; + private final Map<Integer, Group> groups; + private final List<Group> orderedGroups; + private final List<Node> nodes; private final VipStatus vipStatus; private final PingFactory pingFactory; private final TopKEstimator hitEstimator; @@ -60,8 +56,7 @@ public class SearchCluster implements NodeManager<Node> { this.vipStatus = vipStatus; this.pingFactory = pingFactory; - List<Node> nodes = toNodes(dispatchConfig); - this.size = nodes.size(); + this.nodes = toNodes(dispatchConfig); // Create groups ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>(); @@ -72,16 +67,10 @@ public class SearchCluster implements NodeManager<Node> { this.groups = groupsBuilder.build(); LinkedHashMap<Integer, Group> groupIntroductionOrder = new LinkedHashMap<>(); nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group()))); - this.orderedGroups = ImmutableList.<Group>builder().addAll(groupIntroductionOrder.values()).build(); + this.orderedGroups = List.copyOf(groupIntroductionOrder.values()); - // Index nodes by host - ImmutableMultimap.Builder<String, Node> nodesByHostBuilder = new ImmutableMultimap.Builder<>(); - for (Node node : nodes) - nodesByHostBuilder.put(node.hostname(), node); - this.nodesByHost = nodesByHostBuilder.build(); hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR); - - this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodesByHost, groups); + this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups); } @Override @@ -95,13 +84,15 @@ public class SearchCluster implements NodeManager<Node> { } private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname, - ImmutableMultimap<String, Node> nodesByHost, - ImmutableMap<Integer, Group> groups) { + List<Node> nodes, + Map<Integer, Group> groups) { // A search node in the search cluster in question is configured on the same host as the currently running container. // It has all the data <==> No other nodes in the search cluster have the same group id as this node. // That local search node responds. // The search cluster to be searched has at least as many nodes as the container cluster we're running in. - ImmutableCollection<Node> localSearchNodes = nodesByHost.get(selfHostname); + List<Node> localSearchNodes = nodes.stream() + .filter(node -> node.hostname().equals(selfHostname)) + .collect(Collectors.toList()); // Only use direct dispatch if we have exactly 1 search node on the same machine: if (localSearchNodes.size() != 1) return Optional.empty(); @@ -114,25 +105,24 @@ public class SearchCluster implements NodeManager<Node> { return Optional.of(localSearchNode); } - private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) { - ImmutableList.Builder<Node> nodesBuilder = new ImmutableList.Builder<>(); - for (DispatchConfig.Node node : dispatchConfig.node()) - nodesBuilder.add(new Node(node.key(), node.host(), node.group())); - return nodesBuilder.build(); + private static List<Node> toNodes(DispatchConfig dispatchConfig) { + return dispatchConfig.node().stream() + .map(n -> new Node(n.key(), n.host(), n.group())) + .collect(Collectors.toUnmodifiableList()); } public DispatchConfig dispatchConfig() { return dispatchConfig; } - /** Returns the number of nodes in this cluster (across all groups) */ - public int size() { return size; } + /** Returns an immutable list of all nodes in this. */ + public List<Node> nodes() { return nodes; } /** Returns the groups of this cluster as an immutable map indexed by group id */ - public ImmutableMap<Integer, Group> groups() { return groups; } + public Map<Integer, Group> groups() { return groups; } /** Returns the groups of this cluster as an immutable list in introduction order */ - public ImmutableList<Group> orderedGroups() { return orderedGroups; } + public List<Group> orderedGroups() { return orderedGroups; } /** Returns the n'th (zero-indexed) group in the cluster if possible */ public Optional<Group> group(int n) { @@ -143,23 +133,12 @@ public class SearchCluster implements NodeManager<Node> { } } - /** - * Returns the wanted number of nodes per group - size()/groups.size(). - * The actual node count for a given group may differ due to node retirements. - */ - public int wantedGroupSize() { - if (groups().size() == 0) return size(); - return size() / groups().size(); + public boolean allGroupsHaveSize1() { + return nodes.size() == groups.size(); } public int groupsWithSufficientCoverage() { - int covered = 0; - for (Group g : orderedGroups()) { - if (g.hasSufficientCoverage()) { - covered++; - } - } - return covered; + return (int)groups.values().stream().filter(g -> g.hasSufficientCoverage()).count(); } /** @@ -210,8 +189,8 @@ public class SearchCluster implements NodeManager<Node> { } else if (usesLocalCorpusIn(node)) { // follow the status of this node // Do not take this out of rotation if we're a combined cluster of size 1, - // as that can't be helpful, and leads to a deadlock where this node is never taken back in servic e - if (nodeIsWorking || size() > 1) + // as that can't be helpful, and leads to a deadlock where this node is never set back in service + if (nodeIsWorking || nodes.size() > 1) setInRotationOnlyIf(nodeIsWorking); } } @@ -240,11 +219,11 @@ public class SearchCluster implements NodeManager<Node> { } public boolean hasInformationAboutAllNodes() { - return nodesByHost.values().stream().allMatch(node -> node.isWorking() != null); + return nodes.stream().allMatch(node -> node.isWorking() != null); } private boolean hasWorkingNodes() { - return nodesByHost.values().stream().anyMatch(node -> node.isWorking() != Boolean.FALSE ); + return nodes.stream().anyMatch(node -> node.isWorking() != Boolean.FALSE ); } private boolean usesLocalCorpusIn(Node node) { @@ -255,31 +234,6 @@ public class SearchCluster implements NodeManager<Node> { return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id(); } - private static class PongCallback implements PongHandler { - - private final ClusterMonitor<Node> clusterMonitor; - private final Node node; - - PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) { - this.node = node; - this.clusterMonitor = clusterMonitor; - } - - @Override - public void handle(Pong pong) { - if (pong.badResponse()) { - clusterMonitor.failed(node, pong.error().get()); - } else { - if (pong.activeDocuments().isPresent()) { - node.setActiveDocuments(pong.activeDocuments().get()); - node.setBlockingWrites(pong.isBlockingWrites()); - } - clusterMonitor.responded(node); - } - } - - } - /** Used by the cluster monitor to manage node status */ @Override public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) { @@ -293,19 +247,15 @@ public class SearchCluster implements NodeManager<Node> { // With just one group sufficient coverage may not be the same as full coverage, as the // group will always be marked sufficient for use. updateSufficientCoverage(group, true); - boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), - group.activeDocuments()); + boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), group.activeDocuments()); trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments()); } private void pingIterationCompletedMultipleGroups() { orderedGroups().forEach(Group::aggregateNodeValues); long medianDocuments = medianDocumentsPerGroup(); - boolean anyGroupsSufficientCoverage = false; for (Group group : orderedGroups()) { - boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), - medianDocuments); - anyGroupsSufficientCoverage = anyGroupsSufficientCoverage || sufficientCoverage; + boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), medianDocuments); updateSufficientCoverage(group, sufficientCoverage); trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments); } @@ -372,4 +322,29 @@ public class SearchCluster implements NodeManager<Node> { } } + private static class PongCallback implements PongHandler { + + private final ClusterMonitor<Node> clusterMonitor; + private final Node node; + + PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) { + this.node = node; + this.clusterMonitor = clusterMonitor; + } + + @Override + public void handle(Pong pong) { + if (pong.badResponse()) { + clusterMonitor.failed(node, pong.error().get()); + } else { + if (pong.activeDocuments().isPresent()) { + node.setActiveDocuments(pong.activeDocuments().get()); + node.setBlockingWrites(pong.isBlockingWrites()); + } + clusterMonitor.responded(node); + } + } + + } + } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java index 54c8c1e0522..abd7267bb04 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java @@ -23,7 +23,7 @@ public class MockSearchCluster extends SearchCluster { private final int numNodesPerGroup; private final ImmutableList<Group> orderedGroups; private final ImmutableMap<Integer, Group> groups; - private final ImmutableMultimap<String, Node> nodesByHost; + private final List<Node> nodes; public MockSearchCluster(String clusterId, int groups, int nodesPerGroup) { this(clusterId, createDispatchConfig(), groups, nodesPerGroup); @@ -36,21 +36,22 @@ public class MockSearchCluster extends SearchCluster { ImmutableMap.Builder<Integer, Group> groupBuilder = ImmutableMap.builder(); ImmutableMultimap.Builder<String, Node> hostBuilder = ImmutableMultimap.builder(); int distributionKey = 0; + this.nodes = new ArrayList<>(); for (int group = 0; group < groups; group++) { - List<Node> nodes = new ArrayList<>(); - for (int node = 0; node < nodesPerGroup; node++) { - Node n = new Node(distributionKey, "host" + distributionKey, group); - nodes.add(n); - hostBuilder.put(n.hostname(), n); + List<Node> groupNodes = new ArrayList<>(); + for (int i = 0; i < nodesPerGroup; i++) { + Node node = new Node(distributionKey, "host" + distributionKey, group); + nodes.add(node); + groupNodes.add(node); + hostBuilder.put(node.hostname(), node); distributionKey++; } - Group g = new Group(group, nodes); + Group g = new Group(group, groupNodes); groupBuilder.put(group, g); orderedGroupBuilder.add(g); } this.orderedGroups = orderedGroupBuilder.build(); this.groups = groupBuilder.build(); - this.nodesByHost = hostBuilder.build(); this.numGroups = groups; this.numNodesPerGroup = nodesPerGroup; } @@ -61,9 +62,7 @@ public class MockSearchCluster extends SearchCluster { } @Override - public int size() { - return numGroups * numNodesPerGroup; - } + public List<Node> nodes() { return nodes; } @Override public ImmutableMap<Integer, Group> groups() { @@ -71,9 +70,7 @@ public class MockSearchCluster extends SearchCluster { } @Override - public int wantedGroupSize() { - return numNodesPerGroup; - } + public boolean allGroupsHaveSize1() { return numNodesPerGroup == 1;} @Override public int groupsWithSufficientCoverage() { |