diff options
author | jonmv <venstad@gmail.com> | 2023-07-13 12:00:37 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-07-13 12:00:37 +0200 |
commit | bcdf634ebaf2b16e2f64937e453d37067750d172 (patch) | |
tree | 0c5a450444c2327ccd6f7314f14282a113410c0d /container-search | |
parent | 578101eb6d25be28387f1ab0b6d575a1ed6df869 (diff) |
Set up constructor wiring for testing rpc connection pool update
Diffstat (limited to 'container-search')
4 files changed, 55 insertions, 22 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 9b0c79dc2bc..3b686da35f6 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -12,6 +12,7 @@ import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; +import com.yahoo.search.dispatch.rpc.RpcConnectionPool; import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; @@ -55,8 +56,9 @@ public class Dispatcher extends AbstractComponent { /** If set will control computation of how many hits will be fetched from each partition.*/ public static final CompoundName topKProbability = CompoundName.from(DISPATCH + "." + TOP_K_PROBABILITY); + private final InvokerFactoryFactory invokerFactories; private final DispatchConfig dispatchConfig; - private final RpcResourcePool rpcResourcePool; + private final RpcConnectionPool rpcResourcePool; private final SearchCluster searchCluster; private volatile VolatileItems volatileItems; @@ -105,24 +107,42 @@ public class Dispatcher extends AbstractComponent { public static QueryProfileType getArgumentType() { return argumentType; } + private interface InvokerFactoryFactory { + InvokerFactory create(RpcConnectionPool rpcConnectionPool, SearchGroups searchGroups, DispatchConfig dispatchConfig); + } + @Inject - public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, - DispatchNodesConfig nodesConfig, VipStatus vipStatus) { - this.dispatchConfig = dispatchConfig; - rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig); - searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), - toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool)); - volatileItems = update(null, new ClusterMonitor<>(searchCluster, true)); + public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus) { + this(clusterId, dispatchConfig, new RpcResourcePool(dispatchConfig, nodesConfig), nodesConfig, vipStatus, RpcInvokerFactory::new); initialWarmup(dispatchConfig.warmuptime()); } + Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) { + this(dispatchConfig, rpcConnectionPool, + new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), + toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)), + invokerFactories); + } + + Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) { + this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, true), invokerFactories); + } + + Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + SearchCluster searchCluster, ClusterMonitor<Node> clusterMonitor, InvokerFactoryFactory invokerFactories) { + this.dispatchConfig = dispatchConfig; + this.rpcResourcePool = rpcConnectionPool; + this.searchCluster = searchCluster; + this.invokerFactories = invokerFactories; + this.volatileItems = update(clusterMonitor); + } + /* For simple mocking in tests. Beware that searchCluster is shutdown in deconstruct() */ Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { - this.dispatchConfig = dispatchConfig; - this.rpcResourcePool = null; - this.searchCluster = searchCluster; - this.volatileItems = update(invokerFactory, clusterMonitor); + this(dispatchConfig, null, searchCluster, clusterMonitor, (__, ___, ____) -> invokerFactory); } /** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */ @@ -173,18 +193,16 @@ public class Dispatcher extends AbstractComponent { searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage()); // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. - this.volatileItems = update(null, new ClusterMonitor<>(searchCluster, true)); + this.volatileItems = update(new ClusterMonitor<>(searchCluster, true)); // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to. items.get().clusterMonitor.shutdown(); } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread. } - private VolatileItems update(InvokerFactory invokerFactory, ClusterMonitor clusterMonitor) { + private VolatileItems update(ClusterMonitor<Node> clusterMonitor) { var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), - (invokerFactory == null) - ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig) - : invokerFactory, + invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig), clusterMonitor); searchCluster.addMonitoring(clusterMonitor); return items; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java index fd8e0e4f81a..a93ddb0b360 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java @@ -1,11 +1,27 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.rpc; +import com.yahoo.vespa.config.search.DispatchNodesConfig; + +import java.util.Collection; +import java.util.List; + /** * Interface for getting a connection given a node id. * * @author balderersheim */ -public interface RpcConnectionPool { +public interface RpcConnectionPool extends AutoCloseable { + + /** Returns a connection to the given node id. */ Client.NodeConnection getConnection(int nodeId); + + + /** Will return a list of items that need a delayed close when updating node set. */ + default Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { return List.of(); } + + /** Shuts down all connections in the pool, and the underlying RPC client. */ + @Override + void close(); + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index 154002c4f77..b6228994ac8 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -33,7 +33,7 @@ public class RpcInvokerFactory extends InvokerFactory { super(cluster, dispatchConfig); this.rpcResourcePool = rpcResourcePool; this.compressor = new CompressService(); - decodeType = convert(dispatchConfig.summaryDecodePolicy()); + this.decodeType = convert(dispatchConfig.summaryDecodePolicy()); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index 993cab11cb5..d1f22514481 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -11,7 +11,6 @@ import com.yahoo.vespa.config.search.DispatchNodesConfig.Node; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -22,7 +21,7 @@ import java.util.concurrent.ThreadLocalRandom; * * @author ollivir */ -public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { +public class RpcResourcePool implements RpcConnectionPool { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private volatile Map<Integer, NodeConnectionPool> nodeConnectionPools = Map.of(); @@ -45,7 +44,7 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { }); } - /** Will return a list of items that need a delayed close. */ + @Override public Collection<? extends AutoCloseable> updateNodes(DispatchNodesConfig nodesConfig) { Map<Integer, NodeConnectionPool> currentPools = new HashMap<>(nodeConnectionPools); Map<Integer, NodeConnectionPool> nextPools = new HashMap<>(); |