From 5c5be982848fb8a3f1d84fa522f380b8706e6ddb Mon Sep 17 00:00:00 2001 From: jonmv Date: Tue, 5 Sep 2023 10:47:23 +0200 Subject: Keep and reconfigure ClusterMonitor when reconfiguring dispatcher --- .../com/yahoo/search/dispatch/DispatcherTest.java | 28 +++++++++++++++++----- .../dispatch/searchcluster/SearchClusterTest.java | 12 +++++++++- 2 files changed, 33 insertions(+), 7 deletions(-) (limited to 'container-search/src/test/java/com') diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java index 3397638b950..d0f1f46d6ea 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java @@ -21,6 +21,7 @@ import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.searchchain.Execution; import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; +import com.yahoo.yolean.UncheckedInterruptedException; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -36,6 +37,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -181,7 +183,11 @@ public class DispatcherTest { pingPhasers.put(1, new Phaser(2)); pingPhasers.put(2, new Phaser(2)); + AtomicBoolean doPing = new AtomicBoolean(); + PingFactory pingFactory = (node, monitor, pongHandler) -> () -> { + try { while ( ! doPing.getAndSet(false)) { monitor.wait(1); } } // Need to avoid hogging monitor lock while waiting for phaser. + catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } pingPhasers.get(node.key()).arriveAndAwaitAdvance(); pongHandler.handle(new Pong(2, 2)); pingPhasers.get(node.key()).arriveAndAwaitAdvance(); @@ -255,8 +261,8 @@ public class DispatcherTest { Dispatcher dispatcher = new Dispatcher(dispatchConfig, rpcPool, cluster, invokerFactories); ExecutorService executor = Executors.newFixedThreadPool(1); - // Set two groups with a single node each. The first cluster-monitor has nothing to do, and is shut down immediately. - // There are also no invokers, so the whole reconfiguration completes once the new cluster monitor has seen all nodes. + // Set two groups with a single node each. + // There are no invokers, so the whole reconfiguration completes once the cluster monitor has seen all the new nodes. Future reconfiguration = executor.submit(() -> { dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder() .node(new DispatchNodesConfig.Node.Builder().key(0).group(0).port(123).host("host0")) @@ -265,8 +271,10 @@ public class DispatcherTest { }); // Let pings return, to allow the search cluster to reconfigure. + doPing.set(true); pingPhasers.get(0).arriveAndAwaitAdvance(); pingPhasers.get(0).arriveAndAwaitAdvance(); + doPing.set(true); pingPhasers.get(1).arriveAndAwaitAdvance(); pingPhasers.get(1).arriveAndAwaitAdvance(); // We need to wait for the cluster to have at least one group, lest dispatch will fail below. @@ -287,9 +295,10 @@ public class DispatcherTest { search1.search(new Query(), null); // Wait for the current cluster monitor to be mid-ping-round. + doPing.set(true); pingPhasers.get(0).arriveAndAwaitAdvance(); - // Then reconfigure the dispatcher with new nodes, replacing node0 with node2. + // Reconfigure the dispatcher with new nodes, removing node0 and adding node2. reconfiguration = executor.submit(() -> { dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder() .node(new DispatchNodesConfig.Node.Builder().key(2).group(0).port(123).host("host2")) @@ -297,16 +306,23 @@ public class DispatcherTest { .build()); }); // Reconfiguration starts, but groups are only updated once the search cluster has knowledge about all of them. + pingPhasers.get(0).arriveAndAwaitAdvance(); // Ping for node to remove completes. + doPing.set(true); + pingPhasers.get(1).arriveAndAwaitAdvance(); // Ping for node to keep completes. pingPhasers.get(1).arriveAndAwaitAdvance(); + // New round of pings starts, with nodes 1 and 2. + doPing.set(true); pingPhasers.get(1).arriveAndAwaitAdvance(); - pingPhasers.get(2).arriveAndAwaitAdvance(); + pingPhasers.get(1).arriveAndAwaitAdvance(); + // Cluster has not yet updated its group reference. assertEquals(1, cluster.group(0).workingNodes()); // Node0 is still working. assertSame(node0, cluster.group(0).nodes().get(0)); + + doPing.set(true); + pingPhasers.get(2).arriveAndAwaitAdvance(); pingPhasers.get(2).arriveAndAwaitAdvance(); - // Old cluster monitor is waiting for that ping to complete before it can shut down, and let reconfiguration complete. - pingPhasers.get(0).arriveAndAwaitAdvance(); reconfiguration.get(); Node node2 = cluster.group(0).nodes().get(0); assertNotSame(node0, node2); diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java index f6a1ca5cae3..109b9db2ce4 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java @@ -6,12 +6,14 @@ import com.yahoo.container.handler.ClustersStatus; import com.yahoo.container.handler.VipStatus; import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; +import com.yahoo.search.cluster.BaseNodeMonitor; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.result.ErrorMessage; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -384,6 +386,7 @@ public class SearchClusterTest { @Test void requireThatPreciselyTheRetainedNodesAreKeptWhenNodesAreUpdated() { try (State state = new State("query", 2, IntStream.range(0, 6).mapToObj(i -> "node-" + i).toList())) { + state.clusterMonitor.start(); List referenceNodes = List.of(new Node("test", 0, "node-0", 0), new Node("test", 1, "node-1", 0), new Node("test", 0, "node-2", 1), @@ -392,6 +395,7 @@ public class SearchClusterTest { new Node("test", 1, "node-5", 2)); SearchGroups oldGroups = state.searchCluster.groupList(); assertEquals(Set.copyOf(referenceNodes), oldGroups.nodes()); + List> oldMonitors = state.clusterMonitor.nodeMonitors(); List updatedNodes = List.of(new Node("test", 0, "node-1", 0), // Swap node-0 and node-1 new Node("test", 1, "node-0", 0), // Swap node-1 and node-0 @@ -399,7 +403,7 @@ public class SearchClusterTest { new Node("test", 1, "node-3", 1), new Node("test", 0, "node-2", 2), // Swap node-4 and node-2 new Node("test", 1, "node-6", 2)); // Replace node-6 - state.searchCluster.updateNodes(updatedNodes, 100.0); + state.searchCluster.updateNodes(updatedNodes, state.clusterMonitor, 100.0); SearchGroups newGroups = state.searchCluster.groupList(); assertEquals(Set.copyOf(updatedNodes), newGroups.nodes()); @@ -417,6 +421,12 @@ public class SearchClusterTest { for (Node node : updatedNodes) assertEquals(pathIndexWithinGroup[node.group()]++, newNodesByIdentity.get(node).pathIndex(), "search path index within group should follow updated node order for: " + node); + + // Precisely the one retained node keeps its monitor through reconfiguration. + Set> retainedMonitors = new HashSet<>(state.clusterMonitor.nodeMonitors()); + retainedMonitors.retainAll(oldMonitors); + assertEquals(1, retainedMonitors.size()); + assertSame(oldNodesByIdentity.get(updatedNodes.get(3)), retainedMonitors.iterator().next().getNode()); } } -- cgit v1.2.3