author    jonmv <venstad@gmail.com>  2023-09-05 10:47:23 +0200
committer jonmv <venstad@gmail.com>  2023-09-05 11:01:15 +0200
commit    5c5be982848fb8a3f1d84fa522f380b8706e6ddb (patch)
tree      87879722d4763785b73648b6778bb3b428b5f555 /container-search
parent    932a5311bf7acfc9bad8e45be39cec5540b0a692 (diff)
Keep and reconfigure ClusterMonitor when reconfiguring dispatcher
Diffstat (limited to 'container-search')
-rw-r--r--  container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java                     80
-rw-r--r--  container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java                        29
-rw-r--r--  container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java       11
-rw-r--r--  container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java                    28
-rw-r--r--  container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java   12
5 files changed, 113 insertions(+), 47 deletions(-)
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index 332bf4ea2c4..d81f9079a02 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -3,16 +3,22 @@ package com.yahoo.search.cluster;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.yolean.UncheckedInterruptedException;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,7 +42,16 @@ public class ClusterMonitor<T> {
private final AtomicBoolean closed = new AtomicBoolean(false);
/** A map from Node to corresponding MonitoredNode */
- private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>());
+ private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new LinkedHashMap<>());
+
+ // Used during reconfiguration to ensure async RPC calls are complete.
+ private final Set<T> nodesToRemove = new LinkedHashSet<>();
+
+ // Used during reconfiguration to ensure all nodes have data.
+ private final Set<T> nodesToUpdate = new LinkedHashSet<>();
+
+ // Used for reconfiguration, and during shutdown.
+ private boolean skipNextWait = false;
public ClusterMonitor(NodeManager<T> manager, boolean startPingThread) {
nodeManager = manager;
@@ -46,6 +61,22 @@ public class ClusterMonitor<T> {
}
}
+ /** Updates the monitored set of nodes, and waits for 1. data on new nodes, and 2. RPC completion of removed nodes. */
+ public synchronized void reconfigure(Collection<T> nodes) {
+ if ( ! monitorThread.isAlive()) throw new IllegalStateException("monitor thread must be alive for reconfiguration");
+
+ nodesToUpdate.addAll(nodes);
+ nodesToRemove.addAll(nodeMonitors.keySet());
+ nodesToRemove.removeAll(nodes);
+ for (T node : nodes) if ( ! nodeMonitors.containsKey(node)) add(node, true);
+
+ synchronized (nodeManager) { skipNextWait = true; nodeManager.notifyAll(); }
+ try { while ( ! nodesToRemove.isEmpty() || ! nodesToUpdate.isEmpty()) wait(1); }
+ catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); }
+
+ nodeManager.pingIterationCompleted();
+ }
+
public void start() {
if ( ! monitorThread.isAlive()) {
monitorThread.start();
@@ -74,30 +105,48 @@ public class ClusterMonitor<T> {
/** Called from ClusterSearcher/NodeManager when a node failed */
public synchronized void failed(T node, ErrorMessage error) {
- if (closed.get()) return; // Do not touch state if close has started.
- TrafficNodeMonitor<T> monitor = nodeMonitors.get(node);
- Boolean wasWorking = monitor.isKnownWorking();
- monitor.failed(error);
- if (wasWorking != monitor.isKnownWorking())
- nodeManager.failed(node);
+ updateMonitoredNode(node, monitor -> monitor.failed(error), nodeManager::failed);
}
/** Called when a node responded */
public synchronized void responded(T node) {
- if (closed.get()) return; // Do not touch state if close has started.
+ updateMonitoredNode(node, TrafficNodeMonitor::responded, nodeManager::working);
+ }
+
+ private void updateMonitoredNode(T node, Consumer<TrafficNodeMonitor<T>> monitorUpdate, Consumer<T> nodeUpdate) {
TrafficNodeMonitor<T> monitor = nodeMonitors.get(node);
- Boolean wasWorking = monitor.isKnownWorking();
- monitor.responded();
- if (wasWorking != monitor.isKnownWorking())
- nodeManager.working(node);
+
+ // Don't touch state during shutdown.
+ if (closed.get()) monitor = null;
+
+ // Node was removed during reconfiguration, and should no longer be monitored.
+ if (nodesToRemove.remove(node)) {
+ nodeMonitors.remove(node);
+ monitor = null;
+ }
+
+ // Update monitor state only when it actually changes.
+ if (monitor != null) {
+ Boolean wasWorking = monitor.isKnownWorking();
+ monitorUpdate.accept(monitor);
+ if (wasWorking != monitor.isKnownWorking())
+ nodeUpdate.accept(node);
+ }
+
+ // If the node was added in a recent reconfiguration, we now have its required data.
+ nodesToUpdate.remove(node);
}
/**
 * Ping all nodes which need pinging to discover state changes
*/
- public void ping(Executor executor) {
+ public synchronized void ping(Executor executor) {
for (var monitor : nodeMonitors()) {
if (closed.get()) return; // Do nothing to change state if close has started.
+ if (nodesToRemove.remove(monitor.getNode())) {
+ nodeMonitors.remove(monitor.getNode());
+ continue;
+ }
nodeManager.ping(this, monitor.getNode(), executor);
}
nodeManager.pingIterationCompleted();
@@ -120,6 +169,7 @@ public class ClusterMonitor<T> {
nodeMonitors.clear();
}
synchronized (nodeManager) {
+ skipNextWait = true;
nodeManager.notifyAll();
}
try {
@@ -148,7 +198,9 @@ public class ClusterMonitor<T> {
log.finest("Activating ping");
ping(pingExecutor);
synchronized (nodeManager) {
- nodeManager.wait(configuration.getCheckInterval());
+ if ( ! skipNextWait)
+ nodeManager.wait(configuration.getCheckInterval());
+ skipNextWait = false;
}
}
catch (Throwable e) {
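
The reconfigure/ping interplay above boils down to a small wake-and-wait pattern: the reconfiguring thread registers pending work, pokes the monitor thread so it skips its next sleep, then spin-waits until the pending sets drain. Below is a minimal, self-contained sketch of just that pattern; the class and member names (PingLoop, sleepLock, pending) are illustrative, not Vespa APIs.

    import java.util.LinkedHashSet;
    import java.util.Set;

    class PingLoop<T> {

        private final Set<T> pending = new LinkedHashSet<>();
        private final Object sleepLock = new Object(); // Stands in for nodeManager, which the real loop waits on.
        private boolean skipNextWait = false;

        // Monitor thread: ping each round, sleeping in between unless asked to skip the wait.
        void run(long checkIntervalMillis) throws InterruptedException {
            while (true) {
                pingAll(); // Issues async pings; each completion ends in complete(node).
                synchronized (sleepLock) {
                    if ( ! skipNextWait) sleepLock.wait(checkIntervalMillis);
                    skipNextWait = false;
                }
            }
        }

        // Reconfiguration thread: register pending nodes, wake the loop, wait for it to drain.
        synchronized void reconfigure(Set<T> nodes) throws InterruptedException {
            pending.addAll(nodes);
            synchronized (sleepLock) { skipNextWait = true; sleepLock.notifyAll(); }
            while ( ! pending.isEmpty()) wait(1); // Poll, like the real code, rather than rely on notifications.
        }

        // Ping callback thread: mark a node as seen.
        synchronized void complete(T node) {
            pending.remove(node);
        }

        private void pingAll() { /* send pings here */ }
    }

Note that reconfigure() waits while holding this object's monitor, which wait(1) releases, so complete() can make progress; the sleep lock is a separate object, mirroring how the real code waits on the NodeManager.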
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index eca0c8058a1..43d0e08886d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -60,20 +60,19 @@ public class Dispatcher extends AbstractComponent {
private final DispatchConfig dispatchConfig;
private final RpcConnectionPool rpcResourcePool;
private final SearchCluster searchCluster;
+ private final ClusterMonitor<Node> clusterMonitor;
private volatile VolatileItems volatileItems;
private static class VolatileItems {
final LoadBalancer loadBalancer;
final InvokerFactory invokerFactory;
- final ClusterMonitor<Node> clusterMonitor;
final AtomicInteger inflight = new AtomicInteger(1); // Initial reference.
Runnable cleanup = () -> { };
- VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) {
+ VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) {
this.loadBalancer = loadBalancer;
this.invokerFactory = invokerFactory;
- this.clusterMonitor = clusterMonitor;
}
private void countDown() {
@@ -128,7 +127,7 @@ public class Dispatcher extends AbstractComponent {
Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) {
this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories);
- this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it.
+ this.clusterMonitor.start(); // Populate nodes to monitor before starting it.
}
Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
@@ -137,7 +136,8 @@ public class Dispatcher extends AbstractComponent {
this.rpcResourcePool = rpcConnectionPool;
this.searchCluster = searchCluster;
this.invokerFactories = invokerFactories;
- this.volatileItems = update(clusterMonitor);
+ this.clusterMonitor = clusterMonitor;
+ this.volatileItems = update();
searchCluster.addMonitoring(clusterMonitor);
}
@@ -171,7 +171,7 @@ public class Dispatcher extends AbstractComponent {
* 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it;
* it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot.
* 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time;
- * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that.
+ * it is technically owned by the dispatcher, but is updated by the search cluster when that is updated.
* 5. The search cluster is owned by the dispatcher, and is updated on node set changes;
* its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher,
* as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible
@@ -192,21 +192,16 @@ public class Dispatcher extends AbstractComponent {
};
// Update the nodes the search cluster keeps track of, and what nodes are monitored.
- ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), dispatchConfig.minActivedocsPercentage());
+ searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), clusterMonitor, dispatchConfig.minActivedocsPercentage());
// Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this.
- this.volatileItems = update(newMonitor);
-
- // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to.
- items.get().clusterMonitor.shutdown();
+ this.volatileItems = update();
} // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread.
}
- private VolatileItems update(ClusterMonitor<Node> clusterMonitor) {
- var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
- invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig),
- clusterMonitor);
- return items;
+ private VolatileItems update() {
+ return new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
+ invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig));
}
private void initialWarmup(double warmupTime) {
@@ -255,7 +250,7 @@ public class Dispatcher extends AbstractComponent {
@Override
public void deconstruct() {
// The cluster monitor must be shut down first, as it uses the invoker factory through the searchCluster.
- volatileItems.clusterMonitor.shutdown();
+ clusterMonitor.shutdown();
if (rpcResourcePool != null) {
rpcResourcePool.close();
}
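
The VolatileItems bookkeeping in this file follows a copy-on-write snapshot pattern: search threads pin the current snapshot with a reference count, and the old snapshot's cleanup (e.g., closing RPC connections) runs only when the last in-flight search releases it. A simplified sketch with illustrative names, which ignores the acquire/release race the real code must handle:

    import java.util.concurrent.atomic.AtomicInteger;

    class SnapshotHolder<S> {

        static final class Snapshot<S> {
            final S items;
            final AtomicInteger inflight = new AtomicInteger(1); // Initial reference, held by the holder.
            volatile Runnable cleanup = () -> { };
            Snapshot(S items) { this.items = items; }
            void release() { if (inflight.decrementAndGet() == 0) cleanup.run(); }
        }

        private volatile Snapshot<S> current;

        SnapshotHolder(S initial) { current = new Snapshot<>(initial); }

        // Search thread: pin the current snapshot for the duration of a query, then release() it.
        Snapshot<S> acquire() {
            Snapshot<S> snapshot = current;
            snapshot.inflight.incrementAndGet();
            return snapshot;
        }

        // Reconfiguration thread: swap in a new snapshot; the old one cleans up once drained.
        void swap(S newItems, Runnable oldCleanup) {
            Snapshot<S> old = current;
            current = new Snapshot<>(newItems);
            old.cleanup = oldCleanup;
            old.release(); // Drop the initial reference; cleanup runs when in-flight searches finish.
        }
    }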
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 59b4637a627..f7a77ebf963 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -6,7 +6,6 @@ import com.yahoo.net.HostName;
import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
-import com.yahoo.yolean.UncheckedInterruptedException;
import java.util.ArrayList;
import java.util.Collection;
@@ -62,7 +61,7 @@ public class SearchCluster implements NodeManager<Node> {
public String name() { return clusterId; }
/** Sets the new nodes to monitor to be the new nodes, but keep any existing node instances which equal the new ones. */
- public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) {
+ public void updateNodes(Collection<Node> newNodes, ClusterMonitor<Node> monitor, double minActivedocsPercentage) {
List<Node> currentNodes = new ArrayList<>(newNodes);
List<Node> addedNodes = new ArrayList<>();
Map<Node, Node> retainedNodes = groups.nodes().stream().collect(toMap(node -> node, node -> node));
@@ -72,15 +71,9 @@ public class SearchCluster implements NodeManager<Node> {
else addedNodes.add(currentNodes.get(i));
}
SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage);
- ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false);
- for (Node node : groups.nodes()) monitor.add(node, true);
- monitor.start();
- try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } }
- catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); }
- pingIterationCompleted(groups);
this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), groups);
+ monitor.reconfigure(groups.nodes());
this.groups = groups;
- return monitor;
}
public void addMonitoring(ClusterMonitor<Node> clusterMonitor) {
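
The retention step in updateNodes is what makes keeping the monitor worthwhile: new Node instances that equal existing ones are swapped out for the existing instances, so their monitors and working-state survive reconfiguration. A stand-alone sketch of just that step, using a hypothetical Node record rather than the real class:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import static java.util.stream.Collectors.toMap;

    class NodeRetention {

        record Node(String cluster, int key, String host, int group) { }

        // Replace each new node that equals an existing one with the existing instance,
        // preserving object identity (and thus monitor state) across reconfigurations.
        static List<Node> retain(Collection<Node> newNodes, Collection<Node> existing) {
            Map<Node, Node> retained = existing.stream().collect(toMap(node -> node, node -> node));
            List<Node> result = new ArrayList<>(newNodes);
            for (int i = 0; i < result.size(); i++)
                if (retained.containsKey(result.get(i)))
                    result.set(i, retained.get(result.get(i)));
            return result;
        }
    }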
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
index 3397638b950..d0f1f46d6ea 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
@@ -21,6 +21,7 @@ import com.yahoo.search.dispatch.searchcluster.SearchGroups;
import com.yahoo.search.searchchain.Execution;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
+import com.yahoo.yolean.UncheckedInterruptedException;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -36,6 +37,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -181,7 +183,11 @@ public class DispatcherTest {
pingPhasers.put(1, new Phaser(2));
pingPhasers.put(2, new Phaser(2));
+ AtomicBoolean doPing = new AtomicBoolean();
+
PingFactory pingFactory = (node, monitor, pongHandler) -> () -> {
+ try { while ( ! doPing.getAndSet(false)) { monitor.wait(1); } } // Need to avoid hogging monitor lock while waiting for phaser.
+ catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); }
pingPhasers.get(node.key()).arriveAndAwaitAdvance();
pongHandler.handle(new Pong(2, 2));
pingPhasers.get(node.key()).arriveAndAwaitAdvance();
@@ -255,8 +261,8 @@ public class DispatcherTest {
Dispatcher dispatcher = new Dispatcher(dispatchConfig, rpcPool, cluster, invokerFactories);
ExecutorService executor = Executors.newFixedThreadPool(1);
- // Set two groups with a single node each. The first cluster-monitor has nothing to do, and is shut down immediately.
- // There are also no invokers, so the whole reconfiguration completes once the new cluster monitor has seen all nodes.
+ // Set two groups with a single node each.
+ // There are no invokers, so the whole reconfiguration completes once the cluster monitor has seen all the new nodes.
Future<?> reconfiguration = executor.submit(() -> {
dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder()
.node(new DispatchNodesConfig.Node.Builder().key(0).group(0).port(123).host("host0"))
@@ -265,8 +271,10 @@ public class DispatcherTest {
});
// Let pings return, to allow the search cluster to reconfigure.
+ doPing.set(true);
pingPhasers.get(0).arriveAndAwaitAdvance();
pingPhasers.get(0).arriveAndAwaitAdvance();
+ doPing.set(true);
pingPhasers.get(1).arriveAndAwaitAdvance();
pingPhasers.get(1).arriveAndAwaitAdvance();
// We need to wait for the cluster to have at least one group, lest dispatch will fail below.
@@ -287,9 +295,10 @@ public class DispatcherTest {
search1.search(new Query(), null);
// Wait for the current cluster monitor to be mid-ping-round.
+ doPing.set(true);
pingPhasers.get(0).arriveAndAwaitAdvance();
- // Then reconfigure the dispatcher with new nodes, replacing node0 with node2.
+ // Reconfigure the dispatcher with new nodes, removing node0 and adding node2.
reconfiguration = executor.submit(() -> {
dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder()
.node(new DispatchNodesConfig.Node.Builder().key(2).group(0).port(123).host("host2"))
@@ -297,16 +306,23 @@ public class DispatcherTest {
.build());
});
// Reconfiguration starts, but groups are only updated once the search cluster has knowledge about all of them.
+ pingPhasers.get(0).arriveAndAwaitAdvance(); // Ping for node to remove completes.
+ doPing.set(true);
+ pingPhasers.get(1).arriveAndAwaitAdvance(); // Ping for node to keep completes.
pingPhasers.get(1).arriveAndAwaitAdvance();
+ // New round of pings starts, with nodes 1 and 2.
+ doPing.set(true);
pingPhasers.get(1).arriveAndAwaitAdvance();
- pingPhasers.get(2).arriveAndAwaitAdvance();
+ pingPhasers.get(1).arriveAndAwaitAdvance();
+
// Cluster has not yet updated its group reference.
assertEquals(1, cluster.group(0).workingNodes()); // Node0 is still working.
assertSame(node0, cluster.group(0).nodes().get(0));
+
+ doPing.set(true);
+ pingPhasers.get(2).arriveAndAwaitAdvance();
pingPhasers.get(2).arriveAndAwaitAdvance();
- // Old cluster monitor is waiting for that ping to complete before it can shut down, and let reconfiguration complete.
- pingPhasers.get(0).arriveAndAwaitAdvance();
reconfiguration.get();
Node node2 = cluster.group(0).nodes().get(0);
assertNotSame(node0, node2);
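
The test steps each ping through a two-party Phaser: both the ping task and the test thread must arrive before either proceeds, bracketing the pong delivery deterministically. A minimal demonstration of that handshake, independent of the test above:

    import java.util.concurrent.Phaser;

    class PhaserHandshake {
        public static void main(String[] args) throws InterruptedException {
            Phaser phaser = new Phaser(2); // Two parties: the ping task and the test thread.
            Thread pingTask = new Thread(() -> {
                phaser.arriveAndAwaitAdvance(); // Ping has started; wait until the test sees it.
                // ... deliver the pong here ...
                phaser.arriveAndAwaitAdvance(); // Pong delivered; wait until the test lets us finish.
            });
            pingTask.start();
            phaser.arriveAndAwaitAdvance(); // Test thread: the ping is now in flight.
            phaser.arriveAndAwaitAdvance(); // Test thread: the pong has been handled.
            pingTask.join();
        }
    }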
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
index f6a1ca5cae3..109b9db2ce4 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
@@ -6,12 +6,14 @@ import com.yahoo.container.handler.ClustersStatus;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.net.HostName;
import com.yahoo.prelude.Pong;
+import com.yahoo.search.cluster.BaseNodeMonitor;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.result.ErrorMessage;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -384,6 +386,7 @@ public class SearchClusterTest {
@Test
void requireThatPreciselyTheRetainedNodesAreKeptWhenNodesAreUpdated() {
try (State state = new State("query", 2, IntStream.range(0, 6).mapToObj(i -> "node-" + i).toList())) {
+ state.clusterMonitor.start();
List<Node> referenceNodes = List.of(new Node("test", 0, "node-0", 0),
new Node("test", 1, "node-1", 0),
new Node("test", 0, "node-2", 1),
@@ -392,6 +395,7 @@ public class SearchClusterTest {
new Node("test", 1, "node-5", 2));
SearchGroups oldGroups = state.searchCluster.groupList();
assertEquals(Set.copyOf(referenceNodes), oldGroups.nodes());
+ List<BaseNodeMonitor<Node>> oldMonitors = state.clusterMonitor.nodeMonitors();
List<Node> updatedNodes = List.of(new Node("test", 0, "node-1", 0), // Swap node-0 and node-1
new Node("test", 1, "node-0", 0), // Swap node-1 and node-0
@@ -399,7 +403,7 @@ public class SearchClusterTest {
new Node("test", 1, "node-3", 1),
new Node("test", 0, "node-2", 2), // Swap node-4 and node-2
new Node("test", 1, "node-6", 2)); // Replace node-6
- state.searchCluster.updateNodes(updatedNodes, 100.0);
+ state.searchCluster.updateNodes(updatedNodes, state.clusterMonitor, 100.0);
SearchGroups newGroups = state.searchCluster.groupList();
assertEquals(Set.copyOf(updatedNodes), newGroups.nodes());
@@ -417,6 +421,12 @@ public class SearchClusterTest {
for (Node node : updatedNodes)
assertEquals(pathIndexWithinGroup[node.group()]++, newNodesByIdentity.get(node).pathIndex(),
"search path index within group should follow updated node order for: " + node);
+
+ // Precisely the one retained node keeps its monitor through reconfiguration.
+ Set<BaseNodeMonitor<Node>> retainedMonitors = new HashSet<>(state.clusterMonitor.nodeMonitors());
+ retainedMonitors.retainAll(oldMonitors);
+ assertEquals(1, retainedMonitors.size());
+ assertSame(oldNodesByIdentity.get(updatedNodes.get(3)), retainedMonitors.iterator().next().getNode());
}
}