aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com')
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java15
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java4
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java5
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java170
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java37
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java18
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java47
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java13
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java71
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java13
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java3
18 files changed, 294 insertions, 116 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java
index fd8110e1173..d1377b8d373 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java
@@ -82,7 +82,7 @@ public abstract class BaseNodeMonitor<T> {
/** Thread-safely changes the state of this node if required */
protected abstract void setWorking(boolean working,String explanation);
- /** Returns whether or not this is monitoring an internal node. Default is false. */
+ /** Returns whether this is monitoring an internal node. Default is false. */
public boolean isInternal() { return internal; }
}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index 0b627e91bc5..332bf4ea2c4 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -66,7 +66,7 @@ public class ClusterMonitor<T> {
* </ul>
*
* @param node the object representing the node
- * @param internal whether or not this node is internal to this cluster
+ * @param internal whether this node is internal to this cluster
*/
public void add(T node, boolean internal) {
nodeMonitors.put(node, new TrafficNodeMonitor<>(node, configuration, internal));
@@ -96,11 +96,10 @@ public class ClusterMonitor<T> {
* Ping all nodes which need pinging to discover state changes
*/
public void ping(Executor executor) {
- for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) {
- BaseNodeMonitor<T> monitor= i.next();
- nodeManager.ping(this, monitor.getNode(), executor); // Cause call to failed or responded
+ for (var monitor : nodeMonitors()) {
+ if (closed.get()) return; // Do nothing to change state if close has started.
+ nodeManager.ping(this, monitor.getNode(), executor);
}
- if (closed.get()) return; // Do nothing to change state if close has started.
nodeManager.pingIterationCompleted();
}
@@ -143,7 +142,7 @@ public class ClusterMonitor<T> {
// for all pings when there are no problems (important because it ensures that
// any thread local connections are reused) 2) a new thread will be started to execute
// new pings when a ping is not responding
- ExecutorService pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping"));
+ ExecutorService pingExecutor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping"));
while (!closed.get()) {
try {
log.finest("Activating ping");
@@ -165,7 +164,9 @@ public class ClusterMonitor<T> {
}
pingExecutor.shutdown();
try {
- pingExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ if ( ! pingExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+ log.warning("Timeout waiting for ping executor to terminate");
+ }
} catch (InterruptedException e) { }
log.info("Stopped cluster monitor thread " + getName());
}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
index 4af6757db8c..1cf36d75fc5 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
@@ -48,7 +48,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
*
* @param id the id of this searcher
* @param connections the connections of the cluster
- * @param internal whether or not this cluster is internal (part of the same installation)
+ * @param internal whether this cluster is internal (part of the same installation)
*/
public ClusterSearcher(ComponentId id, List<T> connections, boolean internal) {
this(id, connections, new Hasher<>(), internal);
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java
index 1f6602053d9..f8f8c0d888d 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java
@@ -22,7 +22,7 @@ public class MonitorConfiguration {
/**
* Returns the number of milliseconds to attempt to service a request
- * (at different nodes) before giving up. Default is 5000 ms.
+ * (at different nodes) before giving up. See {@link #requestTimeout}.
*/
public long getRequestTimeout() { return requestTimeout; }
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java
index 11475b6a0ca..108e7e3e34b 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java
@@ -23,7 +23,7 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> {
this.configuration = configuration;
}
- /** Whether or not this has ever responded successfully */
+ /** Whether this has ever responded successfully */
private boolean atStartUp = true;
public T getNode() { return node; }
@@ -55,7 +55,7 @@ public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> {
respondedAt = now();
succeededAt = respondedAt;
- setWorking(true,"Responds correctly");
+ setWorking(true, "Responds correctly");
}
/**
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
index 77496114df1..c6fef88fa2d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
@@ -2,7 +2,6 @@
package com.yahoo.search.dispatch;
import java.io.Closeable;
-import java.time.Duration;
import java.util.function.BiConsumer;
/**
@@ -21,8 +20,8 @@ public abstract class CloseableInvoker implements Closeable {
private RequestDuration duration;
public void teardown(BiConsumer<Boolean, RequestDuration> teardown) {
- this.teardown = teardown;
- this.duration = new RequestDuration();
+ this.teardown = this.teardown == null ? teardown : this.teardown.andThen(teardown);
+ this.duration = this.duration == null ? new RequestDuration() : this.duration;
}
protected void setFinalStatus(boolean success) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index db7e80a95e5..6f6b0fc2b79 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -1,9 +1,9 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
-import com.yahoo.component.annotation.Inject;
import com.yahoo.component.AbstractComponent;
import com.yahoo.component.ComponentId;
+import com.yahoo.component.annotation.Inject;
import com.yahoo.compress.Compressor;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
@@ -12,13 +12,14 @@ import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
+import com.yahoo.search.dispatch.rpc.RpcConnectionPool;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.Group;
-import com.yahoo.search.dispatch.searchcluster.SearchGroups;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
+import com.yahoo.search.dispatch.searchcluster.SearchGroups;
import com.yahoo.search.query.profile.types.FieldDescription;
import com.yahoo.search.query.profile.types.FieldType;
import com.yahoo.search.query.profile.types.QueryProfileType;
@@ -32,6 +33,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* A dispatcher communicates with search nodes to perform queries and fill hits.
@@ -54,19 +56,43 @@ public class Dispatcher extends AbstractComponent {
/** If set will control computation of how many hits will be fetched from each partition.*/
public static final CompoundName topKProbability = CompoundName.from(DISPATCH + "." + TOP_K_PROBABILITY);
+ private final InvokerFactoryFactory invokerFactories;
private final DispatchConfig dispatchConfig;
- private final RpcResourcePool rpcResourcePool;
+ private final RpcConnectionPool rpcResourcePool;
private final SearchCluster searchCluster;
- private final ClusterMonitor<Node> clusterMonitor;
private volatile VolatileItems volatileItems;
private static class VolatileItems {
+
final LoadBalancer loadBalancer;
final InvokerFactory invokerFactory;
- VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) {
+ final ClusterMonitor<Node> clusterMonitor;
+ final AtomicInteger inflight = new AtomicInteger(1); // Initial reference.
+ Runnable cleanup = () -> { };
+
+ VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) {
this.loadBalancer = loadBalancer;
this.invokerFactory = invokerFactory;
+ this.clusterMonitor = clusterMonitor;
+ }
+
+ private void countDown() {
+ if (inflight.decrementAndGet() == 0) cleanup.run();
+ }
+
+ private class Ref implements AutoCloseable {
+ boolean handedOff = false;
+ { inflight.incrementAndGet(); }
+ VolatileItems get() { return VolatileItems.this; }
+ /** Hands off the reference to the given invoker, which will decrement the counter when closed. */
+ <T extends CloseableInvoker> T register(T invoker) {
+ invoker.teardown((__, ___) -> countDown());
+ handedOff = true;
+ return invoker;
+ }
+ @Override public void close() { if ( ! handedOff) countDown(); }
}
+
}
private static final QueryProfileType argumentType;
@@ -81,34 +107,105 @@ public class Dispatcher extends AbstractComponent {
public static QueryProfileType getArgumentType() { return argumentType; }
+ interface InvokerFactoryFactory {
+ InvokerFactory create(RpcConnectionPool rpcConnectionPool, SearchGroups searchGroups, DispatchConfig dispatchConfig);
+ }
+
@Inject
- public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig,
- DispatchNodesConfig nodesConfig, VipStatus vipStatus) {
- this.dispatchConfig = dispatchConfig;
- rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig);
- searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
- toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool));
- clusterMonitor = new ClusterMonitor<>(searchCluster, true);
- volatileItems = update(null);
+ public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus) {
+ this(clusterId, dispatchConfig, new RpcResourcePool(dispatchConfig, nodesConfig), nodesConfig, vipStatus, RpcInvokerFactory::new);
initialWarmup(dispatchConfig.warmuptime());
}
- /* For simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */
- Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster,
- DispatchConfig dispatchConfig, InvokerFactory invokerFactory) {
+ Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) {
+ this(dispatchConfig, rpcConnectionPool,
+ new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
+ toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)),
+ invokerFactories);
+ }
+
+ Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) {
+ this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories);
+ this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it.
+ }
+
+ Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ SearchCluster searchCluster, ClusterMonitor<Node> clusterMonitor, InvokerFactoryFactory invokerFactories) {
this.dispatchConfig = dispatchConfig;
- this.rpcResourcePool = null;
+ this.rpcResourcePool = rpcConnectionPool;
this.searchCluster = searchCluster;
- this.clusterMonitor = clusterMonitor;
- this.volatileItems = update(invokerFactory);
+ this.invokerFactories = invokerFactories;
+ this.volatileItems = update(clusterMonitor);
+ searchCluster.addMonitoring(clusterMonitor);
}
- private VolatileItems update(InvokerFactory invokerFactory) {
+ /* For simple mocking in tests. Beware that searchCluster is shut down in deconstruct() */
+ Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster,
+ DispatchConfig dispatchConfig, InvokerFactory invokerFactory) {
+ this(dispatchConfig, null, searchCluster, clusterMonitor, (__, ___, ____) -> invokerFactory);
+ }
+
+ /** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */
+ private VolatileItems.Ref volatileItems() {
+ return volatileItems.new Ref();
+ }
+
+ /**
+ * This is called whenever we have new config for backend nodes.
+ * Normally, we'd want to handle partial failure of the component graph, by reinstating the old state;
+ * however, in this case, such a failure would be local to this container, and we instead want to keep
+ * the newest config, as that is what most accurately represents the actual backend.
+ *
+ * The flow of reconfiguration is:
+ * 1. The volatile snapshot of disposable items is replaced with a new one that only references updated nodes.
+ * 2. Dependencies of the items in 1., which must be configured, are updated, yielding a list of resources to close.
+ * 3. When inflight operations against the old snapshot are done, all obsolete resources are cleaned up.
+ *
+ * Ownership details:
+ * 1. The RPC resource pool is owned by the dispatcher, and is updated on node set changes;
+ * it contains the means by which the container talks to backend nodes, so cleanup must be delayed until safe.
+ * 2. The invocation factory is owned by the volatile snapshot, and is swapped atomically with it;
+ * it is used by the dispatcher to create ephemeral invokers, which must complete before cleanup (above) can happen.
+ * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it;
+ * it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot.
+ * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time;
+ * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that.
+ * 5. The search cluster is owned by the dispatcher, and is updated on node set changes;
+ * its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher,
+ * as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible
+ * of its state across reconfigurations: with new node config, it will immediately forget obsolete nodes, and set
+ * coverage information as if the new nodes have zero documents, before even checking their status; this is fine
+ * under the assumption that this is the common case, i.e., new nodes have no documents yet.
+ */
+ void updateWithNewConfig(DispatchNodesConfig nodesConfig) {
+ try (var items = volatileItems()) { // Marking a reference to the old snapshot, which we want to have cleaned up.
+ items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0.
+
+ // Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do.
+ Collection<? extends AutoCloseable> connectionPoolsToClose = rpcResourcePool.updateNodes(nodesConfig);
+ items.get().cleanup = () -> {
+ for (AutoCloseable pool : connectionPoolsToClose) {
+ try { pool.close(); } catch (Exception ignored) { }
+ }
+ };
+
+ // Update the nodes the search cluster keeps track of, and what nodes are monitored.
+ ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage());
+
+ // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this.
+ this.volatileItems = update(newMonitor);
+
+ // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to.
+ items.get().clusterMonitor.shutdown();
+ } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread.
+ }
+
+ private VolatileItems update(ClusterMonitor<Node> clusterMonitor) {
var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
- (invokerFactory == null)
- ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig)
- : invokerFactory);
- searchCluster.addMonitoring(clusterMonitor);
+ invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig),
+ clusterMonitor);
return items;
}
@@ -158,27 +255,30 @@ public class Dispatcher extends AbstractComponent {
@Override
public void deconstruct() {
// The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster.
- clusterMonitor.shutdown();
+ volatileItems.clusterMonitor.shutdown();
if (rpcResourcePool != null) {
rpcResourcePool.close();
}
}
public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) {
- return volatileItems.invokerFactory.createFillInvoker(searcher, result);
+ try (var items = volatileItems()) { // Take a snapshot, and release it when we're done.
+ return items.register(items.get().invokerFactory.createFillInvoker(searcher, result));
+ }
}
public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) {
- VolatileItems items = volatileItems; // Take a snapshot
- int maxHitsPerNode = dispatchConfig.maxHitsPerNode();
- SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.invokerFactory, maxHitsPerNode)
- .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.loadBalancer, items.invokerFactory, maxHitsPerNode));
-
- if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
- query.setHits(0);
- query.setOffset(0);
+ try (var items = volatileItems()) { // Take a snapshot, and release it when we're done.
+ int maxHitsPerNode = dispatchConfig.maxHitsPerNode();
+ SearchInvoker invoker = getSearchPathInvoker(query, searcher, searchCluster.groupList(), items.get().invokerFactory, maxHitsPerNode)
+ .orElseGet(() -> getInternalInvoker(query, searcher, searchCluster, items.get().loadBalancer, items.get().invokerFactory, maxHitsPerNode));
+
+ if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
+ query.setHits(0);
+ query.setOffset(0);
+ }
+ return items.register(invoker);
}
- return invoker;
}
/** Builds an invoker based on searchpath */
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java
new file mode 100644
index 00000000000..625a8bcb6da
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/ReconfigurableDispatcher.java
@@ -0,0 +1,37 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.config.subscription.ConfigSubscriber;
+import com.yahoo.container.handler.VipStatus;
+import com.yahoo.messagebus.network.rpc.SlobrokConfigSubscriber;
+import com.yahoo.vespa.config.search.DispatchConfig;
+import com.yahoo.vespa.config.search.DispatchNodesConfig;
+import com.yahoo.yolean.UncheckedInterruptedException;
+
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * @author jonmv
+ */
+public class ReconfigurableDispatcher extends Dispatcher {
+
+ private final ConfigSubscriber subscriber;
+
+ public ReconfigurableDispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, VipStatus vipStatus) {
+ super(clusterId, dispatchConfig, new DispatchNodesConfig.Builder().build(), vipStatus);
+ this.subscriber = new ConfigSubscriber();
+ this.subscriber.subscribe(this::updateWithNewConfig, DispatchNodesConfig.class, clusterId.stringValue());
+ }
+
+ @Override
+ public void deconstruct() {
+ subscriber.close();
+ super.deconstruct();
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java
index 1206277a103..6b134dc23a6 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java
@@ -5,7 +5,7 @@ import java.time.Duration;
import java.time.Instant;
/**
- * Contains start and and time. Exposes a duration, and lets you measure the time difference between 2 requests.
+ * Contains start and end time. Exposes a duration, and lets you measure the time difference between 2 requests.
* It does use System.nanoTime to get a steady clock.
*
* @author baldersheim
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
index 22ed8b6d9fa..6c1f666835c 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
@@ -12,7 +12,7 @@ import java.util.Optional;
*
* @author bratseth
*/
-interface Client {
+public interface Client {
/** Creates a connection to a particular node in this */
NodeConnection createConnection(String hostname, int port);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
index fd8e0e4f81a..a93ddb0b360 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
@@ -1,11 +1,27 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
+import com.yahoo.vespa.config.search.DispatchNodesConfig;
+
+import java.util.Collection;
+import java.util.List;
+
/**
* Interface for getting a connection given a node id.
*
* @author baldersheim
*/
-public interface RpcConnectionPool {
+public interface RpcConnectionPool extends AutoCloseable {
+
+ /** Returns a connection to the given node id. */
Client.NodeConnection getConnection(int nodeId);
+
+
+ /** Will return a list of items that need a delayed close when updating node set. */
+ default Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { return List.of(); }
+
+ /** Shuts down all connections in the pool, and the underlying RPC client. */
+ @Override
+ void close();
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
index 154002c4f77..b6228994ac8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -33,7 +33,7 @@ public class RpcInvokerFactory extends InvokerFactory {
super(cluster, dispatchConfig);
this.rpcResourcePool = rpcResourcePool;
this.compressor = new CompressService();
- decodeType = convert(dispatchConfig.summaryDecodePolicy());
+ this.decodeType = convert(dispatchConfig.summaryDecodePolicy());
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
index 53dc54f7bc5..a59097e5fff 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
@@ -37,7 +37,7 @@ public class RpcPing implements Pinger, Client.ResponseReceiver {
this.clusterMonitor = clusterMonitor;
this.pingSequenceId = node.createPingSequenceId();
this.pongHandler = pongHandler;
- this. compressor = compressor;
+ this.compressor = compressor;
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index 63530a7f650..d1f22514481 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -3,8 +3,10 @@ package com.yahoo.search.dispatch.rpc;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
+import com.yahoo.search.dispatch.rpc.RpcClient.RpcNodeConnection;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
+import com.yahoo.vespa.config.search.DispatchNodesConfig.Node;
import java.util.ArrayList;
import java.util.Collection;
@@ -19,7 +21,7 @@ import java.util.concurrent.ThreadLocalRandom;
*
* @author ollivir
*/
-public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
+public class RpcResourcePool implements RpcConnectionPool {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
private volatile Map<Integer, NodeConnectionPool> nodeConnectionPools = Map.of();
@@ -35,46 +37,35 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
}
public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) {
- super();
rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads());
numConnections = dispatchConfig.numJrtConnectionsPerNode();
- updateNodes(nodesConfig).forEach(item -> {
- try {
- item.close();
- } catch (Exception e) {}
+ updateNodes(nodesConfig).forEach(pool -> {
+ try { pool.close(); } catch (Exception ignored) { } // Shouldn't throw.
});
}
- /** Will return a list of items that need a delayed close */
- public Collection<AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) {
- List<AutoCloseable> toClose = new ArrayList<>();
- var builder = new HashMap<Integer, NodeConnectionPool>();
+ @Override
+ public Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) {
+ Map<Integer, NodeConnectionPool> currentPools = new HashMap<>(nodeConnectionPools);
+ Map<Integer, NodeConnectionPool> nextPools = new HashMap<>();
// Who can be reused
- for (var node : nodesConfig.node()) {
- var prev = nodeConnectionPools.get(node.key());
- NodeConnection nc = prev != null ? prev.nextConnection() : null;
- if (nc instanceof RpcClient.RpcNodeConnection rpcNodeConnection
- && rpcNodeConnection.getPort() == node.port()
- && rpcNodeConnection.getHostname().equals(node.host()))
+ for (Node node : nodesConfig.node()) {
+ if ( currentPools.containsKey(node.key())
+ && currentPools.get(node.key()).nextConnection() instanceof RpcNodeConnection rpcNodeConnection
+ && rpcNodeConnection.getPort() == node.port()
+ && rpcNodeConnection.getHostname().equals(node.host()))
{
- builder.put(node.key(), prev);
+ nextPools.put(node.key(), currentPools.remove(node.key()));
} else {
- var connections = new ArrayList<NodeConnection>(numConnections);
+ ArrayList<NodeConnection> connections = new ArrayList<>(numConnections);
for (int i = 0; i < numConnections; i++) {
connections.add(rpcClient.createConnection(node.host(), node.port()));
}
- builder.put(node.key(), new NodeConnectionPool(connections));
+ nextPools.put(node.key(), new NodeConnectionPool(connections));
}
}
- // Who are not needed any more
- nodeConnectionPools.forEach((key, pool) -> {
- var survivor = builder.get(key);
- if (survivor == null || pool != survivor) {
- toClose.add(pool);
- }
- });
- this.nodeConnectionPools = Map.copyOf(builder);
- return toClose;
+ this.nodeConnectionPools = Map.copyOf(nextPools);
+ return currentPools.values();
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
index 121c12335f5..c8af5cea5aa 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.searchcluster;
+import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
@@ -51,7 +52,7 @@ public class Group {
/**
* Returns whether this group has sufficient active documents
- * (compared to other groups) that is should receive traffic
+ * (compared to other groups) that it should receive traffic
*/
public boolean hasSufficientCoverage() {
return hasSufficientCoverage;
@@ -66,14 +67,16 @@ public class Group {
}
public void aggregateNodeValues() {
- long activeDocs = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getActiveDocuments).sum();
+ List<Node> workingNodes = new ArrayList<>(nodes);
+ workingNodes.removeIf(node -> node.isWorking() != Boolean.TRUE);
+ long activeDocs = workingNodes.stream().mapToLong(Node::getActiveDocuments).sum();
activeDocuments = activeDocs;
- targetActiveDocuments = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(Node::getTargetActiveDocuments).sum();
+ targetActiveDocuments = workingNodes.stream().mapToLong(Node::getTargetActiveDocuments).sum();
isBlockingWrites = nodes.stream().anyMatch(Node::isBlockingWrites);
- int numWorkingNodes = workingNodes();
+ int numWorkingNodes = workingNodes.size();
if (numWorkingNodes > 0) {
long average = activeDocs / numWorkingNodes;
- long skew = nodes.stream().filter(node -> node.isWorking() == Boolean.TRUE).mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum();
+ long skew = workingNodes.stream().mapToLong(node -> Math.abs(node.getActiveDocuments() - average)).sum();
boolean balanced = skew <= activeDocs * maxContentSkew;
if (balanced != isBalanced) {
if (!isSparse())
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 9c65cb3d4c0..3c8950f1f7f 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -6,15 +6,18 @@ import com.yahoo.net.HostName;
import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
+import com.yahoo.yolean.UncheckedInterruptedException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.groupingBy;
/**
* A model of a search cluster we might want to dispatch queries to.
@@ -28,7 +31,7 @@ public class SearchCluster implements NodeManager<Node> {
private final String clusterId;
private final VipStatus vipStatus;
private final PingFactory pingFactory;
- private final SearchGroupsImpl groups;
+ private volatile SearchGroupsImpl groups;
private volatile long nextLogTime = 0;
/**
@@ -45,6 +48,7 @@ public class SearchCluster implements NodeManager<Node> {
VipStatus vipStatus, PingFactory pingFactory) {
this(clusterId, toGroups(nodes, minActivedocsPercentage), vipStatus, pingFactory);
}
+
public SearchCluster(String clusterId, SearchGroupsImpl groups, VipStatus vipStatus, PingFactory pingFactory) {
this.clusterId = clusterId;
this.vipStatus = vipStatus;
@@ -55,13 +59,28 @@ public class SearchCluster implements NodeManager<Node> {
@Override
public String name() { return clusterId; }
- public VipStatus getVipStatus() { return vipStatus; }
+
+ /** Replaces the set of nodes to monitor with the given new nodes, but keeps any existing node instances which equal the new ones. */
+ public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) {
+ Collection<Node> retainedNodes = groups.nodes();
+ Collection<Node> currentNodes = new HashSet<>(newNodes);
+ retainedNodes.retainAll(currentNodes); // Throw away all old nodes which are not in the new set.
+ currentNodes.removeIf(retainedNodes::contains); // Throw away all new nodes for which we have more information in an old object.
+ Collection<Node> addedNodes = List.copyOf(currentNodes);
+ currentNodes.addAll(retainedNodes); // Put back the old node instances in place of the equal new ones removed above.
+ SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage);
+ ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false);
+ for (Node node : groups.nodes()) monitor.add(node, true);
+ monitor.start();
+ try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } }
+ catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); }
+ pingIterationCompleted(groups);
+ this.groups = groups;
+ return monitor;
+ }
public void addMonitoring(ClusterMonitor<Node> clusterMonitor) {
- for (var group : groups()) {
- for (var node : group.nodes())
- clusterMonitor.add(node, true);
- }
+ for (Node node : groups.nodes()) clusterMonitor.add(node, true);
}
private static Node findLocalCorpusDispatchTarget(String selfHostname, SearchGroups groups) {
@@ -86,14 +105,14 @@ public class SearchCluster implements NodeManager<Node> {
private static SearchGroupsImpl toGroups(Collection<Node> nodes, double minActivedocsPercentage) {
Map<Integer, Group> groups = new HashMap<>();
- for (Map.Entry<Integer, List<Node>> group : nodes.stream().collect(Collectors.groupingBy(Node::group)).entrySet()) {
- Group g = new Group(group.getKey(), group.getValue());
- groups.put(group.getKey(), g);
- }
+ nodes.stream().collect(groupingBy(Node::group)).forEach((groupId, groupNodes) -> {
+ groups.put(groupId, new Group(groupId, groupNodes));
+ });
return new SearchGroupsImpl(Map.copyOf(groups), minActivedocsPercentage);
}
public SearchGroups groupList() { return groups; }
+
public Group group(int id) { return groups.get(id); }
private Collection<Group> groups() { return groups.groups(); }
@@ -107,14 +126,14 @@ public class SearchCluster implements NodeManager<Node> {
* or empty if we should not dispatch directly.
*/
public Optional<Node> localCorpusDispatchTarget() {
- if ( localCorpusDispatchTarget == null) return Optional.empty();
+ if (localCorpusDispatchTarget == null) return Optional.empty();
// Only use direct dispatch if the local group has sufficient coverage
Group localSearchGroup = groups.get(localCorpusDispatchTarget.group());
if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();
// Only use direct dispatch if the local search node is not down
- if ( localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty();
+ if (localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty();
return Optional.of(localCorpusDispatchTarget);
}
@@ -176,7 +195,7 @@ public class SearchCluster implements NodeManager<Node> {
return groups().stream().allMatch(group -> group.nodes().stream().allMatch(node -> node.isWorking() != null));
}
- public long nonWorkingNodeCount() {
+ long nonWorkingNodeCount() {
return groups().stream().flatMap(group -> group.nodes().stream()).filter(node -> node.isWorking() == Boolean.FALSE).count();
}
@@ -194,13 +213,13 @@ public class SearchCluster implements NodeManager<Node> {
/** Used by the cluster monitor to manage node status */
@Override
- public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) {
+ public void ping(ClusterMonitor<Node> clusterMonitor, Node node, Executor executor) {
Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor));
pinger.ping();
}
- private void pingIterationCompletedSingleGroup() {
- Group group = groups().iterator().next();
+ private void pingIterationCompletedSingleGroup(SearchGroupsImpl groups) {
+ Group group = groups.groups().iterator().next();
group.aggregateNodeValues();
// With just one group sufficient coverage may not be the same as full coverage, as the
// group will always be marked sufficient for use.
@@ -209,10 +228,10 @@ public class SearchCluster implements NodeManager<Node> {
trackGroupCoverageChanges(group, sufficientCoverage, group.activeDocuments());
}
- private void pingIterationCompletedMultipleGroups() {
- groups().forEach(Group::aggregateNodeValues);
+ private void pingIterationCompletedMultipleGroups(SearchGroupsImpl groups) {
+ groups.groups().forEach(Group::aggregateNodeValues);
long medianDocuments = groups.medianDocumentsPerGroup();
- for (Group group : groups()) {
+ for (Group group : groups.groups()) {
boolean sufficientCoverage = groups.isGroupCoverageSufficient(group.activeDocuments(), medianDocuments);
updateSufficientCoverage(group, sufficientCoverage);
trackGroupCoverageChanges(group, sufficientCoverage, medianDocuments);
@@ -226,20 +245,20 @@ public class SearchCluster implements NodeManager<Node> {
*/
@Override
public void pingIterationCompleted() {
+ pingIterationCompleted(groups);
+ }
+
+ private void pingIterationCompleted(SearchGroupsImpl groups) {
if (groups.size() == 1) {
- pingIterationCompletedSingleGroup();
+ pingIterationCompletedSingleGroup(groups);
} else {
- pingIterationCompletedMultipleGroups();
+ pingIterationCompletedMultipleGroups(groups);
}
}
-
-
/**
* Calculate whether a subset of nodes in a group has enough coverage
*/
-
-
private void trackGroupCoverageChanges(Group group, boolean fullCoverage, long medianDocuments) {
if ( ! hasInformationAboutAllNodes()) return; // Be silent until we know what we are talking about.
boolean changed = group.fullCoverageStatusChanged(fullCoverage);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java
index b041ba28db9..5727931281a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroups.java
@@ -1,8 +1,16 @@
package com.yahoo.search.dispatch.searchcluster;
+import com.yahoo.stream.CustomCollectors;
+
import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
import java.util.Set;
+import static java.util.Comparator.comparingInt;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toSet;
+
/**
* Simple interface for groups and their nodes in the content cluster
* @author baldersheim
@@ -14,6 +22,11 @@ public interface SearchGroups {
default boolean isEmpty() {
return size() == 0;
}
+ default Set<Node> nodes() {
+ return groups().stream().flatMap(group -> group.nodes().stream())
+ .sorted(comparingInt(Node::key))
+ .collect(toCollection(LinkedHashSet::new));
+ }
int size();
boolean isPartialGroupCoverageSufficient(Collection<Node> nodes);
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java
index 514f0de4fec..3c5dbe9927a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchGroupsImpl.java
@@ -3,10 +3,8 @@ package com.yahoo.search.dispatch.searchcluster;
import com.google.common.math.Quantiles;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
public class SearchGroupsImpl implements SearchGroups {
@@ -42,4 +40,5 @@ public class SearchGroupsImpl implements SearchGroups {
double[] activeDocuments = groups().stream().mapToDouble(Group::activeDocuments).toArray();
return (long) Quantiles.median().computeInPlace(activeDocuments);
}
+
}