diff options
author | jonmv <venstad@gmail.com> | 2023-07-13 17:45:19 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-07-13 17:45:19 +0200 |
commit | 37c552bceaa211a226571b5674339325ad20c61f (patch) | |
tree | ac4997eabfb179cea7ed1832ea69f08face6b509 /container-search | |
parent | bcdf634ebaf2b16e2f64937e453d37067750d172 (diff) |
Get info about all nodes before updating search groups reference
Diffstat (limited to 'container-search')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java | 9 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java | 32 |
2 files changed, 28 insertions, 13 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 3b686da35f6..bae935bb783 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -127,7 +127,8 @@ public class Dispatcher extends AbstractComponent { Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) { - this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, true), invokerFactories); + this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories); + this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it. } Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, @@ -137,6 +138,7 @@ public class Dispatcher extends AbstractComponent { this.searchCluster = searchCluster; this.invokerFactories = invokerFactories; this.volatileItems = update(clusterMonitor); + searchCluster.addMonitoring(clusterMonitor); } /* For simple mocking in tests. Beware that searchCluster is shutdown in deconstruct() */ @@ -190,10 +192,10 @@ public class Dispatcher extends AbstractComponent { }; // Update the nodes the search cluster keeps track of, and what nodes are monitored. - searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage()); + ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage()); // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. - this.volatileItems = update(new ClusterMonitor<>(searchCluster, true)); + this.volatileItems = update(newMonitor); // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to. items.get().clusterMonitor.shutdown(); @@ -204,7 +206,6 @@ public class Dispatcher extends AbstractComponent { var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig), clusterMonitor); - searchCluster.addMonitoring(clusterMonitor); return items; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 8204de77bce..3c8950f1f7f 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -6,6 +6,7 @@ import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; +import com.yahoo.yolean.UncheckedInterruptedException; import java.util.Collection; import java.util.HashMap; @@ -60,13 +61,22 @@ public class SearchCluster implements NodeManager<Node> { public String name() { return clusterId; } /** Sets the new nodes to monitor to be the new nodes, but keep any existing node instances which equal the new ones. */ - public void updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) { + public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) { Collection<Node> retainedNodes = groups.nodes(); Collection<Node> currentNodes = new HashSet<>(newNodes); retainedNodes.retainAll(currentNodes); // Throw away all old nodes which are not in the new set. currentNodes.removeIf(retainedNodes::contains); // Throw away all new nodes for which we have more information in an old object. + Collection<Node> addedNodes = List.copyOf(currentNodes); currentNodes.addAll(retainedNodes); // Keep the old nodes that were replaced in the new set. - groups = toGroups(currentNodes, minActivedocsPercentage); + SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage); + ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false); + for (Node node : groups.nodes()) monitor.add(node, true); + monitor.start(); + try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } } + catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } + pingIterationCompleted(groups); + this.groups = groups; + return monitor; } public void addMonitoring(ClusterMonitor<Node> clusterMonitor) { @@ -208,8 +218,8 @@ public class SearchCluster implements NodeManager<Node> { pinger.ping(); } - private void pingIterationCompletedSingleGroup() { - Group group = groups().iterator().next(); + private void pingIterationCompletedSingleGroup(SearchGroupsImpl groups) { + Group group = groups.groups().iterator().next(); group.aggregateNodeValues(); // With just one group sufficient coverage may not be the same as full coverage, as the // group will always be marked sufficient for use. @@ -218,10 +228,10 @@ public class SearchCluster implements NodeManager<Node> { trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments()); } - private void pingIterationCompletedMultipleGroups() { - groups().forEach(Group::aggregateNodeValues); + private void pingIterationCompletedMultipleGroups(SearchGroupsImpl groups) { + groups.groups().forEach(Group::aggregateNodeValues); long medianDocuments = groups.medianDocumentsPerGroup(); - for (Group group : groups()) { + for (Group group : groups.groups()) { boolean sufficientCoverage = groups.isGroupCoverageSufficient(group.activeDocuments(), medianDocuments); updateSufficientCoverage(group, sufficientCoverage); trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments); @@ -235,10 +245,14 @@ public class SearchCluster implements NodeManager<Node> { */ @Override public void pingIterationCompleted() { + pingIterationCompleted(groups); + } + + private void pingIterationCompleted(SearchGroupsImpl groups) { if (groups.size() == 1) { - pingIterationCompletedSingleGroup(); + pingIterationCompletedSingleGroup(groups); } else { - pingIterationCompletedMultipleGroups(); + pingIterationCompletedMultipleGroups(groups); } } |