diff options
author | jonmv <venstad@gmail.com> | 2023-09-05 10:47:23 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-09-05 11:01:15 +0200 |
commit | 5c5be982848fb8a3f1d84fa522f380b8706e6ddb (patch) | |
tree | 87879722d4763785b73648b6778bb3b428b5f555 /container-search | |
parent | 932a5311bf7acfc9bad8e45be39cec5540b0a692 (diff) |
Keep and reconfigure ClusterMonitor when reconfiguring dispatcher
Diffstat (limited to 'container-search')
5 files changed, 113 insertions, 47 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index 332bf4ea2c4..d81f9079a02 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -3,16 +3,22 @@ package com.yahoo.search.cluster; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.search.result.ErrorMessage; +import com.yahoo.yolean.UncheckedInterruptedException; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -36,7 +42,16 @@ public class ClusterMonitor<T> { private final AtomicBoolean closed = new AtomicBoolean(false); /** A map from Node to corresponding MonitoredNode */ - private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>()); + private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new LinkedHashMap<>()); + + // Used during reconfiguration to ensure async RPC calls are complete. + private final Set<T> nodesToRemove = new LinkedHashSet<>(); + + // Used during reconfiguration to ensure all nodes have data. + private final Set<T> nodesToUpdate = new LinkedHashSet<>(); + + // Used for reconfiguration, and during shutdown. 
+ private boolean skipNextWait = false; public ClusterMonitor(NodeManager<T> manager, boolean startPingThread) { nodeManager = manager; @@ -46,6 +61,22 @@ public class ClusterMonitor<T> { } } + /** Updates the monitored set of nodes, and waits for 1. data on new nodes, and 2. RPC completion of removed nodes. */ + public synchronized void reconfigure(Collection<T> nodes) { + if ( ! monitorThread.isAlive()) throw new IllegalStateException("monitor thread must be alive for reconfiguration"); + + nodesToUpdate.addAll(nodes); + nodesToRemove.addAll(nodeMonitors.keySet()); + nodesToRemove.removeAll(nodes); + for (T node : nodes) if ( ! nodeMonitors.containsKey(node)) add(node, true); + + synchronized (nodeManager) { skipNextWait = true; nodeManager.notifyAll(); } + try { while ( ! nodesToRemove.isEmpty() || ! nodesToUpdate.isEmpty()) wait(1); } + catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } + + nodeManager.pingIterationCompleted(); + } + public void start() { if ( ! monitorThread.isAlive()) { monitorThread.start(); @@ -74,30 +105,48 @@ public class ClusterMonitor<T> { /** Called from ClusterSearcher/NodeManager when a node failed */ public synchronized void failed(T node, ErrorMessage error) { - if (closed.get()) return; // Do not touch state if close has started. - TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); - Boolean wasWorking = monitor.isKnownWorking(); - monitor.failed(error); - if (wasWorking != monitor.isKnownWorking()) - nodeManager.failed(node); + updateMonitoredNode(node, monitor -> monitor.failed(error), nodeManager::failed); } /** Called when a node responded */ public synchronized void responded(T node) { - if (closed.get()) return; // Do not touch state if close has started. 
+ updateMonitoredNode(node, TrafficNodeMonitor::responded, nodeManager::working); + } + + private void updateMonitoredNode(T node, Consumer<TrafficNodeMonitor<T>> monitorUpdate, Consumer<T> nodeUpdate) { TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); - Boolean wasWorking = monitor.isKnownWorking(); - monitor.responded(); - if (wasWorking != monitor.isKnownWorking()) - nodeManager.working(node); + + // Don't touch state during shutdown. + if (closed.get()) monitor = null; + + // Node was removed during reconfiguration, and should no longer be monitored. + if (nodesToRemove.remove(node)) { + nodeMonitors.remove(node); + monitor = null; + } + + // Update monitor state only when it actually changes. + if (monitor != null) { + Boolean wasWorking = monitor.isKnownWorking(); + monitorUpdate.accept(monitor); + if (wasWorking != monitor.isKnownWorking()) + nodeUpdate.accept(node); + } + + // If the node was added in a recent reconfiguration, we now have its required data. + nodesToUpdate.remove(node); } /** * Ping all nodes which needs pinging to discover state changes */ - public void ping(Executor executor) { + public synchronized void ping(Executor executor) { for (var monitor : nodeMonitors()) { if (closed.get()) return; // Do nothing to change state if close has started. + if (nodesToRemove.remove(monitor.getNode())) { + nodeMonitors.remove(monitor.getNode()); + continue; + } nodeManager.ping(this, monitor.getNode(), executor); } nodeManager.pingIterationCompleted(); @@ -120,6 +169,7 @@ public class ClusterMonitor<T> { nodeMonitors.clear(); } synchronized (nodeManager) { + skipNextWait = true; nodeManager.notifyAll(); } try { @@ -148,7 +198,9 @@ public class ClusterMonitor<T> { log.finest("Activating ping"); ping(pingExecutor); synchronized (nodeManager) { - nodeManager.wait(configuration.getCheckInterval()); + if ( ! 
skipNextWait) + nodeManager.wait(configuration.getCheckInterval()); + skipNextWait = false; } } catch (Throwable e) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index eca0c8058a1..43d0e08886d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -60,20 +60,19 @@ public class Dispatcher extends AbstractComponent { private final DispatchConfig dispatchConfig; private final RpcConnectionPool rpcResourcePool; private final SearchCluster searchCluster; + private final ClusterMonitor<Node> clusterMonitor; private volatile VolatileItems volatileItems; private static class VolatileItems { final LoadBalancer loadBalancer; final InvokerFactory invokerFactory; - final ClusterMonitor<Node> clusterMonitor; final AtomicInteger inflight = new AtomicInteger(1); // Initial reference. Runnable cleanup = () -> { }; - VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) { + VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) { this.loadBalancer = loadBalancer; this.invokerFactory = invokerFactory; - this.clusterMonitor = clusterMonitor; } private void countDown() { @@ -128,7 +127,7 @@ public class Dispatcher extends AbstractComponent { Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) { this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories); - this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it. + this.clusterMonitor.start(); // Populate nodes to monitor before starting it. 
} Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, @@ -137,7 +136,8 @@ public class Dispatcher extends AbstractComponent { this.rpcResourcePool = rpcConnectionPool; this.searchCluster = searchCluster; this.invokerFactories = invokerFactories; - this.volatileItems = update(clusterMonitor); + this.clusterMonitor = clusterMonitor; + this.volatileItems = update(); searchCluster.addMonitoring(clusterMonitor); } @@ -171,7 +171,7 @@ public class Dispatcher extends AbstractComponent { * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it; * it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot. * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time; - * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that. + * it is technically owned by the dispatcher, but is updated by the search cluster, when that is updated. * 5. The search cluster is owned by the dispatcher, and is updated on node set changes; * its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher, * as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible @@ -192,21 +192,16 @@ public class Dispatcher extends AbstractComponent { }; // Update the nodes the search cluster keeps track of, and what nodes are monitored. - ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), dispatchConfig.minActivedocsPercentage()); + searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), clusterMonitor, dispatchConfig.minActivedocsPercentage()); // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. 
- this.volatileItems = update(newMonitor); - - // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to. - items.get().clusterMonitor.shutdown(); + this.volatileItems = update(); } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread. } - private VolatileItems update(ClusterMonitor<Node> clusterMonitor) { - var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), - invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig), - clusterMonitor); - return items; + private VolatileItems update() { + return new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), + invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig)); } private void initialWarmup(double warmupTime) { @@ -255,7 +250,7 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { // The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. 
- volatileItems.clusterMonitor.shutdown(); + clusterMonitor.shutdown(); if (rpcResourcePool != null) { rpcResourcePool.close(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 59b4637a627..f7a77ebf963 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -6,7 +6,6 @@ import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; -import com.yahoo.yolean.UncheckedInterruptedException; import java.util.ArrayList; import java.util.Collection; @@ -62,7 +61,7 @@ public class SearchCluster implements NodeManager<Node> { public String name() { return clusterId; } /** Sets the new nodes to monitor to be the new nodes, but keep any existing node instances which equal the new ones. 
*/ - public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) { + public void updateNodes(Collection<Node> newNodes, ClusterMonitor<Node> monitor, double minActivedocsPercentage) { List<Node> currentNodes = new ArrayList<>(newNodes); List<Node> addedNodes = new ArrayList<>(); Map<Node, Node> retainedNodes = groups.nodes().stream().collect(toMap(node -> node, node -> node)); @@ -72,15 +71,9 @@ public class SearchCluster implements NodeManager<Node> { else addedNodes.add(currentNodes.get(i)); } SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage); - ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false); - for (Node node : groups.nodes()) monitor.add(node, true); - monitor.start(); - try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } } - catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } - pingIterationCompleted(groups); this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), groups); + monitor.reconfigure(groups.nodes()); this.groups = groups; - return monitor; } public void addMonitoring(ClusterMonitor<Node> clusterMonitor) { diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java index 3397638b950..d0f1f46d6ea 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java @@ -21,6 +21,7 @@ import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.searchchain.Execution; import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; +import com.yahoo.yolean.UncheckedInterruptedException; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -36,6 +37,7 @@ import 
java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -181,7 +183,11 @@ public class DispatcherTest { pingPhasers.put(1, new Phaser(2)); pingPhasers.put(2, new Phaser(2)); + AtomicBoolean doPing = new AtomicBoolean(); + PingFactory pingFactory = (node, monitor, pongHandler) -> () -> { + try { while ( ! doPing.getAndSet(false)) { monitor.wait(1); } } // Need to avoid hogging monitor lock while waiting for phaser. + catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } pingPhasers.get(node.key()).arriveAndAwaitAdvance(); pongHandler.handle(new Pong(2, 2)); pingPhasers.get(node.key()).arriveAndAwaitAdvance(); @@ -255,8 +261,8 @@ public class DispatcherTest { Dispatcher dispatcher = new Dispatcher(dispatchConfig, rpcPool, cluster, invokerFactories); ExecutorService executor = Executors.newFixedThreadPool(1); - // Set two groups with a single node each. The first cluster-monitor has nothing to do, and is shut down immediately. - // There are also no invokers, so the whole reconfiguration completes once the new cluster monitor has seen all nodes. + // Set two groups with a single node each. + // There are no invokers, so the whole reconfiguration completes once the cluster monitor has seen all the new nodes. Future<?> reconfiguration = executor.submit(() -> { dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder() .node(new DispatchNodesConfig.Node.Builder().key(0).group(0).port(123).host("host0")) @@ -265,8 +271,10 @@ public class DispatcherTest { }); // Let pings return, to allow the search cluster to reconfigure. 
+ doPing.set(true); pingPhasers.get(0).arriveAndAwaitAdvance(); pingPhasers.get(0).arriveAndAwaitAdvance(); + doPing.set(true); pingPhasers.get(1).arriveAndAwaitAdvance(); pingPhasers.get(1).arriveAndAwaitAdvance(); // We need to wait for the cluster to have at least one group, lest dispatch will fail below. @@ -287,9 +295,10 @@ public class DispatcherTest { search1.search(new Query(), null); // Wait for the current cluster monitor to be mid-ping-round. + doPing.set(true); pingPhasers.get(0).arriveAndAwaitAdvance(); - // Then reconfigure the dispatcher with new nodes, replacing node0 with node2. + // Reconfigure the dispatcher with new nodes, removing node0 and adding node2. reconfiguration = executor.submit(() -> { dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder() .node(new DispatchNodesConfig.Node.Builder().key(2).group(0).port(123).host("host2")) @@ -297,16 +306,23 @@ public class DispatcherTest { .build()); }); // Reconfiguration starts, but groups are only updated once the search cluster has knowledge about all of them. + pingPhasers.get(0).arriveAndAwaitAdvance(); // Ping for node to remove completes. + doPing.set(true); + pingPhasers.get(1).arriveAndAwaitAdvance(); // Ping for node to keep completes. pingPhasers.get(1).arriveAndAwaitAdvance(); + // New round of pings starts, with nodes 1 and 2. + doPing.set(true); pingPhasers.get(1).arriveAndAwaitAdvance(); - pingPhasers.get(2).arriveAndAwaitAdvance(); + pingPhasers.get(1).arriveAndAwaitAdvance(); + // Cluster has not yet updated its group reference. assertEquals(1, cluster.group(0).workingNodes()); // Node0 is still working. assertSame(node0, cluster.group(0).nodes().get(0)); + + doPing.set(true); + pingPhasers.get(2).arriveAndAwaitAdvance(); pingPhasers.get(2).arriveAndAwaitAdvance(); - // Old cluster monitor is waiting for that ping to complete before it can shut down, and let reconfiguration complete. 
- pingPhasers.get(0).arriveAndAwaitAdvance(); reconfiguration.get(); Node node2 = cluster.group(0).nodes().get(0); assertNotSame(node0, node2); diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java index f6a1ca5cae3..109b9db2ce4 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java @@ -6,12 +6,14 @@ import com.yahoo.container.handler.ClustersStatus; import com.yahoo.container.handler.VipStatus; import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; +import com.yahoo.search.cluster.BaseNodeMonitor; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.result.ErrorMessage; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -384,6 +386,7 @@ public class SearchClusterTest { @Test void requireThatPreciselyTheRetainedNodesAreKeptWhenNodesAreUpdated() { try (State state = new State("query", 2, IntStream.range(0, 6).mapToObj(i -> "node-" + i).toList())) { + state.clusterMonitor.start(); List<Node> referenceNodes = List.of(new Node("test", 0, "node-0", 0), new Node("test", 1, "node-1", 0), new Node("test", 0, "node-2", 1), @@ -392,6 +395,7 @@ public class SearchClusterTest { new Node("test", 1, "node-5", 2)); SearchGroups oldGroups = state.searchCluster.groupList(); assertEquals(Set.copyOf(referenceNodes), oldGroups.nodes()); + List<BaseNodeMonitor<Node>> oldMonitors = state.clusterMonitor.nodeMonitors(); List<Node> updatedNodes = List.of(new Node("test", 0, "node-1", 0), // Swap node-0 and node-1 new Node("test", 1, "node-0", 0), // Swap node-1 and node-0 @@ -399,7 +403,7 @@ public class SearchClusterTest { new 
Node("test", 1, "node-3", 1), new Node("test", 0, "node-2", 2), // Swap node-4 and node-2 new Node("test", 1, "node-6", 2)); // Replace node-6 - state.searchCluster.updateNodes(updatedNodes, 100.0); + state.searchCluster.updateNodes(updatedNodes, state.clusterMonitor, 100.0); SearchGroups newGroups = state.searchCluster.groupList(); assertEquals(Set.copyOf(updatedNodes), newGroups.nodes()); @@ -417,6 +421,12 @@ public class SearchClusterTest { for (Node node : updatedNodes) assertEquals(pathIndexWithinGroup[node.group()]++, newNodesByIdentity.get(node).pathIndex(), "search path index within group should follow updated node order for: " + node); + + // Precisely the one retained node keeps its monitor through reconfiguration. + Set<BaseNodeMonitor<Node>> retainedMonitors = new HashSet<>(state.clusterMonitor.nodeMonitors()); + retainedMonitors.retainAll(oldMonitors); + assertEquals(1, retainedMonitors.size()); + assertSame(oldNodesByIdentity.get(updatedNodes.get(3)), retainedMonitors.iterator().next().getNode()); } } |