diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2023-07-14 10:39:23 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-07-14 10:39:23 +0200 |
commit | bf480e663efe4e7390285d624a7b383de66a1a10 (patch) | |
tree | a20ed2bade0cee1a0c6e5ecf647d5b881ab11427 | |
parent | b4d7c1418ad373325acf7ecfe68e59cbfa9e2fb2 (diff) | |
parent | 9c208cbc41ae710b6bacbd34455b386d27ad7781 (diff) |
Merge pull request #27770 from vespa-engine/jonmv/reconfiger-dispatch-selfsub
Add code for reconfigurable dispatcher with self-sub, not wired in
24 files changed, 546 insertions, 129 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/search/ContainerSearch.java b/config-model/src/main/java/com/yahoo/vespa/model/container/search/ContainerSearch.java index 414d4c817c7..728b4d40bdd 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/search/ContainerSearch.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/search/ContainerSearch.java @@ -6,6 +6,8 @@ import com.yahoo.container.QrSearchersConfig; import com.yahoo.prelude.semantics.SemanticRulesConfig; import com.yahoo.search.config.IndexInfoConfig; import com.yahoo.search.config.SchemaInfoConfig; +import com.yahoo.search.dispatch.Dispatcher; +import com.yahoo.search.dispatch.ReconfigurableDispatcher; import com.yahoo.search.pagetemplates.PageTemplatesConfig; import com.yahoo.search.query.profile.config.QueryProfilesConfig; import com.yahoo.search.ranking.RankProfilesEvaluatorFactory; @@ -81,16 +83,18 @@ public class ContainerSearch extends ContainerSubsystem<SearchChains> /** Adds a Dispatcher component to the owning container cluster for each search cluster */ private void initializeDispatchers(Collection<SearchCluster> searchClusters) { + boolean useReconfigurableDispatch = false; + Class<? extends Dispatcher> dispatcherClass = useReconfigurableDispatch ? ReconfigurableDispatcher.class : Dispatcher.class; for (SearchCluster searchCluster : searchClusters) { if (searchCluster instanceof IndexedSearchCluster indexed) { - var dispatcher = new DispatcherComponent(indexed); + var dispatcher = new DispatcherComponent(indexed, dispatcherClass); owningCluster.addComponent(dispatcher); } if (globalPhase) { for (var documentDb : searchCluster.getDocumentDbs()) { - if (!schemasWithGlobalPhase.contains(documentDb.getSchemaName())) continue; + if ( ! schemasWithGlobalPhase.contains(documentDb.getSchemaName())) continue; var factory = new RankProfilesEvaluatorComponent(documentDb); - if (! owningCluster.getComponentsMap().containsKey(factory.getComponentId())) { + if ( ! owningCluster.getComponentsMap().containsKey(factory.getComponentId())) { owningCluster.addComponent(factory); } } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/search/DispatcherComponent.java b/config-model/src/main/java/com/yahoo/vespa/model/container/search/DispatcherComponent.java index f9a3a1f1990..fe2df8101bd 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/search/DispatcherComponent.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/search/DispatcherComponent.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.model.container.search; import com.yahoo.config.model.producer.TreeConfigProducer; import com.yahoo.osgi.provider.model.ComponentModel; +import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; import com.yahoo.vespa.model.container.component.Component; @@ -22,15 +23,15 @@ public class DispatcherComponent extends Component<TreeConfigProducer<?>, Compon private final IndexedSearchCluster indexedSearchCluster; - public DispatcherComponent(IndexedSearchCluster indexedSearchCluster) { - super(toComponentModel(indexedSearchCluster.getClusterName())); + public DispatcherComponent(IndexedSearchCluster indexedSearchCluster, Class<? extends Dispatcher> clazz) { + super(toComponentModel(indexedSearchCluster.getClusterName(), clazz)); this.indexedSearchCluster = indexedSearchCluster; } - private static ComponentModel toComponentModel(String clusterName) { + private static ComponentModel toComponentModel(String clusterName, Class<? extends Dispatcher> clazz) { String dispatcherComponentId = "dispatcher." + clusterName; // used by ClusterSearcher return new ComponentModel(dispatcherComponentId, - com.yahoo.search.dispatch.Dispatcher.class.getName(), + clazz.getName(), PlatformBundles.SEARCH_AND_DOCPROC_BUNDLE); } diff --git a/container-core/src/main/java/com/yahoo/container/handler/ClustersStatus.java b/container-core/src/main/java/com/yahoo/container/handler/ClustersStatus.java index 52b372638c8..3471627e887 100644 --- a/container-core/src/main/java/com/yahoo/container/handler/ClustersStatus.java +++ b/container-core/src/main/java/com/yahoo/container/handler/ClustersStatus.java @@ -12,7 +12,7 @@ import java.util.Set; /** * A component which tracks the up/down status of any clusters which should influence * the up down status of this container itself, as well as the separate fact (from config) - * that such clusters are present. This is a separate fact because we might know we have clusters configured + * that such clusters are present. This is a separate fact because we might know we have clusters configured, * but we don't have positive information that they are up yet, and in this case we should be down. * * This is a separate component which has <b>no dependencies</b> such that the status tracked in this diff --git a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java index fd8110e1173..d1377b8d373 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java @@ -82,7 +82,7 @@ public abstract class BaseNodeMonitor<T> { /** Thread-safely changes the state of this node if required */ protected abstract void setWorking(boolean working,String explanation); - /** Returns whether or not this is monitoring an internal node. Default is false. */ + /** Returns whether this is monitoring an internal node. Default is false. */ public boolean isInternal() { return internal; } } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index 0b627e91bc5..332bf4ea2c4 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -66,7 +66,7 @@ public class ClusterMonitor<T> { * </ul> * * @param node the object representing the node - * @param internal whether or not this node is internal to this cluster + * @param internal whether this node is internal to this cluster */ public void add(T node, boolean internal) { nodeMonitors.put(node, new TrafficNodeMonitor<>(node, configuration, internal)); @@ -96,11 +96,10 @@ public class ClusterMonitor<T> { * Ping all nodes which needs pinging to discover state changes */ public void ping(Executor executor) { - for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) { - BaseNodeMonitor<T> monitor= i.next(); - nodeManager.ping(this, monitor.getNode(), executor); // Cause call to failed or responded + for (var monitor : nodeMonitors()) { + if (closed.get()) return; // Do nothing to change state if close has started. + nodeManager.ping(this, monitor.getNode(), executor); } - if (closed.get()) return; // Do nothing to change state if close has started. nodeManager.pingIterationCompleted(); } @@ -143,7 +142,7 @@ public class ClusterMonitor<T> { // for all pings when there are no problems (important because it ensures that // any thread local connections are reused) 2) a new thread will be started to execute // new pings when a ping is not responding - ExecutorService pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping")); + ExecutorService pingExecutor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping")); while (!closed.get()) { try { log.finest("Activating ping"); @@ -165,7 +164,9 @@ public class ClusterMonitor<T> { } pingExecutor.shutdown(); try { - pingExecutor.awaitTermination(10, TimeUnit.SECONDS); + if ( ! pingExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + log.warning("Timeout waiting for ping executor to terminate"); + } } catch (InterruptedException e) { } log.info("Stopped cluster monitor thread " + getName()); } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java index 4af6757db8c..1cf36d75fc5 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java @@ -48,7 +48,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod * * @param id the id of this searcher * @param connections the connections of the cluster - * @param internal whether or not this cluster is internal (part of the same installation) + * @param internal whether this cluster is internal (part of the same installation) */ public ClusterSearcher(ComponentId id, List<T> connections, boolean internal) { this(id, connections, new Hasher<>(), internal); diff --git a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java index 1f6602053d9..f8f8c0d888d 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java @@ -22,7 +22,7 @@ public class MonitorConfiguration { /** * Returns the number of milliseconds to attempt to service a request - * (at different nodes) before giving up. Default is 5000 ms. + * (at different nodes) before giving up. See {@link #requestTimeout}. */ public long getRequestTimeout() { return requestTimeout; } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java index 11475b6a0ca..108e7e3e34b 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java @@ -23,7 +23,7 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> { this.configuration = configuration; } - /** Whether or not this has ever responded successfully */ + /** Whether this has ever responded successfully */ private boolean atStartUp = true; public T getNode() { return node; } @@ -55,7 +55,7 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> { respondedAt = now(); succeededAt = respondedAt; - setWorking(true,"Responds correctly"); + setWorking(true, "Responds correctly"); } /** diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java index 77496114df1..c6fef88fa2d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java @@ -2,7 +2,6 @@ package com.yahoo.search.dispatch; import java.io.Closeable; -import java.time.Duration; import java.util.function.BiConsumer; /** @@ -21,8 +20,8 @@ public abstract class CloseableInvoker implements Closeable { private RequestDuration duration; public void teardown(BiConsumer<Boolean, RequestDuration> teardown) { - this.teardown = teardown; - this.duration = new RequestDuration(); + this.teardown = this.teardown == null ? teardown : this.teardown.andThen(teardown); + this.duration = this.duration == null ? new RequestDuration() : this.duration; } protected void setFinalStatus(boolean success) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index db7e80a95e5..6f6b0fc2b79 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -1,9 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; -import com.yahoo.component.annotation.Inject; import com.yahoo.component.AbstractComponent; import com.yahoo.component.ComponentId; +import com.yahoo.component.annotation.Inject; import com.yahoo.compress.Compressor; import com.yahoo.container.handler.VipStatus; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; @@ -12,13 +12,14 @@ import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; +import com.yahoo.search.dispatch.rpc.RpcConnectionPool; import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; import com.yahoo.search.dispatch.searchcluster.Group; -import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.query.profile.types.FieldDescription; import com.yahoo.search.query.profile.types.FieldType; import com.yahoo.search.query.profile.types.QueryProfileType; @@ -32,6 +33,7 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; /** * A dispatcher communicates with search nodes to perform queries and fill hits. @@ -54,19 +56,43 @@ public class Dispatcher extends AbstractComponent { /** If set will control computation of how many hits will be fetched from each partition.*/ public static final CompoundName topKProbability = CompoundName.from(DISPATCH + "." + TOP_K_PROBABILITY); + private final InvokerFactoryFactory invokerFactories; private final DispatchConfig dispatchConfig; - private final RpcResourcePool rpcResourcePool; + private final RpcConnectionPool rpcResourcePool; private final SearchCluster searchCluster; - private final ClusterMonitor<Node> clusterMonitor; private volatile VolatileItems volatileItems; private static class VolatileItems { + final LoadBalancer loadBalancer; final InvokerFactory invokerFactory; - VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) { + final ClusterMonitor<Node> clusterMonitor; + final AtomicInteger inflight = new AtomicInteger(1); // Initial reference. + Runnable cleanup = () -> { }; + + VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) { this.loadBalancer = loadBalancer; this.invokerFactory = invokerFactory; + this.clusterMonitor = clusterMonitor; + } + + private void countDown() { + if (inflight.decrementAndGet() == 0) cleanup.run(); + } + + private class Ref implements AutoCloseable { + boolean handedOff = false; + { inflight.incrementAndGet(); } + VolatileItems get() { return VolatileItems.this; } + /** Hands off the reference to the given invoker, which will decrement the counter when closed. */ + <T extends CloseableInvoker> T register(T invoker) { + invoker.teardown((__, ___) -> countDown()); + handedOff = true; + return invoker; + } + @Override public void close() { if ( ! handedOff) countDown(); } } + } private static final QueryProfileType argumentType; @@ -81,34 +107,105 @@ public class Dispatcher extends AbstractComponent { public static QueryProfileType getArgumentType() { return argumentType; } + interface InvokerFactoryFactory { + InvokerFactory create(RpcConnectionPool rpcConnectionPool, SearchGroups searchGroups, DispatchConfig dispatchConfig); + } + @Inject - public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, - DispatchNodesConfig nodesConfig, VipStatus vipStatus) { - this.dispatchConfig = dispatchConfig; - rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig); - searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), - toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool)); - clusterMonitor = new ClusterMonitor<>(searchCluster, true); - volatileItems = update(null); + public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus) { + this(clusterId, dispatchConfig, new RpcResourcePool(dispatchConfig, nodesConfig), nodesConfig, vipStatus, RpcInvokerFactory::new); initialWarmup(dispatchConfig.warmuptime()); } - /* For simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ - Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, - DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { + Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) { + this(dispatchConfig, rpcConnectionPool, + new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), + toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)), + invokerFactories); + } + + Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) { + this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories); + this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it. + } + + Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + SearchCluster searchCluster, ClusterMonitor<Node> clusterMonitor, InvokerFactoryFactory invokerFactories) { this.dispatchConfig = dispatchConfig; - this.rpcResourcePool = null; + this.rpcResourcePool = rpcConnectionPool; this.searchCluster = searchCluster; - this.clusterMonitor = clusterMonitor; - this.volatileItems = update(invokerFactory); + this.invokerFactories = invokerFactories; + this.volatileItems = update(clusterMonitor); + searchCluster.addMonitoring(clusterMonitor); } - private VolatileItems update(InvokerFactory invokerFactory) { + /* For simple mocking in tests. Beware that searchCluster is shutdown in deconstruct() */ + Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, + DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { + this(dispatchConfig, null, searchCluster, clusterMonitor, (__, ___, ____) -> invokerFactory); + } + + /** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */ + private VolatileItems.Ref volatileItems() { + return volatileItems.new Ref(); + } + + /** + * This is called whenever we have new config for backend nodes. + * Normally, we'd want to handle partial failure of the component graph, by reinstating the old state; + * however, in this case, such a failure would be local to this container, and we instead want to keep + * the newest config, as that is what most accurately represents the actual backend. + * + * The flow of reconfiguration is: + * 1. The volatile snapshot of disposable items is replaced with a new one that only references updated nodes. + * 2. Dependencies of the items in 1., which must be configured, are updated, yielding a list of resources to close. + * 3. When inflight operations against the old snapshot are done, all obsolete resources are cleaned up. + * + * Ownership details: + * 1. The RPC resource pool is owned by the dispatcher, and is updated on node set changes; + * it contains the means by which the container talks to backend nodes, so cleanup must be delayed until safe. + * 2. The invocation factory is owned by the volatile snapshot, and is swapped atomically with it; + * it is used by the dispatcher to create ephemeral invokers, which must complete before cleanup (above) can happen. + * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it; + * it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot. + * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time; + * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that. + * 5. The search cluster is owned by the dispatcher, and is updated on node set changes; + * its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher, + * as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible + * of its state across reconfigurations: with new node config, it will immediately forget obsolete nodes, and set + * coverage information as if the new nodes have zero documents, before even checking their status; this is fine + * under the assumption that this is the common case, i.e., new nodes have no documents yet. + */ + void updateWithNewConfig(DispatchNodesConfig nodesConfig) { + try (var items = volatileItems()) { // Marking a reference to the old snapshot, which we want to have cleaned up. + items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0. + + // Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do. + Collection<? extends AutoCloseable> connectionPoolsToClose = rpcResourcePool.updateNodes(nodesConfig); + items.get().cleanup = () -> { + for (AutoCloseable pool : connectionPoolsToClose) { + try { pool.close(); } catch (Exception ignored) { } + } + }; + + // Update the nodes the search cluster keeps track of, and what nodes are monitored. + ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage()); + + // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. + this.volatileItems = update(newMonitor); + + // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to. + items.get().clusterMonitor.shutdown(); + } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread. + } + + private VolatileItems update(ClusterMonitor<Node> clusterMonitor) { var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), - (invokerFactory == null) - ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig) - : invokerFactory); - searchCluster.addMonitoring(clusterMonitor); + invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig), + clusterMonitor); return items; } @@ -158,27 +255,30 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { // The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. - clusterMonitor.shutdown(); + volatileItems.clusterMonitor.shutdown(); if (rpcResourcePool != null) { rpcResourcePool.close(); } } public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) { - return volatileItems.invokerFactory.createFillInvoker(searcher, result); + try (var items = volatileItems()) { // Take a snapshot, and release it when we're done. + return items.register(items.get().invokerFactory.createFillInvoker(searcher, result)); + } } public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) { - VolatileItems items = volatileItems; // Take a snapshot - int maxHitsPerNode = dispatchConfig.maxHitsPerNode(); - SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.invokerFactory, maxHitsPerNode) - .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.loadBalancer, items.invokerFactory, maxHitsPerNode)); - - if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { - query.setHits(0); - query.setOffset(0); + try (var items = volatileItems()) { // Take a snapshot, and release it when we're done. + int maxHitsPerNode = dispatchConfig.maxHitsPerNode(); + SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.get().invokerFactory, maxHitsPerNode) + .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.get().loadBalancer, items.get().invokerFactory, maxHitsPerNode)); + + if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { + query.setHits(0); + query.setOffset(0); + } + return items.register(invoker); } - return invoker; } /** Builds an invoker based on searchpath */ diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java new file mode 100644 index 00000000000..625a8bcb6da --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java @@ -0,0 +1,37 @@ +package com.yahoo.search.dispatch; + +import com.yahoo.component.ComponentId; +import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.container.handler.VipStatus; +import com.yahoo.messagebus.network.rpc.SlobrokConfigSubscriber; +import com.yahoo.vespa.config.search.DispatchConfig; +import com.yahoo.vespa.config.search.DispatchNodesConfig; +import com.yahoo.yolean.UncheckedInterruptedException; + +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * @author jonmv + */ +public class ReconfigurableDispatcher extends Dispatcher { + + private final ConfigSubscriber subscriber; + + public ReconfigurableDispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, VipStatus vipStatus) { + super(clusterId, dispatchConfig, new DispatchNodesConfig.Builder().build(), vipStatus); + this.subscriber = new ConfigSubscriber(); + this.subscriber.subscribe(this::updateWithNewConfig, DispatchNodesConfig.class, clusterId.stringValue()); + } + + @Override + public void deconstruct() { + subscriber.close(); + super.deconstruct(); + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java index 1206277a103..6b134dc23a6 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java @@ -5,7 +5,7 @@ import java.time.Duration; import java.time.Instant; /** - * Contains start and and time. Exposes a duration, and lets you measure the time difference between 2 requests. + * Contains start and end time. Exposes a duration, and lets you measure the time difference between 2 requests. * It does use System.nanoTime to get a steady clock. * * @author baldersheim diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java index 22ed8b6d9fa..6c1f666835c 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java @@ -12,7 +12,7 @@ import java.util.Optional; * * @author bratseth */ -interface Client { +public interface Client { /** Creates a connection to a particular node in this */ NodeConnection createConnection(String hostname, int port); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java index fd8e0e4f81a..a93ddb0b360 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java @@ -1,11 +1,27 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.rpc; +import com.yahoo.vespa.config.search.DispatchNodesConfig; + +import java.util.Collection; +import java.util.List; + /** * Interface for getting a connection given a node id. * * @author balderersheim */ -public interface RpcConnectionPool { +public interface RpcConnectionPool extends AutoCloseable { + + /** Returns a connection to the given node id. */ Client.NodeConnection getConnection(int nodeId); + + + /** Will return a list of items that need a delayed close when updating node set. */ + default Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { return List.of(); } + + /** Shuts down all connections in the pool, and the underlying RPC client. */ + @Override + void close(); + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index 154002c4f77..b6228994ac8 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -33,7 +33,7 @@ public class RpcInvokerFactory extends InvokerFactory { super(cluster, dispatchConfig); this.rpcResourcePool = rpcResourcePool; this.compressor = new CompressService(); - decodeType = convert(dispatchConfig.summaryDecodePolicy()); + this.decodeType = convert(dispatchConfig.summaryDecodePolicy()); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java index 53dc54f7bc5..a59097e5fff 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -37,7 +37,7 @@ public class RpcPing implements Pinger, Client.ResponseReceiver { this.clusterMonitor = clusterMonitor; this.pingSequenceId = node.createPingSequenceId(); this.pongHandler = pongHandler; - this. compressor = compressor; + this.compressor = compressor; } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index 63530a7f650..d1f22514481 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -3,8 +3,10 @@ package com.yahoo.search.dispatch.rpc; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.rpc.Client.NodeConnection; +import com.yahoo.search.dispatch.rpc.RpcClient.RpcNodeConnection; import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; +import com.yahoo.vespa.config.search.DispatchNodesConfig.Node; import java.util.ArrayList; import java.util.Collection; @@ -19,7 +21,7 @@ import java.util.concurrent.ThreadLocalRandom; * * @author ollivir */ -public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { +public class RpcResourcePool implements RpcConnectionPool { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private volatile Map<Integer, NodeConnectionPool> nodeConnectionPools = Map.of(); @@ -35,46 +37,35 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { } public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) { - super(); rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()); numConnections = dispatchConfig.numJrtConnectionsPerNode(); - updateNodes(nodesConfig).forEach(item -> { - try { - item.close(); - } catch (Exception e) {} + updateNodes(nodesConfig).forEach(pool -> { + try { pool.close(); } catch (Exception ignored) { } // Shouldn't throw. }); } - /** Will return a list of items that need a delayed close */ - public Collection<AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { - List<AutoCloseable> toClose = new ArrayList<>(); - var builder = new HashMap<Integer, NodeConnectionPool>(); + @Override + public Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { + Map<Integer, NodeConnectionPool> currentPools = new HashMap<>(nodeConnectionPools); + Map<Integer, NodeConnectionPool> nextPools = new HashMap<>(); // Who can be reused - for (var node : nodesConfig.node()) { - var prev = nodeConnectionPools.get(node.key()); - NodeConnection nc = prev != null ? prev.nextConnection() : null; - if (nc instanceof RpcClient.RpcNodeConnection rpcNodeConnection - && rpcNodeConnection.getPort() == node.port() - && rpcNodeConnection.getHostname().equals(node.host())) + for (Node node : nodesConfig.node()) { + if ( currentPools.containsKey(node.key()) + && currentPools.get(node.key()).nextConnection() instanceof RpcNodeConnection rpcNodeConnection + && rpcNodeConnection.getPort() == node.port() + && rpcNodeConnection.getHostname().equals(node.host())) { - builder.put(node.key(), prev); + nextPools.put(node.key(), currentPools.remove(node.key())); } else { - var connections = new ArrayList<NodeConnection>(numConnections); + ArrayList<NodeConnection> connections = new ArrayList<>(numConnections); for (int i = 0; i < numConnections; i++) { connections.add(rpcClient.createConnection(node.host(), node.port())); } - builder.put(node.key(), new NodeConnectionPool(connections)); + nextPools.put(node.key(), new NodeConnectionPool(connections)); } } - // Who are not needed any more - nodeConnectionPools.forEach((key, pool) -> { - var survivor = builder.get(key); - if (survivor == null || pool != survivor) { - toClose.add(pool); - } - }); - this.nodeConnectionPools = Map.copyOf(builder); - return toClose; + this.nodeConnectionPools = Map.copyOf(nextPools); + return currentPools.values(); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index 121c12335f5..c8af5cea5aa 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.searchcluster; +import java.util.ArrayList; import java.util.List; import java.util.logging.Logger; @@ -51,7 +52,7 @@ public class Group { /** * Returns whether this group has sufficient active documents - * (compared to other groups) that is should receive traffic + * (compared to other groups) that should receive traffic */ public boolean hasSufficientCoverage() { return hasSufficientCoverage; @@ -66,14 +67,16 @@ public class Group { } public void aggregateNodeValues() { - long activeDocs = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum(); + List<Node> workingNodes = new ArrayList<>(nodes); + workingNodes.removeIf(node -> node.isWorking() != Boolean.TRUE); + long activeDocs = workingNodes.stream().mapToLong(Node::getActiveDocuments).sum(); activeDocuments = activeDocs; - targetActiveDocuments = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getTargetActiveDocuments).sum(); + targetActiveDocuments = workingNodes.stream().mapToLong(Node::getTargetActiveDocuments).sum(); isBlockingWrites = nodes.stream().anyMatch(Node::isBlockingWrites); - int numWorkingNodes = workingNodes(); + int numWorkingNodes = workingNodes.size(); if (numWorkingNodes > 0) { long average = activeDocs / numWorkingNodes; - long skew = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum(); + long skew = workingNodes.stream().mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum(); boolean balanced = skew <= activeDocs * maxContentSkew; if (balanced != isBalanced) { if (!isSparse()) diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 9c65cb3d4c0..3c8950f1f7f 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -6,15 +6,18 @@ import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; +import com.yahoo.yolean.UncheckedInterruptedException; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; import java.util.logging.Logger; -import java.util.stream.Collectors; + +import static java.util.stream.Collectors.groupingBy; /** * A model of a search cluster we might want to dispatch queries to. @@ -28,7 +31,7 @@ public class SearchCluster implements NodeManager<Node> { private final String clusterId; private final VipStatus vipStatus; private final PingFactory pingFactory; - private final SearchGroupsImpl groups; + private volatile SearchGroupsImpl groups; private volatile long nextLogTime = 0; /** @@ -45,6 +48,7 @@ public class SearchCluster implements NodeManager<Node> { VipStatus vipStatus, PingFactory pingFactory) { this(clusterId, toGroups(nodes, minActivedocsPercentage), vipStatus, pingFactory); } + public SearchCluster(String clusterId, SearchGroupsImpl groups, VipStatus vipStatus, PingFactory pingFactory) { this.clusterId = clusterId; this.vipStatus = vipStatus; @@ -55,13 +59,28 @@ public class SearchCluster implements NodeManager<Node> { @Override public String name() { return clusterId; } - public VipStatus getVipStatus() { return vipStatus; } + + /** Sets the new nodes to monitor to be the new nodes, but keep any existing node instances which equal the new ones. */ + public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) { + Collection<Node> retainedNodes = groups.nodes(); + Collection<Node> currentNodes = new HashSet<>(newNodes); + retainedNodes.retainAll(currentNodes); // Throw away all old nodes which are not in the new set. + currentNodes.removeIf(retainedNodes::contains); // Throw away all new nodes for which we have more information in an old object. + Collection<Node> addedNodes = List.copyOf(currentNodes); + currentNodes.addAll(retainedNodes); // Keep the old nodes that were replaced in the new set. + SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage); + ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false); + for (Node node : groups.nodes()) monitor.add(node, true); + monitor.start(); + try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } } + catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } + pingIterationCompleted(groups); + this.groups = groups; + return monitor; + } public void addMonitoring(ClusterMonitor<Node> clusterMonitor) { - for (var group : groups()) { - for (var node : group.nodes()) - clusterMonitor.add(node, true); - } + for (Node node : groups.nodes()) clusterMonitor.add(node, true); } private static Node findLocalCorpusDispatchTarget(String selfHostname, SearchGroups groups) { @@ -86,14 +105,14 @@ public class SearchCluster implements NodeManager<Node> { private static SearchGroupsImpl toGroups(Collection<Node> nodes, double minActivedocsPercentage) { Map<Integer, Group> groups = new HashMap<>(); - for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) { - Group g = new Group(group.getKey(), group.getValue()); - groups.put(group.getKey(), g); - } + nodes.stream().collect(groupingBy(Node::group)).forEach((groupId, groupNodes) -> { + groups.put(groupId, new Group(groupId, groupNodes)); + }); return new SearchGroupsImpl(Map.copyOf(groups), minActivedocsPercentage); } public SearchGroups groupList() { return groups; } + public Group group(int id) { return groups.get(id); } private Collection<Group> groups() { return groups.groups(); } @@ -107,14 +126,14 @@ public class SearchCluster implements NodeManager<Node> { * or empty if we should not dispatch directly. */ public Optional<Node> localCorpusDispatchTarget() { - if ( localCorpusDispatchTarget == null) return Optional.empty(); + if (localCorpusDispatchTarget == null) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage Group localSearchGroup = groups.get(localCorpusDispatchTarget.group()); if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); // Only use direct dispatch if the local search node is not down - if ( localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty(); + if (localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty(); return Optional.of(localCorpusDispatchTarget); } @@ -176,7 +195,7 @@ public class SearchCluster implements NodeManager<Node> { return groups().stream().allMatch(group -> group.nodes().stream().allMatch(node -> node.isWorking() != null)); } - public long nonWorkingNodeCount() { + long nonWorkingNodeCount() { return groups().stream().flatMap(group -> group.nodes().stream()).filter(node -> node.isWorking() == Boolean.FALSE).count(); } @@ -194,13 +213,13 @@ public class SearchCluster implements NodeManager<Node> { /** Used by the cluster monitor to manage node status */ @Override - public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) { + public void ping(ClusterMonitor<Node> clusterMonitor, Node node, Executor executor) { Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor)); pinger.ping(); } - private void pingIterationCompletedSingleGroup() { - Group group = groups().iterator().next(); + private void pingIterationCompletedSingleGroup(SearchGroupsImpl groups) { + Group group = groups.groups().iterator().next(); group.aggregateNodeValues(); // With just one group sufficient coverage may not be the same as full coverage, as the // group will always be marked sufficient for use. @@ -209,10 +228,10 @@ public class SearchCluster implements NodeManager<Node> { trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments()); } - private void pingIterationCompletedMultipleGroups() { - groups().forEach(Group::aggregateNodeValues); + private void pingIterationCompletedMultipleGroups(SearchGroupsImpl groups) { + groups.groups().forEach(Group::aggregateNodeValues); long medianDocuments = groups.medianDocumentsPerGroup(); - for (Group group : groups()) { + for (Group group : groups.groups()) { boolean sufficientCoverage = groups.isGroupCoverageSufficient(group.activeDocuments(), medianDocuments); updateSufficientCoverage(group, sufficientCoverage); trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments); @@ -226,20 +245,20 @@ public class SearchCluster implements NodeManager<Node> { */ @Override public void pingIterationCompleted() { + pingIterationCompleted(groups); + } + + private void pingIterationCompleted(SearchGroupsImpl groups) { if (groups.size() == 1) { - pingIterationCompletedSingleGroup(); + pingIterationCompletedSingleGroup(groups); } else { - pingIterationCompletedMultipleGroups(); + pingIterationCompletedMultipleGroups(groups); } } - - /** * Calculate whether a subset of nodes in a group has enough coverage */ - - private void trackGroupCoverageChanges(Group group, boolean fullCoverage, long medianDocuments) { if ( ! hasInformationAboutAllNodes()) return; // Be silent until we know what we are talking about. boolean changed = group.fullCoverageStatusChanged(fullCoverage); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java index b041ba28db9..5727931281a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java @@ -1,8 +1,16 @@ package com.yahoo.search.dispatch.searchcluster; +import com.yahoo.stream.CustomCollectors; + import java.util.Collection; +import java.util.Comparator; +import java.util.LinkedHashSet; import java.util.Set; +import static java.util.Comparator.comparingInt; +import static java.util.stream.Collectors.toCollection; +import static java.util.stream.Collectors.toSet; + /** * Simple interface for groups and their nodes in the content cluster * @author baldersheim @@ -14,6 +22,11 @@ public interface SearchGroups { default boolean isEmpty() { return size() == 0; } + default Set<Node> nodes() { + return groups().stream().flatMap(group -> group.nodes().stream()) + .sorted(comparingInt(Node::key)) + .collect(toCollection(LinkedHashSet::new)); + } int size(); boolean isPartialGroupCoverageSufficient(Collection<Node> nodes); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java index 514f0de4fec..3c5dbe9927a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java @@ -3,10 +3,8 @@ package com.yahoo.search.dispatch.searchcluster; import com.google.common.math.Quantiles; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; public class SearchGroupsImpl implements SearchGroups { @@ -42,4 +40,5 @@ public class SearchGroupsImpl implements SearchGroups { double[] activeDocuments = groups().stream().mapToDouble(Group::activeDocuments).toArray(); return (long) Quantiles.median().computeInPlace(activeDocuments); } + } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java index 2603f89b546..1278afe3759 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java @@ -1,27 +1,51 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; +import com.yahoo.compress.CompressionType; +import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.dispatch.Dispatcher.InvokerFactoryFactory; +import com.yahoo.search.dispatch.rpc.Client.NodeConnection; +import com.yahoo.search.dispatch.rpc.Client.ResponseReceiver; +import com.yahoo.search.dispatch.rpc.RpcConnectionPool; import com.yahoo.search.dispatch.searchcluster.MockSearchCluster; -import com.yahoo.search.dispatch.searchcluster.SearchGroups; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.PingFactory; import com.yahoo.search.dispatch.searchcluster.Pinger; import com.yahoo.search.dispatch.searchcluster.PongHandler; import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.search.dispatch.searchcluster.SearchGroups; +import com.yahoo.search.searchchain.Execution; import com.yahoo.vespa.config.search.DispatchConfig; +import com.yahoo.vespa.config.search.DispatchNodesConfig; import org.junit.jupiter.api.Test; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static com.yahoo.search.dispatch.searchcluster.MockSearchCluster.createDispatchConfig; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -149,6 +173,172 @@ public class DispatcherTest { dispatcher.deconstruct(); } + @Test + void testRpcResourceShutdownOnReconfiguration() throws InterruptedException, ExecutionException, IOException { + // Ping factory lets us tick each ping, so we may delay shutdown, due to monitor thread RPC usage. + Map<Integer, Phaser> pingPhasers = new ConcurrentHashMap<>(); + pingPhasers.put(0, new Phaser(2)); + pingPhasers.put(1, new Phaser(2)); + pingPhasers.put(2, new Phaser(2)); + + PingFactory pingFactory = (node, monitor, pongHandler) -> () -> { + pingPhasers.get(node.key()).arriveAndAwaitAdvance(); + pongHandler.handle(new Pong(2, 2)); + pingPhasers.get(node.key()).arriveAndAwaitAdvance(); + }; + + // Search cluster uses the ping factory, and zero nodes initially, later configured with two nodes. + SearchCluster cluster = new MockSearchCluster("cid", 0, 1, pingFactory); + + // Dummy RPC layer where we manually tick responses for each node. + // When a response is let go, we verify the RPC resource is not yet closed. + // This is signalled by terminating its phaser, which is done by the dispatcher in delayed cleanup. + // We verify in the end that all connections have been shut down, prior to shutting down the RPC pool proper. + Map<Integer, Boolean > rpcResources = new HashMap<>(); + AtomicLong cleanupThreadId = new AtomicLong(); + AtomicInteger nodeIdOfSearcher0 = new AtomicInteger(-1); + RpcConnectionPool rpcPool = new RpcConnectionPool() { + // Returns a connection that lets us advance the searcher when we want to, as well as tracking which threads do what. + @Override public NodeConnection getConnection(int nodeId) { + nodeIdOfSearcher0.set(nodeId); + return new NodeConnection() { + @Override public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) { + assertTrue(rpcResources.get(nodeId)); + } + @Override public void close() { + assertFalse(rpcResources.remove(nodeId)); + } + }; + } + // Verifies cleanup is done by the expected thread, by ID, and cleans up the "RPC connection" (phaser). + @Override public Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig config) { + for (DispatchNodesConfig.Node node : config.node()) + rpcResources.putIfAbsent(node.key(), true); + return rpcResources.keySet().stream() + .filter(key -> config.node().stream().noneMatch(node -> node.key() == key)) + .map(key -> (AutoCloseable) () -> { + assertTrue(rpcResources.put(key, false)); + cleanupThreadId.set(Thread.currentThread().getId()); + getConnection(key).close(); + }) + .toList(); + }; + // In the end, we have reconfigured down to 0 nodes, and no resources should be left running after cleanup. + @Override public void close() { + assertEquals(Map.of(), rpcResources); + } + }; + + // This factory just forwards search to the dummy RPC layer above, nothing more. + InvokerFactoryFactory invokerFactories = (rpcConnectionPool, searchGroups, dispatchConfig) -> new InvokerFactory(searchGroups, dispatchConfig) { + @Override protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, int maxHits, Node node) { + return Optional.of(new SearchInvoker(Optional.of(node)) { + @Override protected Object sendSearchRequest(Query query, Object context) { + rpcPool.getConnection(node.key()).request(null, null, 0, null, null, 0); + return null; + }; + @Override protected InvokerResult getSearchResult(Execution execution) { + return new InvokerResult(new Result(new Query())); + } + @Override protected void release() { } + }); + }; + @Override public FillInvoker createFillInvoker(VespaBackEndSearcher searcher, Result result) { + return new FillInvoker() { + @Override protected void getFillResults(Result result, String summaryClass) { fail(); } + @Override protected void sendFillRequest(Result result, String summaryClass) { fail(); } + @Override protected void release() { fail(); } + }; + } + }; + + Dispatcher dispatcher = new Dispatcher(dispatchConfig, rpcPool, cluster, invokerFactories); + ExecutorService executor = Executors.newFixedThreadPool(1); + + // Set two groups with a single node each. The first cluster-monitor has nothing to do, and is shut down immediately. + // There are also no invokers, so the whole reconfiguration completes once the new cluster monitor has seen all nodes. + Future<?> reconfiguration = executor.submit(() -> { + dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder() + .node(new DispatchNodesConfig.Node.Builder().key(0).group(0).port(123).host("host0")) + .node(new DispatchNodesConfig.Node.Builder().key(1).group(1).port(123).host("host1")) + .build()); + }); + + // Let pings return, to allow the search cluster to reconfigure. + pingPhasers.get(0).arriveAndAwaitAdvance(); + pingPhasers.get(0).arriveAndAwaitAdvance(); + pingPhasers.get(1).arriveAndAwaitAdvance(); + pingPhasers.get(1).arriveAndAwaitAdvance(); + // We need to wait for the cluster to have at least one group, lest dispatch will fail below. + reconfiguration.get(); + assertNotEquals(cleanupThreadId.get(), Thread.currentThread().getId()); + assertEquals(1, cluster.group(0).workingNodes()); + assertEquals(1, cluster.group(1).workingNodes()); + + Node node0 = cluster.group(0).nodes().get(0); // Node0 will be replaced. + Node node1 = cluster.group(1).nodes().get(0); // Node1 will be retained. + + // Start some searches, one against each group, since we have a round-robin policy. + SearchInvoker search0 = dispatcher.getSearchInvoker(new Query(), null); + search0.search(new Query(), null); + // Unknown whether the first or second search hits node0, so we must track that. + int offset = nodeIdOfSearcher0.get(); + SearchInvoker search1 = dispatcher.getSearchInvoker(new Query(), null); + search1.search(new Query(), null); + + // Wait for the current cluster monitor to be mid-ping-round. + pingPhasers.get(0).arriveAndAwaitAdvance(); + + // Then reconfigure the dispatcher with new nodes, replacing node0 with node2. + reconfiguration = executor.submit(() -> { + dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder() + .node(new DispatchNodesConfig.Node.Builder().key(2).group(0).port(123).host("host2")) + .node(new DispatchNodesConfig.Node.Builder().key(1).group(1).port(123).host("host1")) + .build()); + }); + // Reconfiguration starts, but groups are only updated once the search cluster has knowledge about all of them. + pingPhasers.get(1).arriveAndAwaitAdvance(); + pingPhasers.get(1).arriveAndAwaitAdvance(); + pingPhasers.get(2).arriveAndAwaitAdvance(); + // Cluster has not yet updated its group reference. + assertEquals(1, cluster.group(0).workingNodes()); // Node0 is still working. + assertSame(node0, cluster.group(0).nodes().get(0)); + pingPhasers.get(2).arriveAndAwaitAdvance(); + + // Old cluster monitor is waiting for that ping to complete before it can shut down, and let reconfiguration complete. + pingPhasers.get(0).arriveAndAwaitAdvance(); + reconfiguration.get(); + Node node2 = cluster.group(0).nodes().get(0); + assertNotSame(node0, node2); + assertSame(node1, cluster.group(1).nodes().get(0)); + + // Next search should hit group0 again, this time on node2. + SearchInvoker search2 = dispatcher.getSearchInvoker(new Query(), null); + search2.search(new Query(), null); + + // Searches against nodes 1 and 2 complete. + (offset == 0 ? search0 : search1).close(); + search2.close(); + + // We're still waiting for search against node0 to complete, before we can shut down its RPC connection. + assertEquals(Set.of(0, 1, 2), rpcResources.keySet()); + (offset == 0 ? search1 : search0).close(); + // Thread for search 0 should have closed the RPC pool now. + assertEquals(Set.of(1, 2), rpcResources.keySet()); + assertEquals(cleanupThreadId.get(), Thread.currentThread().getId()); + + // Finally, reconfigure down to 0 nodes. + reconfiguration = executor.submit(() -> { + cleanupThreadId.set(Thread.currentThread().getId()); + dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder().build()); + }); + pingPhasers.get(1).forceTermination(); + pingPhasers.get(2).forceTermination(); + reconfiguration.get(); + assertNotEquals(cleanupThreadId.get(), Thread.currentThread().getId()); + dispatcher.deconstruct(); + } + interface FactoryStep { boolean returnInvoker(List<Node> nodes, boolean acceptIncompleteCoverage); } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java index 5fb5b465c69..cd0791a3881 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java @@ -15,7 +15,11 @@ import java.util.Map; public class MockSearchCluster extends SearchCluster { public MockSearchCluster(String clusterId, int groups, int nodesPerGroup) { - super(clusterId, buildGroupListForTest(groups, nodesPerGroup, 88.0), null, null); + this(clusterId, groups, nodesPerGroup, null); + } + + public MockSearchCluster(String clusterId, int groups, int nodesPerGroup, PingFactory pingFactory) { + super(clusterId, buildGroupListForTest(groups, nodesPerGroup, 88.0), null, pingFactory); } @Override diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java index 51256ec496e..bfe1aed1084 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java @@ -13,11 +13,18 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; import static org.junit.jupiter.api.Assertions.*; /** @@ -31,7 +38,7 @@ public class SearchClusterTest { final int nodesPerGroup; final VipStatus vipStatus; final SearchCluster searchCluster; - final ClusterMonitor clusterMonitor; + final ClusterMonitor<Node> clusterMonitor; final List<AtomicInteger> numDocsPerNode; List<AtomicInteger> pingCounts; @@ -57,7 +64,7 @@ public class SearchClusterTest { } searchCluster = new SearchCluster(clusterId, 100.0, nodes, vipStatus, new Factory(nodesPerGroup, numDocsPerNode, pingCounts)); - clusterMonitor = new ClusterMonitor(searchCluster, false); + clusterMonitor = new ClusterMonitor<>(searchCluster, false); searchCluster.addMonitoring(clusterMonitor); } @@ -376,4 +383,37 @@ public class SearchClusterTest { assertTrue(group.isBalanced()); } + @Test + void requireThatPreciselyTheRetainedNodesAreKeptWhenNodesAreUpdated() { + try (State state = new State("query", 2, IntStream.range(0, 6).mapToObj(i -> "node-" + i).toList())) { + List<Node> referenceNodes = List.of(new Node(0, "node-0", 0), + new Node(1, "node-1", 0), + new Node(0, "node-2", 1), + new Node(1, "node-3", 1), + new Node(0, "node-4", 2), + new Node(1, "node-5", 2)); + SearchGroups oldGroups = state.searchCluster.groupList(); + assertEquals(Set.copyOf(referenceNodes), oldGroups.nodes()); + + List<Node> updatedNodes = List.of(new Node(0, "node-1", 0), // Swap node-0 and node-1 + new Node(1, "node-0", 0), // Swap node-1 and node-0 + new Node(0, "node-4", 1), // Swap node-2 and node-4 + new Node(1, "node-3", 1), + new Node(0, "node-2", 2), // Swap node-4 and node-2 + new Node(1, "node-6", 2)); // Replace node-6 + state.searchCluster.updateNodes(updatedNodes, 100.0); + SearchGroups newGroups = state.searchCluster.groupList(); + assertEquals(Set.copyOf(updatedNodes), newGroups.nodes()); + + Map<Node, Node> oldNodesByIdentity = newGroups.nodes().stream().collect(toMap(identity(), identity())); + Map<Node, Node> newNodesByIdentity = newGroups.nodes().stream().collect(toMap(identity(), identity())); + assertSame(updatedNodes.get(0), newNodesByIdentity.get(updatedNodes.get(0))); + assertSame(updatedNodes.get(1), newNodesByIdentity.get(updatedNodes.get(1))); + assertSame(updatedNodes.get(2), newNodesByIdentity.get(updatedNodes.get(2))); + assertSame(oldNodesByIdentity.get(referenceNodes.get(3)), newNodesByIdentity.get(updatedNodes.get(3))); + assertSame(updatedNodes.get(4), newNodesByIdentity.get(updatedNodes.get(4))); + assertSame(updatedNodes.get(5), newNodesByIdentity.get(updatedNodes.get(5))); + } + } + } |