aboutsummaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2023-07-13 17:45:19 +0200
committerjonmv <venstad@gmail.com>2023-07-13 17:45:19 +0200
commit37c552bceaa211a226571b5674339325ad20c61f (patch)
treeac4997eabfb179cea7ed1832ea69f08face6b509 /container-search
parentbcdf634ebaf2b16e2f64937e453d37067750d172 (diff)
Get info about all nodes before updating search groups reference
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java9
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java32
2 files changed, 28 insertions, 13 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 3b686da35f6..bae935bb783 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -127,7 +127,8 @@ public class Dispatcher extends AbstractComponent {
Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) {
- this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, true), invokerFactories);
+ this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories);
+ this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it.
}
Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
@@ -137,6 +138,7 @@ public class Dispatcher extends AbstractComponent {
this.searchCluster = searchCluster;
this.invokerFactories = invokerFactories;
this.volatileItems = update(clusterMonitor);
+ searchCluster.addMonitoring(clusterMonitor);
}
/* For simple mocking in tests. Beware that searchCluster is shutdown in deconstruct() */
@@ -190,10 +192,10 @@ public class Dispatcher extends AbstractComponent {
};
// Update the nodes the search cluster keeps track of, and what nodes are monitored.
- searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage());
+ ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage());
// Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this.
- this.volatileItems = update(new ClusterMonitor<>(searchCluster, true));
+ this.volatileItems = update(newMonitor);
// Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to.
items.get().clusterMonitor.shutdown();
@@ -204,7 +206,6 @@ public class Dispatcher extends AbstractComponent {
var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig),
clusterMonitor);
- searchCluster.addMonitoring(clusterMonitor);
return items;
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 8204de77bce..3c8950f1f7f 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -6,6 +6,7 @@ import com.yahoo.net.HostName;
import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
+import com.yahoo.yolean.UncheckedInterruptedException;
import java.util.Collection;
import java.util.HashMap;
@@ -60,13 +61,22 @@ public class SearchCluster implements NodeManager<Node> {
public String name() { return clusterId; }
/** Sets the new nodes to monitor to be the new nodes, but keep any existing node instances which equal the new ones. */
- public void updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) {
+ public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) {
Collection<Node> retainedNodes = groups.nodes();
Collection<Node> currentNodes = new HashSet<>(newNodes);
retainedNodes.retainAll(currentNodes); // Throw away all old nodes which are not in the new set.
currentNodes.removeIf(retainedNodes::contains); // Throw away all new nodes for which we have more information in an old object.
+ Collection<Node> addedNodes = List.copyOf(currentNodes);
currentNodes.addAll(retainedNodes); // Keep the old nodes that were replaced in the new set.
- groups = toGroups(currentNodes, minActivedocsPercentage);
+ SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage);
+ ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false);
+ for (Node node : groups.nodes()) monitor.add(node, true);
+ monitor.start();
+ try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } }
+ catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); }
+ pingIterationCompleted(groups);
+ this.groups = groups;
+ return monitor;
}
public void addMonitoring(ClusterMonitor<Node> clusterMonitor) {
@@ -208,8 +218,8 @@ public class SearchCluster implements NodeManager<Node> {
pinger.ping();
}
- private void pingIterationCompletedSingleGroup() {
- Group group = groups().iterator().next();
+ private void pingIterationCompletedSingleGroup(SearchGroupsImpl groups) {
+ Group group = groups.groups().iterator().next();
group.aggregateNodeValues();
// With just one group sufficient coverage may not be the same as full coverage, as the
// group will always be marked sufficient for use.
@@ -218,10 +228,10 @@ public class SearchCluster implements NodeManager<Node> {
trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments());
}
- private void pingIterationCompletedMultipleGroups() {
- groups().forEach(Group::aggregateNodeValues);
+ private void pingIterationCompletedMultipleGroups(SearchGroupsImpl groups) {
+ groups.groups().forEach(Group::aggregateNodeValues);
long medianDocuments = groups.medianDocumentsPerGroup();
- for (Group group : groups()) {
+ for (Group group : groups.groups()) {
boolean sufficientCoverage = groups.isGroupCoverageSufficient(group.activeDocuments(), medianDocuments);
updateSufficientCoverage(group, sufficientCoverage);
trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments);
@@ -235,10 +245,14 @@ public class SearchCluster implements NodeManager<Node> {
*/
@Override
public void pingIterationCompleted() {
+ pingIterationCompleted(groups);
+ }
+
+ private void pingIterationCompleted(SearchGroupsImpl groups) {
if (groups.size() == 1) {
- pingIterationCompletedSingleGroup();
+ pingIterationCompletedSingleGroup(groups);
} else {
- pingIterationCompletedMultipleGroups();
+ pingIterationCompletedMultipleGroups(groups);
}
}