summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2023-07-14 10:39:23 +0200
committerGitHub <noreply@github.com>2023-07-14 10:39:23 +0200
commitbf480e663efe4e7390285d624a7b383de66a1a10 (patch)
treea20ed2bade0cee1a0c6e5ecf647d5b881ab11427
parentb4d7c1418ad373325acf7ecfe68e59cbfa9e2fb2 (diff)
parent9c208cbc41ae710b6bacbd34455b386d27ad7781 (diff)
Merge pull request #27770 from vespa-engine/jonmv/reconfiger-dispatch-selfsub
Add code for reconfigurable dispatcher with self-sub, not wired in
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/search/ContainerSearch.java10
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/search/DispatcherComponent.java9
-rw-r--r--container-core/src/main/java/com/yahoo/container/handler/ClustersStatus.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java15
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java4
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java5
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java170
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java37
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java18
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java47
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java13
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java71
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java13
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java3
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java194
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java6
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java44
24 files changed, 546 insertions, 129 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/search/ContainerSearch.java b/config-model/src/main/java/com/yahoo/vespa/model/container/search/ContainerSearch.java
index 414d4c817c7..728b4d40bdd 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/search/ContainerSearch.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/search/ContainerSearch.java
@@ -6,6 +6,8 @@ import com.yahoo.container.QrSearchersConfig;
import com.yahoo.prelude.semantics.SemanticRulesConfig;
import com.yahoo.search.config.IndexInfoConfig;
import com.yahoo.search.config.SchemaInfoConfig;
+import com.yahoo.search.dispatch.Dispatcher;
+import com.yahoo.search.dispatch.ReconfigurableDispatcher;
import com.yahoo.search.pagetemplates.PageTemplatesConfig;
import com.yahoo.search.query.profile.config.QueryProfilesConfig;
import com.yahoo.search.ranking.RankProfilesEvaluatorFactory;
@@ -81,16 +83,18 @@ public class ContainerSearch extends ContainerSubsystem<SearchChains>
/** Adds a Dispatcher component to the owning container cluster for each search cluster */
private void initializeDispatchers(Collection<SearchCluster> searchClusters) {
+ boolean useReconfigurableDispatch = false;
+ Class<? extends Dispatcher> dispatcherClass = useReconfigurableDispatch ? ReconfigurableDispatcher.class : Dispatcher.class;
for (SearchCluster searchCluster : searchClusters) {
if (searchCluster instanceof IndexedSearchCluster indexed) {
- var dispatcher = new DispatcherComponent(indexed);
+ var dispatcher = new DispatcherComponent(indexed, dispatcherClass);
owningCluster.addComponent(dispatcher);
}
if (globalPhase) {
for (var documentDb : searchCluster.getDocumentDbs()) {
- if (!schemasWithGlobalPhase.contains(documentDb.getSchemaName())) continue;
+ if ( ! schemasWithGlobalPhase.contains(documentDb.getSchemaName())) continue;
var factory = new RankProfilesEvaluatorComponent(documentDb);
- if (! owningCluster.getComponentsMap().containsKey(factory.getComponentId())) {
+ if ( ! owningCluster.getComponentsMap().containsKey(factory.getComponentId())) {
owningCluster.addComponent(factory);
}
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/search/DispatcherComponent.java b/config-model/src/main/java/com/yahoo/vespa/model/container/search/DispatcherComponent.java
index f9a3a1f1990..fe2df8101bd 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/search/DispatcherComponent.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/search/DispatcherComponent.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.model.container.search;
import com.yahoo.config.model.producer.TreeConfigProducer;
import com.yahoo.osgi.provider.model.ComponentModel;
+import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
import com.yahoo.vespa.model.container.component.Component;
@@ -22,15 +23,15 @@ public class DispatcherComponent extends Component<TreeConfigProducer<?>, Compon
private final IndexedSearchCluster indexedSearchCluster;
- public DispatcherComponent(IndexedSearchCluster indexedSearchCluster) {
- super(toComponentModel(indexedSearchCluster.getClusterName()));
+ public DispatcherComponent(IndexedSearchCluster indexedSearchCluster, Class<? extends Dispatcher> clazz) {
+ super(toComponentModel(indexedSearchCluster.getClusterName(), clazz));
this.indexedSearchCluster = indexedSearchCluster;
}
- private static ComponentModel toComponentModel(String clusterName) {
+ private static ComponentModel toComponentModel(String clusterName, Class<? extends Dispatcher> clazz) {
String dispatcherComponentId = "dispatcher." + clusterName; // used by ClusterSearcher
return new ComponentModel(dispatcherComponentId,
- com.yahoo.search.dispatch.Dispatcher.class.getName(),
+ clazz.getName(),
PlatformBundles.SEARCH_AND_DOCPROC_BUNDLE);
}
diff --git a/container-core/src/main/java/com/yahoo/container/handler/ClustersStatus.java b/container-core/src/main/java/com/yahoo/container/handler/ClustersStatus.java
index 52b372638c8..3471627e887 100644
--- a/container-core/src/main/java/com/yahoo/container/handler/ClustersStatus.java
+++ b/container-core/src/main/java/com/yahoo/container/handler/ClustersStatus.java
@@ -12,7 +12,7 @@ import java.util.Set;
/**
* A component which tracks the up/down status of any clusters which should influence
* the up down status of this container itself, as well as the separate fact (from config)
- * that such clusters are present. This is a separate fact because we might know we have clusters configured
+ * that such clusters are present. This is a separate fact because we might know we have clusters configured,
* but we don't have positive information that they are up yet, and in this case we should be down.
*
* This is a separate component which has <b>no dependencies</b> such that the status tracked in this
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java
index fd8110e1173..d1377b8d373 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java
@@ -82,7 +82,7 @@ public abstract class BaseNodeMonitor<T> {
/** Thread-safely changes the state of this node if required */
protected abstract void setWorking(boolean working,String explanation);
- /** Returns whether or not this is monitoring an internal node. Default is false. */
+ /** Returns whether this is monitoring an internal node. Default is false. */
public boolean isInternal() { return internal; }
}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index 0b627e91bc5..332bf4ea2c4 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -66,7 +66,7 @@ public class ClusterMonitor<T> {
* </ul>
*
* @param node the object representing the node
- * @param internal whether or not this node is internal to this cluster
+ * @param internal whether this node is internal to this cluster
*/
public void add(T node, boolean internal) {
nodeMonitors.put(node, new TrafficNodeMonitor<>(node, configuration, internal));
@@ -96,11 +96,10 @@ public class ClusterMonitor<T> {
* Ping all nodes which needs pinging to discover state changes
*/
public void ping(Executor executor) {
- for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) {
- BaseNodeMonitor<T> monitor= i.next();
- nodeManager.ping(this, monitor.getNode(), executor); // Cause call to failed or responded
+ for (var monitor : nodeMonitors()) {
+ if (closed.get()) return; // Do nothing to change state if close has started.
+ nodeManager.ping(this, monitor.getNode(), executor);
}
- if (closed.get()) return; // Do nothing to change state if close has started.
nodeManager.pingIterationCompleted();
}
@@ -143,7 +142,7 @@ public class ClusterMonitor<T> {
// for all pings when there are no problems (important because it ensures that
// any thread local connections are reused) 2) a new thread will be started to execute
// new pings when a ping is not responding
- ExecutorService pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping"));
+ ExecutorService pingExecutor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping"));
while (!closed.get()) {
try {
log.finest("Activating ping");
@@ -165,7 +164,9 @@ public class ClusterMonitor<T> {
}
pingExecutor.shutdown();
try {
- pingExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ if ( ! pingExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+ log.warning("Timeout waiting for ping executor to terminate");
+ }
} catch (InterruptedException e) { }
log.info("Stopped cluster monitor thread " + getName());
}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
index 4af6757db8c..1cf36d75fc5 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
@@ -48,7 +48,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
*
* @param id the id of this searcher
* @param connections the connections of the cluster
- * @param internal whether or not this cluster is internal (part of the same installation)
+ * @param internal whether this cluster is internal (part of the same installation)
*/
public ClusterSearcher(ComponentId id, List<T> connections, boolean internal) {
this(id, connections, new Hasher<>(), internal);
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java
index 1f6602053d9..f8f8c0d888d 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java
@@ -22,7 +22,7 @@ public class MonitorConfiguration {
/**
* Returns the number of milliseconds to attempt to service a request
- * (at different nodes) before giving up. Default is 5000 ms.
+ * (at different nodes) before giving up. See {@link #requestTimeout}.
*/
public long getRequestTimeout() { return requestTimeout; }
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java
index 11475b6a0ca..108e7e3e34b 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java
@@ -23,7 +23,7 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> {
this.configuration = configuration;
}
- /** Whether or not this has ever responded successfully */
+ /** Whether this has ever responded successfully */
private boolean atStartUp = true;
public T getNode() { return node; }
@@ -55,7 +55,7 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> {
respondedAt = now();
succeededAt = respondedAt;
- setWorking(true,"Responds correctly");
+ setWorking(true, "Responds correctly");
}
/**
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
index 77496114df1..c6fef88fa2d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
@@ -2,7 +2,6 @@
package com.yahoo.search.dispatch;
import java.io.Closeable;
-import java.time.Duration;
import java.util.function.BiConsumer;
/**
@@ -21,8 +20,8 @@ public abstract class CloseableInvoker implements Closeable {
private RequestDuration duration;
public void teardown(BiConsumer<Boolean, RequestDuration> teardown) {
- this.teardown = teardown;
- this.duration = new RequestDuration();
+ this.teardown = this.teardown == null ? teardown : this.teardown.andThen(teardown);
+ this.duration = this.duration == null ? new RequestDuration() : this.duration;
}
protected void setFinalStatus(boolean success) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index db7e80a95e5..6f6b0fc2b79 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -1,9 +1,9 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
-import com.yahoo.component.annotation.Inject;
import com.yahoo.component.AbstractComponent;
import com.yahoo.component.ComponentId;
+import com.yahoo.component.annotation.Inject;
import com.yahoo.compress.Compressor;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
@@ -12,13 +12,14 @@ import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
+import com.yahoo.search.dispatch.rpc.RpcConnectionPool;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.Group;
-import com.yahoo.search.dispatch.searchcluster.SearchGroups;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
+import com.yahoo.search.dispatch.searchcluster.SearchGroups;
import com.yahoo.search.query.profile.types.FieldDescription;
import com.yahoo.search.query.profile.types.FieldType;
import com.yahoo.search.query.profile.types.QueryProfileType;
@@ -32,6 +33,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* A dispatcher communicates with search nodes to perform queries and fill hits.
@@ -54,19 +56,43 @@ public class Dispatcher extends AbstractComponent {
/** If set will control computation of how many hits will be fetched from each partition.*/
public static final CompoundName topKProbability = CompoundName.from(DISPATCH + "." + TOP_K_PROBABILITY);
+ private final InvokerFactoryFactory invokerFactories;
private final DispatchConfig dispatchConfig;
- private final RpcResourcePool rpcResourcePool;
+ private final RpcConnectionPool rpcResourcePool;
private final SearchCluster searchCluster;
- private final ClusterMonitor<Node> clusterMonitor;
private volatile VolatileItems volatileItems;
private static class VolatileItems {
+
final LoadBalancer loadBalancer;
final InvokerFactory invokerFactory;
- VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) {
+ final ClusterMonitor<Node> clusterMonitor;
+ final AtomicInteger inflight = new AtomicInteger(1); // Initial reference.
+ Runnable cleanup = () -> { };
+
+ VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) {
this.loadBalancer = loadBalancer;
this.invokerFactory = invokerFactory;
+ this.clusterMonitor = clusterMonitor;
+ }
+
+ private void countDown() {
+ if (inflight.decrementAndGet() == 0) cleanup.run();
+ }
+
+ private class Ref implements AutoCloseable {
+ boolean handedOff = false;
+ { inflight.incrementAndGet(); }
+ VolatileItems get() { return VolatileItems.this; }
+ /** Hands off the reference to the given invoker, which will decrement the counter when closed. */
+ <T extends CloseableInvoker> T register(T invoker) {
+ invoker.teardown((__, ___) -> countDown());
+ handedOff = true;
+ return invoker;
+ }
+ @Override public void close() { if ( ! handedOff) countDown(); }
}
+
}
private static final QueryProfileType argumentType;
@@ -81,34 +107,105 @@ public class Dispatcher extends AbstractComponent {
public static QueryProfileType getArgumentType() { return argumentType; }
+ interface InvokerFactoryFactory {
+ InvokerFactory create(RpcConnectionPool rpcConnectionPool, SearchGroups searchGroups, DispatchConfig dispatchConfig);
+ }
+
@Inject
- public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig,
- DispatchNodesConfig nodesConfig, VipStatus vipStatus) {
- this.dispatchConfig = dispatchConfig;
- rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig);
- searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
- toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool));
- clusterMonitor = new ClusterMonitor<>(searchCluster, true);
- volatileItems = update(null);
+ public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus) {
+ this(clusterId, dispatchConfig, new RpcResourcePool(dispatchConfig, nodesConfig), nodesConfig, vipStatus, RpcInvokerFactory::new);
initialWarmup(dispatchConfig.warmuptime());
}
- /* For simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */
- Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster,
- DispatchConfig dispatchConfig, InvokerFactory invokerFactory) {
+ Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) {
+ this(dispatchConfig, rpcConnectionPool,
+ new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
+ toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)),
+ invokerFactories);
+ }
+
+ Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) {
+ this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories);
+ this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it.
+ }
+
+ Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ SearchCluster searchCluster, ClusterMonitor<Node> clusterMonitor, InvokerFactoryFactory invokerFactories) {
this.dispatchConfig = dispatchConfig;
- this.rpcResourcePool = null;
+ this.rpcResourcePool = rpcConnectionPool;
this.searchCluster = searchCluster;
- this.clusterMonitor = clusterMonitor;
- this.volatileItems = update(invokerFactory);
+ this.invokerFactories = invokerFactories;
+ this.volatileItems = update(clusterMonitor);
+ searchCluster.addMonitoring(clusterMonitor);
}
- private VolatileItems update(InvokerFactory invokerFactory) {
+ /* For simple mocking in tests. Beware that searchCluster is shutdown in deconstruct() */
+ Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster,
+ DispatchConfig dispatchConfig, InvokerFactory invokerFactory) {
+ this(dispatchConfig, null, searchCluster, clusterMonitor, (__, ___, ____) -> invokerFactory);
+ }
+
+ /** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */
+ private VolatileItems.Ref volatileItems() {
+ return volatileItems.new Ref();
+ }
+
+ /**
+ * This is called whenever we have new config for backend nodes.
+ * Normally, we'd want to handle partial failure of the component graph, by reinstating the old state;
+ * however, in this case, such a failure would be local to this container, and we instead want to keep
+ * the newest config, as that is what most accurately represents the actual backend.
+ *
+ * The flow of reconfiguration is:
+ * 1. The volatile snapshot of disposable items is replaced with a new one that only references updated nodes.
+ * 2. Dependencies of the items in 1., which must be configured, are updated, yielding a list of resources to close.
+ * 3. When inflight operations against the old snapshot are done, all obsolete resources are cleaned up.
+ *
+ * Ownership details:
+ * 1. The RPC resource pool is owned by the dispatcher, and is updated on node set changes;
+ * it contains the means by which the container talks to backend nodes, so cleanup must be delayed until safe.
+ * 2. The invocation factory is owned by the volatile snapshot, and is swapped atomically with it;
+ * it is used by the dispatcher to create ephemeral invokers, which must complete before cleanup (above) can happen.
+ * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it;
+ * it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot.
+ * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time;
+ * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that.
+ * 5. The search cluster is owned by the dispatcher, and is updated on node set changes;
+ * its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher,
+ * as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible
+ * of its state across reconfigurations: with new node config, it will immediately forget obsolete nodes, and set
+ * coverage information as if the new nodes have zero documents, before even checking their status; this is fine
+ * under the assumption that this is the common case, i.e., new nodes have no documents yet.
+ */
+ void updateWithNewConfig(DispatchNodesConfig nodesConfig) {
+ try (var items = volatileItems()) { // Marking a reference to the old snapshot, which we want to have cleaned up.
+ items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0.
+
+ // Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do.
+ Collection<? extends AutoCloseable> connectionPoolsToClose = rpcResourcePool.updateNodes(nodesConfig);
+ items.get().cleanup = () -> {
+ for (AutoCloseable pool : connectionPoolsToClose) {
+ try { pool.close(); } catch (Exception ignored) { }
+ }
+ };
+
+ // Update the nodes the search cluster keeps track of, and what nodes are monitored.
+ ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage());
+
+ // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this.
+ this.volatileItems = update(newMonitor);
+
+ // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to.
+ items.get().clusterMonitor.shutdown();
+ } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread.
+ }
+
+ private VolatileItems update(ClusterMonitor<Node> clusterMonitor) {
var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
- (invokerFactory == null)
- ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig)
- : invokerFactory);
- searchCluster.addMonitoring(clusterMonitor);
+ invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig),
+ clusterMonitor);
return items;
}
@@ -158,27 +255,30 @@ public class Dispatcher extends AbstractComponent {
@Override
public void deconstruct() {
// The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster.
- clusterMonitor.shutdown();
+ volatileItems.clusterMonitor.shutdown();
if (rpcResourcePool != null) {
rpcResourcePool.close();
}
}
public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) {
- return volatileItems.invokerFactory.createFillInvoker(searcher, result);
+ try (var items = volatileItems()) { // Take a snapshot, and release it when we're done.
+ return items.register(items.get().invokerFactory.createFillInvoker(searcher, result));
+ }
}
public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) {
- VolatileItems items = volatileItems; // Take a snapshot
- int maxHitsPerNode = dispatchConfig.maxHitsPerNode();
- SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.invokerFactory, maxHitsPerNode)
- .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.loadBalancer, items.invokerFactory, maxHitsPerNode));
-
- if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
- query.setHits(0);
- query.setOffset(0);
+ try (var items = volatileItems()) { // Take a snapshot, and release it when we're done.
+ int maxHitsPerNode = dispatchConfig.maxHitsPerNode();
+ SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.get().invokerFactory, maxHitsPerNode)
+ .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.get().loadBalancer, items.get().invokerFactory, maxHitsPerNode));
+
+ if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
+ query.setHits(0);
+ query.setOffset(0);
+ }
+ return items.register(invoker);
}
- return invoker;
}
/** Builds an invoker based on searchpath */
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java
new file mode 100644
index 00000000000..625a8bcb6da
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java
@@ -0,0 +1,37 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.config.subscription.ConfigSubscriber;
+import com.yahoo.container.handler.VipStatus;
+import com.yahoo.messagebus.network.rpc.SlobrokConfigSubscriber;
+import com.yahoo.vespa.config.search.DispatchConfig;
+import com.yahoo.vespa.config.search.DispatchNodesConfig;
+import com.yahoo.yolean.UncheckedInterruptedException;
+
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * @author jonmv
+ */
+public class ReconfigurableDispatcher extends Dispatcher {
+
+ private final ConfigSubscriber subscriber;
+
+ public ReconfigurableDispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, VipStatus vipStatus) {
+ super(clusterId, dispatchConfig, new DispatchNodesConfig.Builder().build(), vipStatus);
+ this.subscriber = new ConfigSubscriber();
+ this.subscriber.subscribe(this::updateWithNewConfig, DispatchNodesConfig.class, clusterId.stringValue());
+ }
+
+ @Override
+ public void deconstruct() {
+ subscriber.close();
+ super.deconstruct();
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java
index 1206277a103..6b134dc23a6 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java
@@ -5,7 +5,7 @@ import java.time.Duration;
import java.time.Instant;
/**
- * Contains start and and time. Exposes a duration, and lets you measure the time difference between 2 requests.
+ * Contains start and end time. Exposes a duration, and lets you measure the time difference between 2 requests.
* It does use System.nanoTime to get a steady clock.
*
* @author baldersheim
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
index 22ed8b6d9fa..6c1f666835c 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
@@ -12,7 +12,7 @@ import java.util.Optional;
*
* @author bratseth
*/
-interface Client {
+public interface Client {
/** Creates a connection to a particular node in this */
NodeConnection createConnection(String hostname, int port);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
index fd8e0e4f81a..a93ddb0b360 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
@@ -1,11 +1,27 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
+import com.yahoo.vespa.config.search.DispatchNodesConfig;
+
+import java.util.Collection;
+import java.util.List;
+
/**
* Interface for getting a connection given a node id.
*
* @author balderersheim
*/
-public interface RpcConnectionPool {
+public interface RpcConnectionPool extends AutoCloseable {
+
+ /** Returns a connection to the given node id. */
Client.NodeConnection getConnection(int nodeId);
+
+
+ /** Will return a list of items that need a delayed close when updating node set. */
+ default Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { return List.of(); }
+
+ /** Shuts down all connections in the pool, and the underlying RPC client. */
+ @Override
+ void close();
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
index 154002c4f77..b6228994ac8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -33,7 +33,7 @@ public class RpcInvokerFactory extends InvokerFactory {
super(cluster, dispatchConfig);
this.rpcResourcePool = rpcResourcePool;
this.compressor = new CompressService();
- decodeType = convert(dispatchConfig.summaryDecodePolicy());
+ this.decodeType = convert(dispatchConfig.summaryDecodePolicy());
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
index 53dc54f7bc5..a59097e5fff 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
@@ -37,7 +37,7 @@ public class RpcPing implements Pinger, Client.ResponseReceiver {
this.clusterMonitor = clusterMonitor;
this.pingSequenceId = node.createPingSequenceId();
this.pongHandler = pongHandler;
- this. compressor = compressor;
+ this.compressor = compressor;
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index 63530a7f650..d1f22514481 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -3,8 +3,10 @@ package com.yahoo.search.dispatch.rpc;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
+import com.yahoo.search.dispatch.rpc.RpcClient.RpcNodeConnection;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
+import com.yahoo.vespa.config.search.DispatchNodesConfig.Node;
import java.util.ArrayList;
import java.util.Collection;
@@ -19,7 +21,7 @@ import java.util.concurrent.ThreadLocalRandom;
*
* @author ollivir
*/
-public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
+public class RpcResourcePool implements RpcConnectionPool {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
private volatile Map<Integer, NodeConnectionPool> nodeConnectionPools = Map.of();
@@ -35,46 +37,35 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
}
public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) {
- super();
rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads());
numConnections = dispatchConfig.numJrtConnectionsPerNode();
- updateNodes(nodesConfig).forEach(item -> {
- try {
- item.close();
- } catch (Exception e) {}
+ updateNodes(nodesConfig).forEach(pool -> {
+ try { pool.close(); } catch (Exception ignored) { } // Shouldn't throw.
});
}
- /** Will return a list of items that need a delayed close */
- public Collection<AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) {
- List<AutoCloseable> toClose = new ArrayList<>();
- var builder = new HashMap<Integer, NodeConnectionPool>();
+ @Override
+ public Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) {
+ Map<Integer, NodeConnectionPool> currentPools = new HashMap<>(nodeConnectionPools);
+ Map<Integer, NodeConnectionPool> nextPools = new HashMap<>();
// Who can be reused
- for (var node : nodesConfig.node()) {
- var prev = nodeConnectionPools.get(node.key());
- NodeConnection nc = prev != null ? prev.nextConnection() : null;
- if (nc instanceof RpcClient.RpcNodeConnection rpcNodeConnection
- && rpcNodeConnection.getPort() == node.port()
- && rpcNodeConnection.getHostname().equals(node.host()))
+ for (Node node : nodesConfig.node()) {
+ if ( currentPools.containsKey(node.key())
+ && currentPools.get(node.key()).nextConnection() instanceof RpcNodeConnection rpcNodeConnection
+ && rpcNodeConnection.getPort() == node.port()
+ && rpcNodeConnection.getHostname().equals(node.host()))
{
- builder.put(node.key(), prev);
+ nextPools.put(node.key(), currentPools.remove(node.key()));
} else {
- var connections = new ArrayList<NodeConnection>(numConnections);
+ ArrayList<NodeConnection> connections = new ArrayList<>(numConnections);
for (int i = 0; i < numConnections; i++) {
connections.add(rpcClient.createConnection(node.host(), node.port()));
}
- builder.put(node.key(), new NodeConnectionPool(connections));
+ nextPools.put(node.key(), new NodeConnectionPool(connections));
}
}
- // Who are not needed any more
- nodeConnectionPools.forEach((key, pool) -> {
- var survivor = builder.get(key);
- if (survivor == null || pool != survivor) {
- toClose.add(pool);
- }
- });
- this.nodeConnectionPools = Map.copyOf(builder);
- return toClose;
+ this.nodeConnectionPools = Map.copyOf(nextPools);
+ return currentPools.values();
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
index 121c12335f5..c8af5cea5aa 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.searchcluster;
+import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
@@ -51,7 +52,7 @@ public class Group {
/**
* Returns whether this group has sufficient active documents
- * (compared to other groups) that is should receive traffic
+ * (compared to other groups) that should receive traffic
*/
public boolean hasSufficientCoverage() {
return hasSufficientCoverage;
@@ -66,14 +67,16 @@ public class Group {
}
public void aggregateNodeValues() {
- long activeDocs = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum();
+ List<Node> workingNodes = new ArrayList<>(nodes);
+ workingNodes.removeIf(node -> node.isWorking() != Boolean.TRUE);
+ long activeDocs = workingNodes.stream().mapToLong(Node::getActiveDocuments).sum();
activeDocuments = activeDocs;
- targetActiveDocuments = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getTargetActiveDocuments).sum();
+ targetActiveDocuments = workingNodes.stream().mapToLong(Node::getTargetActiveDocuments).sum();
isBlockingWrites = nodes.stream().anyMatch(Node::isBlockingWrites);
- int numWorkingNodes = workingNodes();
+ int numWorkingNodes = workingNodes.size();
if (numWorkingNodes > 0) {
long average = activeDocs / numWorkingNodes;
- long skew = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum();
+ long skew = workingNodes.stream().mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum();
boolean balanced = skew <= activeDocs * maxContentSkew;
if (balanced != isBalanced) {
if (!isSparse())
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 9c65cb3d4c0..3c8950f1f7f 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -6,15 +6,18 @@ import com.yahoo.net.HostName;
import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
+import com.yahoo.yolean.UncheckedInterruptedException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.groupingBy;
/**
* A model of a search cluster we might want to dispatch queries to.
@@ -28,7 +31,7 @@ public class SearchCluster implements NodeManager<Node> {
private final String clusterId;
private final VipStatus vipStatus;
private final PingFactory pingFactory;
- private final SearchGroupsImpl groups;
+ private volatile SearchGroupsImpl groups;
private volatile long nextLogTime = 0;
/**
@@ -45,6 +48,7 @@ public class SearchCluster implements NodeManager<Node> {
VipStatus vipStatus, PingFactory pingFactory) {
this(clusterId, toGroups(nodes, minActivedocsPercentage), vipStatus, pingFactory);
}
+
public SearchCluster(String clusterId, SearchGroupsImpl groups, VipStatus vipStatus, PingFactory pingFactory) {
this.clusterId = clusterId;
this.vipStatus = vipStatus;
@@ -55,13 +59,28 @@ public class SearchCluster implements NodeManager<Node> {
@Override
public String name() { return clusterId; }
- public VipStatus getVipStatus() { return vipStatus; }
+
+ /** Sets the new nodes to monitor to be the new nodes, but keep any existing node instances which equal the new ones. */
+ public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) {
+ Collection<Node> retainedNodes = groups.nodes();
+ Collection<Node> currentNodes = new HashSet<>(newNodes);
+ retainedNodes.retainAll(currentNodes); // Throw away all old nodes which are not in the new set.
+ currentNodes.removeIf(retainedNodes::contains); // Throw away all new nodes for which we have more information in an old object.
+ Collection<Node> addedNodes = List.copyOf(currentNodes);
+ currentNodes.addAll(retainedNodes); // Keep the old nodes that were replaced in the new set.
+ SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage);
+ ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false);
+ for (Node node : groups.nodes()) monitor.add(node, true);
+ monitor.start();
+ try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } }
+ catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); }
+ pingIterationCompleted(groups);
+ this.groups = groups;
+ return monitor;
+ }
public void addMonitoring(ClusterMonitor<Node> clusterMonitor) {
- for (var group : groups()) {
- for (var node : group.nodes())
- clusterMonitor.add(node, true);
- }
+ for (Node node : groups.nodes()) clusterMonitor.add(node, true);
}
private static Node findLocalCorpusDispatchTarget(String selfHostname, SearchGroups groups) {
@@ -86,14 +105,14 @@ public class SearchCluster implements NodeManager<Node> {
private static SearchGroupsImpl toGroups(Collection<Node> nodes, double minActivedocsPercentage) {
Map<Integer, Group> groups = new HashMap<>();
- for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) {
- Group g = new Group(group.getKey(), group.getValue());
- groups.put(group.getKey(), g);
- }
+ nodes.stream().collect(groupingBy(Node::group)).forEach((groupId, groupNodes) -> {
+ groups.put(groupId, new Group(groupId, groupNodes));
+ });
return new SearchGroupsImpl(Map.copyOf(groups), minActivedocsPercentage);
}
public SearchGroups groupList() { return groups; }
+
public Group group(int id) { return groups.get(id); }
private Collection<Group> groups() { return groups.groups(); }
@@ -107,14 +126,14 @@ public class SearchCluster implements NodeManager<Node> {
* or empty if we should not dispatch directly.
*/
public Optional<Node> localCorpusDispatchTarget() {
- if ( localCorpusDispatchTarget == null) return Optional.empty();
+ if (localCorpusDispatchTarget == null) return Optional.empty();
// Only use direct dispatch if the local group has sufficient coverage
Group localSearchGroup = groups.get(localCorpusDispatchTarget.group());
if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();
// Only use direct dispatch if the local search node is not down
- if ( localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty();
+ if (localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty();
return Optional.of(localCorpusDispatchTarget);
}
@@ -176,7 +195,7 @@ public class SearchCluster implements NodeManager<Node> {
return groups().stream().allMatch(group -> group.nodes().stream().allMatch(node -> node.isWorking() != null));
}
- public long nonWorkingNodeCount() {
+ long nonWorkingNodeCount() {
return groups().stream().flatMap(group -> group.nodes().stream()).filter(node -> node.isWorking() == Boolean.FALSE).count();
}
@@ -194,13 +213,13 @@ public class SearchCluster implements NodeManager<Node> {
/** Used by the cluster monitor to manage node status */
@Override
- public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) {
+ public void ping(ClusterMonitor<Node> clusterMonitor, Node node, Executor executor) {
Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor));
pinger.ping();
}
- private void pingIterationCompletedSingleGroup() {
- Group group = groups().iterator().next();
+ private void pingIterationCompletedSingleGroup(SearchGroupsImpl groups) {
+ Group group = groups.groups().iterator().next();
group.aggregateNodeValues();
// With just one group sufficient coverage may not be the same as full coverage, as the
// group will always be marked sufficient for use.
@@ -209,10 +228,10 @@ public class SearchCluster implements NodeManager<Node> {
trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments());
}
- private void pingIterationCompletedMultipleGroups() {
- groups().forEach(Group::aggregateNodeValues);
+ private void pingIterationCompletedMultipleGroups(SearchGroupsImpl groups) {
+ groups.groups().forEach(Group::aggregateNodeValues);
long medianDocuments = groups.medianDocumentsPerGroup();
- for (Group group : groups()) {
+ for (Group group : groups.groups()) {
boolean sufficientCoverage = groups.isGroupCoverageSufficient(group.activeDocuments(), medianDocuments);
updateSufficientCoverage(group, sufficientCoverage);
trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments);
@@ -226,20 +245,20 @@ public class SearchCluster implements NodeManager<Node> {
*/
@Override
public void pingIterationCompleted() {
+ pingIterationCompleted(groups);
+ }
+
+ private void pingIterationCompleted(SearchGroupsImpl groups) {
if (groups.size() == 1) {
- pingIterationCompletedSingleGroup();
+ pingIterationCompletedSingleGroup(groups);
} else {
- pingIterationCompletedMultipleGroups();
+ pingIterationCompletedMultipleGroups(groups);
}
}
-
-
/**
* Calculate whether a subset of nodes in a group has enough coverage
*/
-
-
private void trackGroupCoverageChanges(Group group, boolean fullCoverage, long medianDocuments) {
if ( ! hasInformationAboutAllNodes()) return; // Be silent until we know what we are talking about.
boolean changed = group.fullCoverageStatusChanged(fullCoverage);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java
index b041ba28db9..5727931281a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java
@@ -1,8 +1,16 @@
package com.yahoo.search.dispatch.searchcluster;
+import com.yahoo.stream.CustomCollectors;
+
import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
import java.util.Set;
+import static java.util.Comparator.comparingInt;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toSet;
+
/**
* Simple interface for groups and their nodes in the content cluster
* @author baldersheim
@@ -14,6 +22,11 @@ public interface SearchGroups {
default boolean isEmpty() {
return size() == 0;
}
+ default Set<Node> nodes() {
+ return groups().stream().flatMap(group -> group.nodes().stream())
+ .sorted(comparingInt(Node::key))
+ .collect(toCollection(LinkedHashSet::new));
+ }
int size();
boolean isPartialGroupCoverageSufficient(Collection<Node> nodes);
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java
index 514f0de4fec..3c5dbe9927a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java
@@ -3,10 +3,8 @@ package com.yahoo.search.dispatch.searchcluster;
import com.google.common.math.Quantiles;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
public class SearchGroupsImpl implements SearchGroups {
@@ -42,4 +40,5 @@ public class SearchGroupsImpl implements SearchGroups {
double[] activeDocuments = groups().stream().mapToDouble(Group::activeDocuments).toArray();
return (long) Quantiles.median().computeInPlace(activeDocuments);
}
+
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
index 2603f89b546..1278afe3759 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
@@ -1,27 +1,51 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.cluster.ClusterMonitor;
+import com.yahoo.search.dispatch.Dispatcher.InvokerFactoryFactory;
+import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
+import com.yahoo.search.dispatch.rpc.Client.ResponseReceiver;
+import com.yahoo.search.dispatch.rpc.RpcConnectionPool;
import com.yahoo.search.dispatch.searchcluster.MockSearchCluster;
-import com.yahoo.search.dispatch.searchcluster.SearchGroups;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.PingFactory;
import com.yahoo.search.dispatch.searchcluster.Pinger;
import com.yahoo.search.dispatch.searchcluster.PongHandler;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
+import com.yahoo.search.dispatch.searchcluster.SearchGroups;
+import com.yahoo.search.searchchain.Execution;
import com.yahoo.vespa.config.search.DispatchConfig;
+import com.yahoo.vespa.config.search.DispatchNodesConfig;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import static com.yahoo.search.dispatch.searchcluster.MockSearchCluster.createDispatchConfig;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -149,6 +173,172 @@ public class DispatcherTest {
dispatcher.deconstruct();
}
+ @Test
+ void testRpcResourceShutdownOnReconfiguration() throws InterruptedException, ExecutionException, IOException {
+ // Ping factory lets us tick each ping, so we may delay shutdown, due to monitor thread RPC usage.
+ Map<Integer, Phaser> pingPhasers = new ConcurrentHashMap<>();
+ pingPhasers.put(0, new Phaser(2));
+ pingPhasers.put(1, new Phaser(2));
+ pingPhasers.put(2, new Phaser(2));
+
+ PingFactory pingFactory = (node, monitor, pongHandler) -> () -> {
+ pingPhasers.get(node.key()).arriveAndAwaitAdvance();
+ pongHandler.handle(new Pong(2, 2));
+ pingPhasers.get(node.key()).arriveAndAwaitAdvance();
+ };
+
+ // Search cluster uses the ping factory, and zero nodes initially, later configured with two nodes.
+ SearchCluster cluster = new MockSearchCluster("cid", 0, 1, pingFactory);
+
+ // Dummy RPC layer where we manually tick responses for each node.
+ // When a response is let go, we verify the RPC resource is not yet closed.
+ // This is signalled by terminating its phaser, which is done by the dispatcher in delayed cleanup.
+ // We verify in the end that all connections have been shut down, prior to shutting down the RPC pool proper.
+ Map<Integer, Boolean > rpcResources = new HashMap<>();
+ AtomicLong cleanupThreadId = new AtomicLong();
+ AtomicInteger nodeIdOfSearcher0 = new AtomicInteger(-1);
+ RpcConnectionPool rpcPool = new RpcConnectionPool() {
+ // Returns a connection that lets us advance the searcher when we want to, as well as tracking which threads do what.
+ @Override public NodeConnection getConnection(int nodeId) {
+ nodeIdOfSearcher0.set(nodeId);
+ return new NodeConnection() {
+ @Override public void request(String rpcMethod, CompressionType compression, int uncompressedLength, byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) {
+ assertTrue(rpcResources.get(nodeId));
+ }
+ @Override public void close() {
+ assertFalse(rpcResources.remove(nodeId));
+ }
+ };
+ }
+ // Verifies cleanup is done by the expected thread, by ID, and cleans up the "RPC connection" (phaser).
+ @Override public Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig config) {
+ for (DispatchNodesConfig.Node node : config.node())
+ rpcResources.putIfAbsent(node.key(), true);
+ return rpcResources.keySet().stream()
+ .filter(key -> config.node().stream().noneMatch(node -> node.key() == key))
+ .map(key -> (AutoCloseable) () -> {
+ assertTrue(rpcResources.put(key, false));
+ cleanupThreadId.set(Thread.currentThread().getId());
+ getConnection(key).close();
+ })
+ .toList();
+ };
+ // In the end, we have reconfigured down to 0 nodes, and no resources should be left running after cleanup.
+ @Override public void close() {
+ assertEquals(Map.of(), rpcResources);
+ }
+ };
+
+ // This factory just forwards search to the dummy RPC layer above, nothing more.
+ InvokerFactoryFactory invokerFactories = (rpcConnectionPool, searchGroups, dispatchConfig) -> new InvokerFactory(searchGroups, dispatchConfig) {
+ @Override protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, int maxHits, Node node) {
+ return Optional.of(new SearchInvoker(Optional.of(node)) {
+ @Override protected Object sendSearchRequest(Query query, Object context) {
+ rpcPool.getConnection(node.key()).request(null, null, 0, null, null, 0);
+ return null;
+ };
+ @Override protected InvokerResult getSearchResult(Execution execution) {
+ return new InvokerResult(new Result(new Query()));
+ }
+ @Override protected void release() { }
+ });
+ };
+ @Override public FillInvoker createFillInvoker(VespaBackEndSearcher searcher, Result result) {
+ return new FillInvoker() {
+ @Override protected void getFillResults(Result result, String summaryClass) { fail(); }
+ @Override protected void sendFillRequest(Result result, String summaryClass) { fail(); }
+ @Override protected void release() { fail(); }
+ };
+ }
+ };
+
+ Dispatcher dispatcher = new Dispatcher(dispatchConfig, rpcPool, cluster, invokerFactories);
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+
+ // Set two groups with a single node each. The first cluster-monitor has nothing to do, and is shut down immediately.
+ // There are also no invokers, so the whole reconfiguration completes once the new cluster monitor has seen all nodes.
+ Future<?> reconfiguration = executor.submit(() -> {
+ dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder()
+ .node(new DispatchNodesConfig.Node.Builder().key(0).group(0).port(123).host("host0"))
+ .node(new DispatchNodesConfig.Node.Builder().key(1).group(1).port(123).host("host1"))
+ .build());
+ });
+
+ // Let pings return, to allow the search cluster to reconfigure.
+ pingPhasers.get(0).arriveAndAwaitAdvance();
+ pingPhasers.get(0).arriveAndAwaitAdvance();
+ pingPhasers.get(1).arriveAndAwaitAdvance();
+ pingPhasers.get(1).arriveAndAwaitAdvance();
+ // We need to wait for the cluster to have at least one group, lest dispatch will fail below.
+ reconfiguration.get();
+ assertNotEquals(cleanupThreadId.get(), Thread.currentThread().getId());
+ assertEquals(1, cluster.group(0).workingNodes());
+ assertEquals(1, cluster.group(1).workingNodes());
+
+ Node node0 = cluster.group(0).nodes().get(0); // Node0 will be replaced.
+ Node node1 = cluster.group(1).nodes().get(0); // Node1 will be retained.
+
+ // Start some searches, one against each group, since we have a round-robin policy.
+ SearchInvoker search0 = dispatcher.getSearchInvoker(new Query(), null);
+ search0.search(new Query(), null);
+ // Unknown whether the first or second search hits node0, so we must track that.
+ int offset = nodeIdOfSearcher0.get();
+ SearchInvoker search1 = dispatcher.getSearchInvoker(new Query(), null);
+ search1.search(new Query(), null);
+
+ // Wait for the current cluster monitor to be mid-ping-round.
+ pingPhasers.get(0).arriveAndAwaitAdvance();
+
+ // Then reconfigure the dispatcher with new nodes, replacing node0 with node2.
+ reconfiguration = executor.submit(() -> {
+ dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder()
+ .node(new DispatchNodesConfig.Node.Builder().key(2).group(0).port(123).host("host2"))
+ .node(new DispatchNodesConfig.Node.Builder().key(1).group(1).port(123).host("host1"))
+ .build());
+ });
+ // Reconfiguration starts, but groups are only updated once the search cluster has knowledge about all of them.
+ pingPhasers.get(1).arriveAndAwaitAdvance();
+ pingPhasers.get(1).arriveAndAwaitAdvance();
+ pingPhasers.get(2).arriveAndAwaitAdvance();
+ // Cluster has not yet updated its group reference.
+ assertEquals(1, cluster.group(0).workingNodes()); // Node0 is still working.
+ assertSame(node0, cluster.group(0).nodes().get(0));
+ pingPhasers.get(2).arriveAndAwaitAdvance();
+
+ // Old cluster monitor is waiting for that ping to complete before it can shut down, and let reconfiguration complete.
+ pingPhasers.get(0).arriveAndAwaitAdvance();
+ reconfiguration.get();
+ Node node2 = cluster.group(0).nodes().get(0);
+ assertNotSame(node0, node2);
+ assertSame(node1, cluster.group(1).nodes().get(0));
+
+ // Next search should hit group0 again, this time on node2.
+ SearchInvoker search2 = dispatcher.getSearchInvoker(new Query(), null);
+ search2.search(new Query(), null);
+
+ // Searches against nodes 1 and 2 complete.
+ (offset == 0 ? search0 : search1).close();
+ search2.close();
+
+ // We're still waiting for search against node0 to complete, before we can shut down its RPC connection.
+ assertEquals(Set.of(0, 1, 2), rpcResources.keySet());
+ (offset == 0 ? search1 : search0).close();
+ // Thread for search 0 should have closed the RPC pool now.
+ assertEquals(Set.of(1, 2), rpcResources.keySet());
+ assertEquals(cleanupThreadId.get(), Thread.currentThread().getId());
+
+ // Finally, reconfigure down to 0 nodes.
+ reconfiguration = executor.submit(() -> {
+ cleanupThreadId.set(Thread.currentThread().getId());
+ dispatcher.updateWithNewConfig(new DispatchNodesConfig.Builder().build());
+ });
+ pingPhasers.get(1).forceTermination();
+ pingPhasers.get(2).forceTermination();
+ reconfiguration.get();
+ assertNotEquals(cleanupThreadId.get(), Thread.currentThread().getId());
+ dispatcher.deconstruct();
+ }
+
interface FactoryStep {
boolean returnInvoker(List<Node> nodes, boolean acceptIncompleteCoverage);
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java
index 5fb5b465c69..cd0791a3881 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/MockSearchCluster.java
@@ -15,7 +15,11 @@ import java.util.Map;
public class MockSearchCluster extends SearchCluster {
public MockSearchCluster(String clusterId, int groups, int nodesPerGroup) {
- super(clusterId, buildGroupListForTest(groups, nodesPerGroup, 88.0), null, null);
+ this(clusterId, groups, nodesPerGroup, null);
+ }
+
+ public MockSearchCluster(String clusterId, int groups, int nodesPerGroup, PingFactory pingFactory) {
+ super(clusterId, buildGroupListForTest(groups, nodesPerGroup, 88.0), null, pingFactory);
}
@Override
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
index 51256ec496e..bfe1aed1084 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
@@ -13,11 +13,18 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
import static org.junit.jupiter.api.Assertions.*;
/**
@@ -31,7 +38,7 @@ public class SearchClusterTest {
final int nodesPerGroup;
final VipStatus vipStatus;
final SearchCluster searchCluster;
- final ClusterMonitor clusterMonitor;
+ final ClusterMonitor<Node> clusterMonitor;
final List<AtomicInteger> numDocsPerNode;
List<AtomicInteger> pingCounts;
@@ -57,7 +64,7 @@ public class SearchClusterTest {
}
searchCluster = new SearchCluster(clusterId, 100.0, nodes,
vipStatus, new Factory(nodesPerGroup, numDocsPerNode, pingCounts));
- clusterMonitor = new ClusterMonitor(searchCluster, false);
+ clusterMonitor = new ClusterMonitor<>(searchCluster, false);
searchCluster.addMonitoring(clusterMonitor);
}
@@ -376,4 +383,37 @@ public class SearchClusterTest {
assertTrue(group.isBalanced());
}
+ @Test
+ void requireThatPreciselyTheRetainedNodesAreKeptWhenNodesAreUpdated() {
+ try (State state = new State("query", 2, IntStream.range(0, 6).mapToObj(i -> "node-" + i).toList())) {
+ List<Node> referenceNodes = List.of(new Node(0, "node-0", 0),
+ new Node(1, "node-1", 0),
+ new Node(0, "node-2", 1),
+ new Node(1, "node-3", 1),
+ new Node(0, "node-4", 2),
+ new Node(1, "node-5", 2));
+ SearchGroups oldGroups = state.searchCluster.groupList();
+ assertEquals(Set.copyOf(referenceNodes), oldGroups.nodes());
+
+ List<Node> updatedNodes = List.of(new Node(0, "node-1", 0), // Swap node-0 and node-1
+ new Node(1, "node-0", 0), // Swap node-1 and node-0
+ new Node(0, "node-4", 1), // Swap node-2 and node-4
+ new Node(1, "node-3", 1),
+ new Node(0, "node-2", 2), // Swap node-4 and node-2
+ new Node(1, "node-6", 2)); // Replace node-6
+ state.searchCluster.updateNodes(updatedNodes, 100.0);
+ SearchGroups newGroups = state.searchCluster.groupList();
+ assertEquals(Set.copyOf(updatedNodes), newGroups.nodes());
+
+ Map<Node, Node> oldNodesByIdentity = newGroups.nodes().stream().collect(toMap(identity(), identity()));
+ Map<Node, Node> newNodesByIdentity = newGroups.nodes().stream().collect(toMap(identity(), identity()));
+ assertSame(updatedNodes.get(0), newNodesByIdentity.get(updatedNodes.get(0)));
+ assertSame(updatedNodes.get(1), newNodesByIdentity.get(updatedNodes.get(1)));
+ assertSame(updatedNodes.get(2), newNodesByIdentity.get(updatedNodes.get(2)));
+ assertSame(oldNodesByIdentity.get(referenceNodes.get(3)), newNodesByIdentity.get(updatedNodes.get(3)));
+ assertSame(updatedNodes.get(4), newNodesByIdentity.get(updatedNodes.get(4)));
+ assertSame(updatedNodes.get(5), newNodesByIdentity.get(updatedNodes.get(5)));
+ }
+ }
+
}