aboutsummaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2023-07-13 12:00:37 +0200
committerjonmv <venstad@gmail.com>2023-07-13 12:00:37 +0200
commitbcdf634ebaf2b16e2f64937e453d37067750d172 (patch)
tree0c5a450444c2327ccd6f7314f14282a113410c0d /container-search
parent578101eb6d25be28387f1ab0b6d575a1ed6df869 (diff)
Set up constructor wiring for testing rpc connection pool update
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java52
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java18
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java5
4 files changed, 55 insertions, 22 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 9b0c79dc2bc..3b686da35f6 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -12,6 +12,7 @@ import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
+import com.yahoo.search.dispatch.rpc.RpcConnectionPool;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
@@ -55,8 +56,9 @@ public class Dispatcher extends AbstractComponent {
/** If set will control computation of how many hits will be fetched from each partition.*/
public static final CompoundName topKProbability = CompoundName.from(DISPATCH + "." + TOP_K_PROBABILITY);
+ private final InvokerFactoryFactory invokerFactories;
private final DispatchConfig dispatchConfig;
- private final RpcResourcePool rpcResourcePool;
+ private final RpcConnectionPool rpcResourcePool;
private final SearchCluster searchCluster;
private volatile VolatileItems volatileItems;
@@ -105,24 +107,42 @@ public class Dispatcher extends AbstractComponent {
public static QueryProfileType getArgumentType() { return argumentType; }
+ private interface InvokerFactoryFactory {
+ InvokerFactory create(RpcConnectionPool rpcConnectionPool, SearchGroups searchGroups, DispatchConfig dispatchConfig);
+ }
+
@Inject
- public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig,
- DispatchNodesConfig nodesConfig, VipStatus vipStatus) {
- this.dispatchConfig = dispatchConfig;
- rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig);
- searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
- toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool));
- volatileItems = update(null, new ClusterMonitor<>(searchCluster, true));
+ public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus) {
+ this(clusterId, dispatchConfig, new RpcResourcePool(dispatchConfig, nodesConfig), nodesConfig, vipStatus, RpcInvokerFactory::new);
initialWarmup(dispatchConfig.warmuptime());
}
+ Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) {
+ this(dispatchConfig, rpcConnectionPool,
+ new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
+ toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)),
+ invokerFactories);
+ }
+
+ Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) {
+ this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, true), invokerFactories);
+ }
+
+ Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ SearchCluster searchCluster, ClusterMonitor<Node> clusterMonitor, InvokerFactoryFactory invokerFactories) {
+ this.dispatchConfig = dispatchConfig;
+ this.rpcResourcePool = rpcConnectionPool;
+ this.searchCluster = searchCluster;
+ this.invokerFactories = invokerFactories;
+ this.volatileItems = update(clusterMonitor);
+ }
+
/* For simple mocking in tests. Beware that searchCluster is shutdown in deconstruct() */
Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster,
DispatchConfig dispatchConfig, InvokerFactory invokerFactory) {
- this.dispatchConfig = dispatchConfig;
- this.rpcResourcePool = null;
- this.searchCluster = searchCluster;
- this.volatileItems = update(invokerFactory, clusterMonitor);
+ this(dispatchConfig, null, searchCluster, clusterMonitor, (__, ___, ____) -> invokerFactory);
}
/** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */
@@ -173,18 +193,16 @@ public class Dispatcher extends AbstractComponent {
searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage());
// Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this.
- this.volatileItems = update(null, new ClusterMonitor<>(searchCluster, true));
+ this.volatileItems = update(new ClusterMonitor<>(searchCluster, true));
// Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to.
items.get().clusterMonitor.shutdown();
} // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread.
}
- private VolatileItems update(InvokerFactory invokerFactory, ClusterMonitor clusterMonitor) {
+ private VolatileItems update(ClusterMonitor<Node> clusterMonitor) {
var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
- (invokerFactory == null)
- ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig)
- : invokerFactory,
+ invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig),
clusterMonitor);
searchCluster.addMonitoring(clusterMonitor);
return items;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
index fd8e0e4f81a..a93ddb0b360 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
@@ -1,11 +1,27 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
+import com.yahoo.vespa.config.search.DispatchNodesConfig;
+
+import java.util.Collection;
+import java.util.List;
+
/**
* Interface for getting a connection given a node id.
*
* @author balderersheim
*/
-public interface RpcConnectionPool {
+public interface RpcConnectionPool extends AutoCloseable {
+
+ /** Returns a connection to the given node id. */
Client.NodeConnection getConnection(int nodeId);
+
+
+ /** Will return a list of items that need a delayed close when updating node set. */
+ default Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { return List.of(); }
+
+ /** Shuts down all connections in the pool, and the underlying RPC client. */
+ @Override
+ void close();
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
index 154002c4f77..b6228994ac8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -33,7 +33,7 @@ public class RpcInvokerFactory extends InvokerFactory {
super(cluster, dispatchConfig);
this.rpcResourcePool = rpcResourcePool;
this.compressor = new CompressService();
- decodeType = convert(dispatchConfig.summaryDecodePolicy());
+ this.decodeType = convert(dispatchConfig.summaryDecodePolicy());
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index 993cab11cb5..d1f22514481 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -11,7 +11,6 @@ import com.yahoo.vespa.config.search.DispatchNodesConfig.Node;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -22,7 +21,7 @@ import java.util.concurrent.ThreadLocalRandom;
*
* @author ollivir
*/
-public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
+public class RpcResourcePool implements RpcConnectionPool {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
private volatile Map<Integer, NodeConnectionPool> nodeConnectionPools = Map.of();
@@ -45,7 +44,7 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
});
}
- /** Will return a list of items that need a delayed close. */
+ @Override
public Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) {
Map<Integer, NodeConnectionPool> currentPools = new HashMap<>(nodeConnectionPools);
Map<Integer, NodeConnectionPool> nextPools = new HashMap<>();