summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-03-09 13:45:19 +0100
committerGitHub <noreply@github.com>2022-03-09 13:45:19 +0100
commitbd0f51961308733184b8bfb40c49ddbb9dbe7119 (patch)
tree5ccd1f1ab084dfa67790f9d7fb6740130e091993 /container-search
parent7dec6d6a1ed9a93fd0368d6477a74817996e2e78 (diff)
parentf035afd9460684334860eac91698bc45a6558fbd (diff)
Merge pull request #21611 from vespa-engine/bratseth/dispatch
Bratseth/dispatch
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java37
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java7
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java131
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java25
5 files changed, 80 insertions, 122 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index 7f411bbc80f..27a45753bb5 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -81,7 +81,7 @@ public class FastSearcher extends VespaBackEndSearcher {
@Override
public Result doSearch2(Query query, Execution execution) {
- if (dispatcher.searchCluster().wantedGroupSize() == 1)
+ if (dispatcher.searchCluster().allGroupsHaveSize1())
forceSinglePassGrouping(query);
try (SearchInvoker invoker = getSearchInvoker(query)) {
Result result = invoker.search(query, execution);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
index 7f4d8fc4739..4c0bcb38d15 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
@@ -12,13 +12,14 @@ import java.util.Set;
import java.util.logging.Logger;
/**
- * LoadBalancer determines which group of content nodes should be accessed next for each search query when the internal java dispatcher is
- * used.
+ * LoadBalancer determines which group of content nodes should be accessed next for each search query when the
+ * internal java dispatcher is used.
+ *
+ * The implementation here is a simplistic least queries in flight + round-robin load balancer
*
* @author ollivir
*/
public class LoadBalancer {
- // The implementation here is a simplistic least queries in flight + round-robin load balancer
private static final Logger log = Logger.getLogger(LoadBalancer.class.getName());
@@ -84,6 +85,7 @@ public class LoadBalancer {
}
static class GroupStatus {
+
private final Group group;
private int allocations = 0;
private long queries = 0;
@@ -174,24 +176,10 @@ public class LoadBalancer {
* @return the better of the two
*/
private static GroupStatus betterGroup(GroupStatus first, GroupStatus second) {
- if (second == null) {
- return first;
- }
- if (first == null) {
- return second;
- }
-
- // different coverage
- if (first.group.hasSufficientCoverage() != second.group.hasSufficientCoverage()) {
- if (!first.group.hasSufficientCoverage()) {
- // first doesn't have coverage, second does
- return second;
- } else {
- // second doesn't have coverage, first does
- return first;
- }
- }
-
+ if (second == null) return first;
+ if (first == null) return second;
+ if (first.group.hasSufficientCoverage() != second.group.hasSufficientCoverage())
+ return first.group.hasSufficientCoverage() ? first : second;
return first;
}
@@ -246,11 +234,8 @@ public class LoadBalancer {
public Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups) {
double needle = random.nextDouble();
Optional<GroupStatus> gs = selectGroup(needle, true, rejectedGroups);
- if (gs.isPresent()) {
- return gs;
- }
- // fallback - any coverage better than none
- return selectGroup(needle, false, rejectedGroups);
+ if (gs.isPresent()) return gs;
+ return selectGroup(needle, false, rejectedGroups); // any coverage better than none
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
index e99c1d5ad32..8f6e1b1ee2f 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
@@ -17,7 +17,9 @@ import java.util.logging.Logger;
public class Group {
private static final Logger log = Logger.getLogger(Group.class.getName());
- private final static double maxContentSkew = 0.10; // If documents on a node is more than 10% off from the average the group is unbalanced
+
+ // If documents on a node is more than 10% off from the average the group is unbalanced
+ private final static double maxContentSkew = 0.10;
private final static int minDocsPerNodeToRequireLowSkew = 100;
private final int id;
@@ -41,8 +43,7 @@ public class Group {
/**
* Returns the unique identity of this group.
- * NOTE: This is a contiguous index from 0, NOT necessarily the group id assigned
- * by the user or node repo.
+ * NOTE: This is a contiguous index from 0, NOT necessarily the group id assigned by the user or node repo.
*/
public int id() { return id; }
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 36d7e7a85a9..f8c4627473d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -1,10 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.searchcluster;
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
import com.google.common.math.Quantiles;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.net.HostName;
@@ -32,11 +29,10 @@ public class SearchCluster implements NodeManager<Node> {
private static final Logger log = Logger.getLogger(SearchCluster.class.getName());
private final DispatchConfig dispatchConfig;
- private final int size;
private final String clusterId;
- private final ImmutableMap<Integer, Group> groups;
- private final ImmutableMultimap<String, Node> nodesByHost;
- private final ImmutableList<Group> orderedGroups;
+ private final Map<Integer, Group> groups;
+ private final List<Group> orderedGroups;
+ private final List<Node> nodes;
private final VipStatus vipStatus;
private final PingFactory pingFactory;
private final TopKEstimator hitEstimator;
@@ -60,8 +56,7 @@ public class SearchCluster implements NodeManager<Node> {
this.vipStatus = vipStatus;
this.pingFactory = pingFactory;
- List<Node> nodes = toNodes(dispatchConfig);
- this.size = nodes.size();
+ this.nodes = toNodes(dispatchConfig);
// Create groups
ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>();
@@ -72,16 +67,10 @@ public class SearchCluster implements NodeManager<Node> {
this.groups = groupsBuilder.build();
LinkedHashMap<Integer, Group> groupIntroductionOrder = new LinkedHashMap<>();
nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group())));
- this.orderedGroups = ImmutableList.<Group>builder().addAll(groupIntroductionOrder.values()).build();
+ this.orderedGroups = List.copyOf(groupIntroductionOrder.values());
- // Index nodes by host
- ImmutableMultimap.Builder<String, Node> nodesByHostBuilder = new ImmutableMultimap.Builder<>();
- for (Node node : nodes)
- nodesByHostBuilder.put(node.hostname(), node);
- this.nodesByHost = nodesByHostBuilder.build();
hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR);
-
- this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodesByHost, groups);
+ this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups);
}
@Override
@@ -95,13 +84,15 @@ public class SearchCluster implements NodeManager<Node> {
}
private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname,
- ImmutableMultimap<String, Node> nodesByHost,
- ImmutableMap<Integer, Group> groups) {
+ List<Node> nodes,
+ Map<Integer, Group> groups) {
// A search node in the search cluster in question is configured on the same host as the currently running container.
// It has all the data <==> No other nodes in the search cluster have the same group id as this node.
// That local search node responds.
// The search cluster to be searched has at least as many nodes as the container cluster we're running in.
- ImmutableCollection<Node> localSearchNodes = nodesByHost.get(selfHostname);
+ List<Node> localSearchNodes = nodes.stream()
+ .filter(node -> node.hostname().equals(selfHostname))
+ .collect(Collectors.toList());
// Only use direct dispatch if we have exactly 1 search node on the same machine:
if (localSearchNodes.size() != 1) return Optional.empty();
@@ -114,25 +105,24 @@ public class SearchCluster implements NodeManager<Node> {
return Optional.of(localSearchNode);
}
- private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) {
- ImmutableList.Builder<Node> nodesBuilder = new ImmutableList.Builder<>();
- for (DispatchConfig.Node node : dispatchConfig.node())
- nodesBuilder.add(new Node(node.key(), node.host(), node.group()));
- return nodesBuilder.build();
+ private static List<Node> toNodes(DispatchConfig dispatchConfig) {
+ return dispatchConfig.node().stream()
+ .map(n -> new Node(n.key(), n.host(), n.group()))
+ .collect(Collectors.toUnmodifiableList());
}
public DispatchConfig dispatchConfig() {
return dispatchConfig;
}
- /** Returns the number of nodes in this cluster (across all groups) */
- public int size() { return size; }
+ /** Returns an immutable list of all nodes in this. */
+ public List<Node> nodes() { return nodes; }
/** Returns the groups of this cluster as an immutable map indexed by group id */
- public ImmutableMap<Integer, Group> groups() { return groups; }
+ public Map<Integer, Group> groups() { return groups; }
/** Returns the groups of this cluster as an immutable list in introduction order */
- public ImmutableList<Group> orderedGroups() { return orderedGroups; }
+ public List<Group> orderedGroups() { return orderedGroups; }
/** Returns the n'th (zero-indexed) group in the cluster if possible */
public Optional<Group> group(int n) {
@@ -143,23 +133,12 @@ public class SearchCluster implements NodeManager<Node> {
}
}
- /**
- * Returns the wanted number of nodes per group - size()/groups.size().
- * The actual node count for a given group may differ due to node retirements.
- */
- public int wantedGroupSize() {
- if (groups().size() == 0) return size();
- return size() / groups().size();
+ public boolean allGroupsHaveSize1() {
+ return nodes.size() == groups.size();
}
public int groupsWithSufficientCoverage() {
- int covered = 0;
- for (Group g : orderedGroups()) {
- if (g.hasSufficientCoverage()) {
- covered++;
- }
- }
- return covered;
+ return (int)groups.values().stream().filter(g -> g.hasSufficientCoverage()).count();
}
/**
@@ -210,8 +189,8 @@ public class SearchCluster implements NodeManager<Node> {
}
else if (usesLocalCorpusIn(node)) { // follow the status of this node
// Do not take this out of rotation if we're a combined cluster of size 1,
- // as that can't be helpful, and leads to a deadlock where this node is never taken back in servic e
- if (nodeIsWorking || size() > 1)
+ // as that can't be helpful, and leads to a deadlock where this node is never set back in service
+ if (nodeIsWorking || nodes.size() > 1)
setInRotationOnlyIf(nodeIsWorking);
}
}
@@ -240,11 +219,11 @@ public class SearchCluster implements NodeManager<Node> {
}
public boolean hasInformationAboutAllNodes() {
- return nodesByHost.values().stream().allMatch(node -> node.isWorking() != null);
+ return nodes.stream().allMatch(node -> node.isWorking() != null);
}
private boolean hasWorkingNodes() {
- return nodesByHost.values().stream().anyMatch(node -> node.isWorking() != Boolean.FALSE );
+ return nodes.stream().anyMatch(node -> node.isWorking() != Boolean.FALSE );
}
private boolean usesLocalCorpusIn(Node node) {
@@ -255,31 +234,6 @@ public class SearchCluster implements NodeManager<Node> {
return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id();
}
- private static class PongCallback implements PongHandler {
-
- private final ClusterMonitor<Node> clusterMonitor;
- private final Node node;
-
- PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) {
- this.node = node;
- this.clusterMonitor = clusterMonitor;
- }
-
- @Override
- public void handle(Pong pong) {
- if (pong.badResponse()) {
- clusterMonitor.failed(node, pong.error().get());
- } else {
- if (pong.activeDocuments().isPresent()) {
- node.setActiveDocuments(pong.activeDocuments().get());
- node.setBlockingWrites(pong.isBlockingWrites());
- }
- clusterMonitor.responded(node);
- }
- }
-
- }
-
/** Used by the cluster monitor to manage node status */
@Override
public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) {
@@ -293,19 +247,15 @@ public class SearchCluster implements NodeManager<Node> {
// With just one group sufficient coverage may not be the same as full coverage, as the
// group will always be marked sufficient for use.
updateSufficientCoverage(group, true);
- boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(),
- group.activeDocuments());
+ boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), group.activeDocuments());
trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments());
}
private void pingIterationCompletedMultipleGroups() {
orderedGroups().forEach(Group::aggregateNodeValues);
long medianDocuments = medianDocumentsPerGroup();
- boolean anyGroupsSufficientCoverage = false;
for (Group group : orderedGroups()) {
- boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(),
- medianDocuments);
- anyGroupsSufficientCoverage = anyGroupsSufficientCoverage || sufficientCoverage;
+ boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), medianDocuments);
updateSufficientCoverage(group, sufficientCoverage);
trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments);
}
@@ -372,4 +322,29 @@ public class SearchCluster implements NodeManager<Node> {
}
}
+ private static class PongCallback implements PongHandler {
+
+ private final ClusterMonitor<Node> clusterMonitor;
+ private final Node node;
+
+ PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) {
+ this.node = node;
+ this.clusterMonitor = clusterMonitor;
+ }
+
+ @Override
+ public void handle(Pong pong) {
+ if (pong.badResponse()) {
+ clusterMonitor.failed(node, pong.error().get());
+ } else {
+ if (pong.activeDocuments().isPresent()) {
+ node.setActiveDocuments(pong.activeDocuments().get());
+ node.setBlockingWrites(pong.isBlockingWrites());
+ }
+ clusterMonitor.responded(node);
+ }
+ }
+
+ }
+
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
index 54c8c1e0522..abd7267bb04 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
@@ -23,7 +23,7 @@ public class MockSearchCluster extends SearchCluster {
private final int numNodesPerGroup;
private final ImmutableList<Group> orderedGroups;
private final ImmutableMap<Integer, Group> groups;
- private final ImmutableMultimap<String, Node> nodesByHost;
+ private final List<Node> nodes;
public MockSearchCluster(String clusterId, int groups, int nodesPerGroup) {
this(clusterId, createDispatchConfig(), groups, nodesPerGroup);
@@ -36,21 +36,22 @@ public class MockSearchCluster extends SearchCluster {
ImmutableMap.Builder<Integer, Group> groupBuilder = ImmutableMap.builder();
ImmutableMultimap.Builder<String, Node> hostBuilder = ImmutableMultimap.builder();
int distributionKey = 0;
+ this.nodes = new ArrayList<>();
for (int group = 0; group < groups; group++) {
- List<Node> nodes = new ArrayList<>();
- for (int node = 0; node < nodesPerGroup; node++) {
- Node n = new Node(distributionKey, "host" + distributionKey, group);
- nodes.add(n);
- hostBuilder.put(n.hostname(), n);
+ List<Node> groupNodes = new ArrayList<>();
+ for (int i = 0; i < nodesPerGroup; i++) {
+ Node node = new Node(distributionKey, "host" + distributionKey, group);
+ nodes.add(node);
+ groupNodes.add(node);
+ hostBuilder.put(node.hostname(), node);
distributionKey++;
}
- Group g = new Group(group, nodes);
+ Group g = new Group(group, groupNodes);
groupBuilder.put(group, g);
orderedGroupBuilder.add(g);
}
this.orderedGroups = orderedGroupBuilder.build();
this.groups = groupBuilder.build();
- this.nodesByHost = hostBuilder.build();
this.numGroups = groups;
this.numNodesPerGroup = nodesPerGroup;
}
@@ -61,9 +62,7 @@ public class MockSearchCluster extends SearchCluster {
}
@Override
- public int size() {
- return numGroups * numNodesPerGroup;
- }
+ public List<Node> nodes() { return nodes; }
@Override
public ImmutableMap<Integer, Group> groups() {
@@ -71,9 +70,7 @@ public class MockSearchCluster extends SearchCluster {
}
@Override
- public int wantedGroupSize() {
- return numNodesPerGroup;
- }
+ public boolean allGroupsHaveSize1() { return numNodesPerGroup == 1;}
@Override
public int groupsWithSufficientCoverage() {