diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java | 60 |
1 files changed, 41 insertions, 19 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 2e8fbe34781..9b0c79dc2bc 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -1,9 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; -import com.yahoo.component.annotation.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.component.ComponentId; +import com.yahoo.component.annotation.Inject; import com.yahoo.compress.Compressor; import com.yahoo.container.handler.VipStatus; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; @@ -16,9 +16,9 @@ import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; import com.yahoo.search.dispatch.searchcluster.Group; -import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.query.profile.types.FieldDescription; import com.yahoo.search.query.profile.types.FieldType; import com.yahoo.search.query.profile.types.QueryProfileType; @@ -33,7 +33,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; /** * A dispatcher communicates with search nodes to perform queries and fill hits. @@ -59,19 +58,20 @@ public class Dispatcher extends AbstractComponent { private final DispatchConfig dispatchConfig; private final RpcResourcePool rpcResourcePool; private final SearchCluster searchCluster; - private final ClusterMonitor<Node> clusterMonitor; private volatile VolatileItems volatileItems; private static class VolatileItems { final LoadBalancer loadBalancer; final InvokerFactory invokerFactory; + final ClusterMonitor<Node> clusterMonitor; final AtomicInteger inflight = new AtomicInteger(1); // Initial reference. Runnable cleanup = () -> { }; - VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) { + VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) { this.loadBalancer = loadBalancer; this.invokerFactory = invokerFactory; + this.clusterMonitor = clusterMonitor; } private void countDown() { @@ -112,19 +112,17 @@ public class Dispatcher extends AbstractComponent { rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig); searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool)); - clusterMonitor = new ClusterMonitor<>(searchCluster, true); - volatileItems = update(null); + volatileItems = update(null, new ClusterMonitor<>(searchCluster, true)); initialWarmup(dispatchConfig.warmuptime()); } - /* For simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ + /* For simple mocking in tests. Beware that searchCluster is shutdown in deconstruct() */ Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { this.dispatchConfig = dispatchConfig; this.rpcResourcePool = null; this.searchCluster = searchCluster; - this.clusterMonitor = clusterMonitor; - this.volatileItems = update(invokerFactory); + this.volatileItems = update(invokerFactory, clusterMonitor); } /** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */ @@ -142,13 +140,28 @@ public class Dispatcher extends AbstractComponent { * 1. The volatile snapshot of disposable items is replaced with a new one that only references updated nodes. * 2. Dependencies of the items in 1., which must be configured, are updated, yielding a list of resources to close. * 3. When inflight operations against the old snapshot are done, all obsolete resources are cleaned up. + * + * Ownership details: + * 1. The RPC resource pool is owned by the dispatcher, and is updated on node set changes; + * it contains the means by which the container talks to backend nodes, so cleanup must be delayed until safe. + * 2. The invocation factory is owned by the volatile snapshot, and is swapped atomically with it; + * it is used by the dispatcher to create ephemeral invokers, which must complete before cleanup (above) can happen. + * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it; + * it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot. + * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time; + * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that. + * 5. The search cluster is owned by the dispatcher, and is updated on node set changes; + * its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher, + * as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible + * of its state across reconfigurations: with new node config, it will immediately forget obsolete nodes, and set + * coverage information as if the new nodes have zero documents, before even checking their status; this is fine + * under the assumption that this is the common case, i.e., new nodes have no documents yet. */ void updateWithNewConfig(DispatchNodesConfig nodesConfig) { - try (var items = volatileItems()) { // Replace the volatile items snapshot, marking a reference to the old one. - items.get().countDown(); // Decrement for the initial creation reference. - this.volatileItems = update(null); + try (var items = volatileItems()) { // Marking a reference to the old snapshot, which we want to have cleaned up. + items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0. - // Set up the cleanup that we need to do. + // Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do. Collection<? extends AutoCloseable> connectionPoolsToClose = rpcResourcePool.updateNodes(nodesConfig); items.get().cleanup = () -> { for (AutoCloseable pool : connectionPoolsToClose) { @@ -156,15 +169,24 @@ public class Dispatcher extends AbstractComponent { } }; - } // Close the old snapshot, which may trigger the cleanup right now, or when the last invoker is closed. + // Update the nodes the search cluster keeps track of, and what nodes are monitored. + searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage()); + + // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. + this.volatileItems = update(null, new ClusterMonitor<>(searchCluster, true)); + + // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to. + items.get().clusterMonitor.shutdown(); + } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread. } - private VolatileItems update(InvokerFactory invokerFactory) { + private VolatileItems update(InvokerFactory invokerFactory, ClusterMonitor clusterMonitor) { var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), (invokerFactory == null) ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig) - : invokerFactory); - searchCluster.addMonitoring(clusterMonitor); // TODO: Update, rather than add ... as this creates a pinger for each node + : invokerFactory, + clusterMonitor); + searchCluster.addMonitoring(clusterMonitor); return items; } @@ -214,7 +236,7 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { // The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. - clusterMonitor.shutdown(); + volatileItems.clusterMonitor.shutdown(); if (rpcResourcePool != null) { rpcResourcePool.close(); } |