aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java109
1 files changed, 50 insertions, 59 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index ca2fce0b32b..bedfa965229 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -8,13 +8,13 @@ import com.yahoo.net.HostName;
import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
-import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
-import java.util.LinkedHashMap;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -24,7 +24,7 @@ import java.util.stream.Collectors;
*
* @author bratseth
*/
-public class SearchCluster implements NodeManager<Node> {
+public class SearchCluster implements NodeManager<Node>, GroupList {
private static final Logger log = Logger.getLogger(SearchCluster.class.getName());
@@ -32,9 +32,7 @@ public class SearchCluster implements NodeManager<Node> {
private final String clusterId;
private final VipStatus vipStatus;
private final PingFactory pingFactory;
- private final Map<Integer, Group> groups;
- private final List<Group> orderedGroups;
- private final List<Node> nodes;
+ private final GroupList groups;
private long nextLogTime = 0;
/**
@@ -47,55 +45,48 @@ public class SearchCluster implements NodeManager<Node> {
*/
private final Node localCorpusDispatchTarget;
- public SearchCluster(String clusterId, DispatchConfig dispatchConfig,
+ public SearchCluster(String clusterId, double minActivedocsPercentage,
DispatchNodesConfig nodesConfig,
VipStatus vipStatus, PingFactory pingFactory) {
+ this(clusterId, minActivedocsPercentage, toNodes(nodesConfig), vipStatus, pingFactory);
+ }
+ public SearchCluster(String clusterId, double minActivedocsPercentage, List<Node> nodes,
+ VipStatus vipStatus, PingFactory pingFactory) {
+ this(clusterId, minActivedocsPercentage, toGroups(nodes), vipStatus, pingFactory);
+ }
+ public SearchCluster(String clusterId, double minActivedocsPercentage, GroupList groups,
+ VipStatus vipStatus, PingFactory pingFactory) {
this.clusterId = clusterId;
- this.minActivedocsPercentage = dispatchConfig.minActivedocsPercentage();
+ this.minActivedocsPercentage = minActivedocsPercentage;
this.vipStatus = vipStatus;
this.pingFactory = pingFactory;
-
- this.nodes = toNodes(nodesConfig);
-
- // Create groups
- ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>();
- for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) {
- Group g = new Group(group.getKey(), group.getValue());
- groupsBuilder.put(group.getKey(), g);
- }
- this.groups = groupsBuilder.build();
- LinkedHashMap<Integer, Group> groupIntroductionOrder = new LinkedHashMap<>();
- nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group())));
- this.orderedGroups = List.copyOf(groupIntroductionOrder.values());
-
- this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups);
+ this.groups = groups;
+ this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), groups);
}
@Override
public String name() { return clusterId; }
public void addMonitoring(ClusterMonitor<Node> clusterMonitor) {
- for (var group : orderedGroups()) {
+ for (var group : groups()) {
for (var node : group.nodes())
clusterMonitor.add(node, true);
}
}
- private static Node findLocalCorpusDispatchTarget(String selfHostname,
- List<Node> nodes,
- Map<Integer, Group> groups) {
+ private static Node findLocalCorpusDispatchTarget(String selfHostname, GroupList groups) {
// A search node in the search cluster in question is configured on the same host as the currently running container.
// It has all the data <==> No other nodes in the search cluster have the same group id as this node.
// That local search node responds.
// The search cluster to be searched has at least as many nodes as the container cluster we're running in.
- List<Node> localSearchNodes = nodes.stream()
+ List<Node> localSearchNodes = groups.groups().stream().flatMap(g -> g.nodes().stream())
.filter(node -> node.hostname().equals(selfHostname))
.toList();
// Only use direct dispatch if we have exactly 1 search node on the same machine:
if (localSearchNodes.size() != 1) return null;
Node localSearchNode = localSearchNodes.iterator().next();
- Group localSearchGroup = groups.get(localSearchNode.group());
+ Group localSearchGroup = groups.group(localSearchNode.group());
// Only use direct dispatch if the local search node has the entire corpus
if (localSearchGroup.nodes().size() != 1) return null;
@@ -109,27 +100,30 @@ public class SearchCluster implements NodeManager<Node> {
.toList();
}
- /** Returns the groups of this cluster as an immutable map indexed by group id */
- public Map<Integer, Group> groups() { return groups; }
-
- /** Returns the groups of this cluster as an immutable list in introduction order */
- public List<Group> orderedGroups() { return orderedGroups; }
-
- /** Returns the n'th (zero-indexed) group in the cluster if possible */
- public Optional<Group> group(int n) {
- if (orderedGroups().size() > n) {
- return Optional.of(orderedGroups().get(n));
- } else {
- return Optional.empty();
+ private static GroupList toGroups(Collection<Node> nodes) {
+ ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>();
+ for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) {
+ Group g = new Group(group.getKey(), group.getValue());
+ groupsBuilder.put(group.getKey(), g);
}
+ return new GroupListImpl(groupsBuilder.build());
}
- public boolean allGroupsHaveSize1() {
- return nodes.size() == groups.size();
+ @Override
+ public Group group(int id) {
+ return groups.group(id);
}
+ @Override
+ public Set<Integer> groupKeys() { return groups.groupKeys(); }
+ @Override
+ public Collection<Group> groups() { return groups.groups(); }
+ @Override
+ public int numGroups() { return groups.numGroups(); }
+
+ public boolean allGroupsHaveSize1() { return groups().stream().allMatch(g -> g.nodes().size() == 1); }
public int groupsWithSufficientCoverage() {
- return (int)groups.values().stream().filter(g -> g.hasSufficientCoverage()).count();
+ return (int)groups().stream().filter(Group::hasSufficientCoverage).count();
}
/**
@@ -140,7 +134,7 @@ public class SearchCluster implements NodeManager<Node> {
if ( localCorpusDispatchTarget == null) return Optional.empty();
// Only use direct dispatch if the local group has sufficient coverage
- Group localSearchGroup = groups().get(localCorpusDispatchTarget.group());
+ Group localSearchGroup = group(localCorpusDispatchTarget.group());
if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();
// Only use direct dispatch if the local search node is not down
@@ -181,7 +175,7 @@ public class SearchCluster implements NodeManager<Node> {
else if (usesLocalCorpusIn(node)) { // follow the status of this node
// Do not take this out of rotation if we're a combined cluster of size 1,
// as that can't be helpful, and leads to a deadlock where this node is never set back in service
- if (nodeIsWorking || nodes.size() > 1)
+ if (nodeIsWorking || groups().stream().map(Group::nodes).count() > 1)
setInRotationOnlyIf(nodeIsWorking);
}
}
@@ -203,11 +197,11 @@ public class SearchCluster implements NodeManager<Node> {
}
public boolean hasInformationAboutAllNodes() {
- return nodes.stream().allMatch(node -> node.isWorking() != null);
+ return groups().stream().allMatch(g -> g.nodes().stream().allMatch(node -> node.isWorking() != null));
}
private boolean hasWorkingNodes() {
- return nodes.stream().anyMatch(node -> node.isWorking() != Boolean.FALSE );
+ return groups().stream().anyMatch(g -> g.nodes().stream().anyMatch(node -> node.isWorking() != Boolean.FALSE));
}
private boolean usesLocalCorpusIn(Node node) {
@@ -226,7 +220,7 @@ public class SearchCluster implements NodeManager<Node> {
}
private void pingIterationCompletedSingleGroup() {
- Group group = groups().values().iterator().next();
+ Group group = groups().iterator().next();
group.aggregateNodeValues();
// With just one group sufficient coverage may not be the same as full coverage, as the
// group will always be marked sufficient for use.
@@ -236,9 +230,9 @@ public class SearchCluster implements NodeManager<Node> {
}
private void pingIterationCompletedMultipleGroups() {
- orderedGroups().forEach(Group::aggregateNodeValues);
+ groups().forEach(Group::aggregateNodeValues);
long medianDocuments = medianDocumentsPerGroup();
- for (Group group : orderedGroups()) {
+ for (Group group : groups()) {
boolean sufficientCoverage = isGroupCoverageSufficient(group.activeDocuments(), medianDocuments);
updateSufficientCoverage(group, sufficientCoverage);
trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments);
@@ -246,8 +240,8 @@ public class SearchCluster implements NodeManager<Node> {
}
private long medianDocumentsPerGroup() {
- if (orderedGroups().isEmpty()) return 0;
- var activeDocuments = orderedGroups().stream().map(Group::activeDocuments).collect(Collectors.toList());
+ if (isEmpty()) return 0;
+ var activeDocuments = groups().stream().map(Group::activeDocuments).collect(Collectors.toList());
return (long)Quantiles.median().compute(activeDocuments);
}
@@ -258,8 +252,7 @@ public class SearchCluster implements NodeManager<Node> {
*/
@Override
public void pingIterationCompleted() {
- int numGroups = orderedGroups().size();
- if (numGroups == 1) {
+ if (numGroups() == 1) {
pingIterationCompletedSingleGroup();
} else {
pingIterationCompletedMultipleGroups();
@@ -268,16 +261,14 @@ public class SearchCluster implements NodeManager<Node> {
private boolean isGroupCoverageSufficient(long activeDocuments, long medianDocuments) {
double documentCoverage = 100.0 * (double) activeDocuments / medianDocuments;
- if (medianDocuments > 0 && documentCoverage < minActivedocsPercentage)
- return false;
- return true;
+ return ! (medianDocuments > 0 && documentCoverage < minActivedocsPercentage);
}
/**
* Calculate whether a subset of nodes in a group has enough coverage
*/
public boolean isPartialGroupCoverageSufficient(List<Node> nodes) {
- if (orderedGroups().size() == 1)
+ if (numGroups() == 1)
return true;
long activeDocuments = nodes.stream().mapToLong(Node::getActiveDocuments).sum();
return isGroupCoverageSufficient(activeDocuments, medianDocumentsPerGroup());