aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java131
1 files changed, 53 insertions, 78 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 36d7e7a85a9..f8c4627473d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -1,10 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.searchcluster;
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
import com.google.common.math.Quantiles;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.net.HostName;
@@ -32,11 +29,10 @@ public class SearchCluster implements NodeManager<Node> {
private static final Logger log = Logger.getLogger(SearchCluster.class.getName());
private final DispatchConfig dispatchConfig;
- private final int size;
private final String clusterId;
- private final ImmutableMap<Integer, Group> groups;
- private final ImmutableMultimap<String, Node> nodesByHost;
- private final ImmutableList<Group> orderedGroups;
+ private final Map<Integer, Group> groups;
+ private final List<Group> orderedGroups;
+ private final List<Node> nodes;
private final VipStatus vipStatus;
private final PingFactory pingFactory;
private final TopKEstimator hitEstimator;
@@ -60,8 +56,7 @@ public class SearchCluster implements NodeManager<Node> {
this.vipStatus = vipStatus;
this.pingFactory = pingFactory;
- List<Node> nodes = toNodes(dispatchConfig);
- this.size = nodes.size();
+ this.nodes = toNodes(dispatchConfig);
// Create groups
ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>();
@@ -72,16 +67,10 @@ public class SearchCluster implements NodeManager<Node> {
this.groups = groupsBuilder.build();
LinkedHashMap<Integer, Group> groupIntroductionOrder = new LinkedHashMap<>();
nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group())));
- this.orderedGroups = ImmutableList.<Group>builder().addAll(groupIntroductionOrder.values()).build();
+ this.orderedGroups = List.copyOf(groupIntroductionOrder.values());
- // Index nodes by host
- ImmutableMultimap.Builder<String, Node> nodesByHostBuilder = new ImmutableMultimap.Builder<>();
- for (Node node : nodes)
- nodesByHostBuilder.put(node.hostname(), node);
- this.nodesByHost = nodesByHostBuilder.build();
hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR);
-
- this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodesByHost, groups);
+ this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups);
}
@Override
@@ -95,13 +84,15 @@ public class SearchCluster implements NodeManager<Node> {
}
private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname,
- ImmutableMultimap<String, Node> nodesByHost,
- ImmutableMap<Integer, Group> groups) {
+ List<Node> nodes,
+ Map<Integer, Group> groups) {
// A search node in the search cluster in question is configured on the same host as the currently running container.
// It has all the data <==> No other nodes in the search cluster have the same group id as this node.
// That local search node responds.
// The search cluster to be searched has at least as many nodes as the container cluster we're running in.
- ImmutableCollection<Node> localSearchNodes = nodesByHost.get(selfHostname);
+ List<Node> localSearchNodes = nodes.stream()
+ .filter(node -> node.hostname().equals(selfHostname))
+ .collect(Collectors.toList());
// Only use direct dispatch if we have exactly 1 search node on the same machine:
if (localSearchNodes.size() != 1) return Optional.empty();
@@ -114,25 +105,24 @@ public class SearchCluster implements NodeManager<Node> {
return Optional.of(localSearchNode);
}
- private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) {
- ImmutableList.Builder<Node> nodesBuilder = new ImmutableList.Builder<>();
- for (DispatchConfig.Node node : dispatchConfig.node())
- nodesBuilder.add(new Node(node.key(), node.host(), node.group()));
- return nodesBuilder.build();
+ private static List<Node> toNodes(DispatchConfig dispatchConfig) {
+ return dispatchConfig.node().stream()
+ .map(n -> new Node(n.key(), n.host(), n.group()))
+ .collect(Collectors.toUnmodifiableList());
}
public DispatchConfig dispatchConfig() {
return dispatchConfig;
}
- /** Returns the number of nodes in this cluster (across all groups) */
- public int size() { return size; }
+ /** Returns an immutable list of all nodes in this. */
+ public List<Node> nodes() { return nodes; }
/** Returns the groups of this cluster as an immutable map indexed by group id */
- public ImmutableMap<Integer, Group> groups() { return groups; }
+ public Map<Integer, Group> groups() { return groups; }
/** Returns the groups of this cluster as an immutable list in introduction order */
- public ImmutableList<Group> orderedGroups() { return orderedGroups; }
+ public List<Group> orderedGroups() { return orderedGroups; }
/** Returns the n'th (zero-indexed) group in the cluster if possible */
public Optional<Group> group(int n) {
@@ -143,23 +133,12 @@ public class SearchCluster implements NodeManager<Node> {
}
}
- /**
- * Returns the wanted number of nodes per group - size()/groups.size().
- * The actual node count for a given group may differ due to node retirements.
- */
- public int wantedGroupSize() {
- if (groups().size() == 0) return size();
- return size() / groups().size();
+ public boolean allGroupsHaveSize1() {
+ return nodes.size() == groups.size();
}
public int groupsWithSufficientCoverage() {
- int covered = 0;
- for (Group g : orderedGroups()) {
- if (g.hasSufficientCoverage()) {
- covered++;
- }
- }
- return covered;
+ return (int)groups.values().stream().filter(g -> g.hasSufficientCoverage()).count();
}
/**
@@ -210,8 +189,8 @@ public class SearchCluster implements NodeManager<Node> {
}
else if (usesLocalCorpusIn(node)) { // follow the status of this node
// Do not take this out of rotation if we're a combined cluster of size 1,
- // as that can't be helpful, and leads to a deadlock where this node is never taken back in servic e
- if (nodeIsWorking || size() > 1)
+ // as that can't be helpful, and leads to a deadlock where this node is never set back in service
+ if (nodeIsWorking || nodes.size() > 1)
setInRotationOnlyIf(nodeIsWorking);
}
}
@@ -240,11 +219,11 @@ public class SearchCluster implements NodeManager<Node> {
}
public boolean hasInformationAboutAllNodes() {
- return nodesByHost.values().stream().allMatch(node -> node.isWorking() != null);
+ return nodes.stream().allMatch(node -> node.isWorking() != null);
}
private boolean hasWorkingNodes() {
- return nodesByHost.values().stream().anyMatch(node -> node.isWorking() != Boolean.FALSE );
+ return nodes.stream().anyMatch(node -> node.isWorking() != Boolean.FALSE );
}
private boolean usesLocalCorpusIn(Node node) {
@@ -255,31 +234,6 @@ public class SearchCluster implements NodeManager<Node> {
return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id();
}
- private static class PongCallback implements PongHandler {
-
- private final ClusterMonitor<Node> clusterMonitor;
- private final Node node;
-
- PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) {
- this.node = node;
- this.clusterMonitor = clusterMonitor;
- }
-
- @Override
- public void handle(Pong pong) {
- if (pong.badResponse()) {
- clusterMonitor.failed(node, pong.error().get());
- } else {
- if (pong.activeDocuments().isPresent()) {
- node.setActiveDocuments(pong.activeDocuments().get());
- node.setBlockingWrites(pong.isBlockingWrites());
- }
- clusterMonitor.responded(node);
- }
- }
-
- }
-
/** Used by the cluster monitor to manage node status */
@Override
public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) {
@@ -293,19 +247,15 @@ public class SearchCluster implements NodeManager<Node> {
// With just one group sufficient coverage may not be the same as full coverage, as the
// group will always be marked sufficient for use.
updateSufficientCoverage(group, true);
- boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(),
- group.activeDocuments());
+ boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), group.activeDocuments());
trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments());
}
private void pingIterationCompletedMultipleGroups() {
orderedGroups().forEach(Group::aggregateNodeValues);
long medianDocuments = medianDocumentsPerGroup();
- boolean anyGroupsSufficientCoverage = false;
for (Group group : orderedGroups()) {
- boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(),
- medianDocuments);
- anyGroupsSufficientCoverage = anyGroupsSufficientCoverage || sufficientCoverage;
+ boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), medianDocuments);
updateSufficientCoverage(group, sufficientCoverage);
trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments);
}
@@ -372,4 +322,29 @@ public class SearchCluster implements NodeManager<Node> {
}
}
+ private static class PongCallback implements PongHandler {
+
+ private final ClusterMonitor<Node> clusterMonitor;
+ private final Node node;
+
+ PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) {
+ this.node = node;
+ this.clusterMonitor = clusterMonitor;
+ }
+
+ @Override
+ public void handle(Pong pong) {
+ if (pong.badResponse()) {
+ clusterMonitor.failed(node, pong.error().get());
+ } else {
+ if (pong.activeDocuments().isPresent()) {
+ node.setActiveDocuments(pong.activeDocuments().get());
+ node.setBlockingWrites(pong.isBlockingWrites());
+ }
+ clusterMonitor.responded(node);
+ }
+ }
+
+ }
+
}