summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java60
1 files changed, 41 insertions, 19 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 2e8fbe34781..9b0c79dc2bc 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -1,9 +1,9 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
-import com.yahoo.component.annotation.Inject;
import com.yahoo.component.AbstractComponent;
import com.yahoo.component.ComponentId;
+import com.yahoo.component.annotation.Inject;
import com.yahoo.compress.Compressor;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
@@ -16,9 +16,9 @@ import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.Group;
-import com.yahoo.search.dispatch.searchcluster.SearchGroups;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
+import com.yahoo.search.dispatch.searchcluster.SearchGroups;
import com.yahoo.search.query.profile.types.FieldDescription;
import com.yahoo.search.query.profile.types.FieldType;
import com.yahoo.search.query.profile.types.QueryProfileType;
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
/**
* A dispatcher communicates with search nodes to perform queries and fill hits.
@@ -59,19 +58,20 @@ public class Dispatcher extends AbstractComponent {
private final DispatchConfig dispatchConfig;
private final RpcResourcePool rpcResourcePool;
private final SearchCluster searchCluster;
- private final ClusterMonitor<Node> clusterMonitor;
private volatile VolatileItems volatileItems;
private static class VolatileItems {
final LoadBalancer loadBalancer;
final InvokerFactory invokerFactory;
+ final ClusterMonitor<Node> clusterMonitor;
final AtomicInteger inflight = new AtomicInteger(1); // Initial reference.
Runnable cleanup = () -> { };
- VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) {
+ VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) {
this.loadBalancer = loadBalancer;
this.invokerFactory = invokerFactory;
+ this.clusterMonitor = clusterMonitor;
}
private void countDown() {
@@ -112,19 +112,17 @@ public class Dispatcher extends AbstractComponent {
rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig);
searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool));
- clusterMonitor = new ClusterMonitor<>(searchCluster, true);
- volatileItems = update(null);
+ volatileItems = update(null, new ClusterMonitor<>(searchCluster, true));
initialWarmup(dispatchConfig.warmuptime());
}
- /* For simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */
+ /* For simple mocking in tests. Beware that searchCluster is shutdown in deconstruct() */
Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster,
DispatchConfig dispatchConfig, InvokerFactory invokerFactory) {
this.dispatchConfig = dispatchConfig;
this.rpcResourcePool = null;
this.searchCluster = searchCluster;
- this.clusterMonitor = clusterMonitor;
- this.volatileItems = update(invokerFactory);
+ this.volatileItems = update(invokerFactory, clusterMonitor);
}
/** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */
@@ -142,13 +140,28 @@ public class Dispatcher extends AbstractComponent {
* 1. The volatile snapshot of disposable items is replaced with a new one that only references updated nodes.
* 2. Dependencies of the items in 1., which must be configured, are updated, yielding a list of resources to close.
* 3. When inflight operations against the old snapshot are done, all obsolete resources are cleaned up.
+ *
+ * Ownership details:
+ * 1. The RPC resource pool is owned by the dispatcher, and is updated on node set changes;
+ * it contains the means by which the container talks to backend nodes, so cleanup must be delayed until safe.
+ * 2. The invocation factory is owned by the volatile snapshot, and is swapped atomically with it;
+ * it is used by the dispatcher to create ephemeral invokers, which must complete before cleanup (above) can happen.
+ * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it;
+ * it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot.
+ * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time;
+ * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that.
+ * 5. The search cluster is owned by the dispatcher, and is updated on node set changes;
+ * its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher,
+ * as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible
+ * of its state across reconfigurations: with new node config, it will immediately forget obsolete nodes, and set
+ * coverage information as if the new nodes have zero documents, before even checking their status; this is fine
+ * under the assumption that this is the common case, i.e., new nodes have no documents yet.
*/
void updateWithNewConfig(DispatchNodesConfig nodesConfig) {
- try (var items = volatileItems()) { // Replace the volatile items snapshot, marking a reference to the old one.
- items.get().countDown(); // Decrement for the initial creation reference.
- this.volatileItems = update(null);
+ try (var items = volatileItems()) { // Marking a reference to the old snapshot, which we want to have cleaned up.
+ items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0.
- // Set up the cleanup that we need to do.
+ // Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do.
Collection<? extends AutoCloseable> connectionPoolsToClose = rpcResourcePool.updateNodes(nodesConfig);
items.get().cleanup = () -> {
for (AutoCloseable pool : connectionPoolsToClose) {
@@ -156,15 +169,24 @@ public class Dispatcher extends AbstractComponent {
}
};
- } // Close the old snapshot, which may trigger the cleanup right now, or when the last invoker is closed.
+ // Update the nodes the search cluster keeps track of, and what nodes are monitored.
+ searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage());
+
+ // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this.
+ this.volatileItems = update(null, new ClusterMonitor<>(searchCluster, true));
+
+ // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to.
+ items.get().clusterMonitor.shutdown();
+ } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread.
}
- private VolatileItems update(InvokerFactory invokerFactory) {
+ private VolatileItems update(InvokerFactory invokerFactory, ClusterMonitor clusterMonitor) {
var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
(invokerFactory == null)
? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig)
- : invokerFactory);
- searchCluster.addMonitoring(clusterMonitor); // TODO: Update, rather than add ... as this creates a pinger for each node
+ : invokerFactory,
+ clusterMonitor);
+ searchCluster.addMonitoring(clusterMonitor);
return items;
}
@@ -214,7 +236,7 @@ public class Dispatcher extends AbstractComponent {
@Override
public void deconstruct() {
// The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster.
- clusterMonitor.shutdown();
+ volatileItems.clusterMonitor.shutdown();
if (rpcResourcePool != null) {
rpcResourcePool.close();
}