diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-22 12:44:40 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-23 10:01:17 +0100 |
commit | b8165e0e316527dc956489bc416f9ccb83cf1904 (patch) | |
tree | f2242e63ca98c5293768b8a5858d7f605c95d27d /container-search/src/main | |
parent | 8ccd836e176f4d1bea05ee835428977c50463e0e (diff) |
Only have the DispatchNodesConfig inject into one component.
Let RpcResourcePool/RpcClient be owned by the dispatcher.
Step 2 in preparing for smooth handling of content cluster changes.
Diffstat (limited to 'container-search/src/main')
3 files changed, 53 insertions, 30 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index e4147f6ba14..36c5c8a16fa 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -12,6 +12,7 @@ import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; +import com.yahoo.search.dispatch.rpc.RpcClient; import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; @@ -57,6 +58,8 @@ public class Dispatcher extends AbstractComponent { private final ClusterMonitor<Node> clusterMonitor; private final LoadBalancer loadBalancer; private final InvokerFactory invokerFactory; + private final RpcResourcePool rpcResourcePool; + private final RpcClient rpcClient; private final int maxHitsPerNode; private static final QueryProfileType argumentType; @@ -72,28 +75,34 @@ public class Dispatcher extends AbstractComponent { public static QueryProfileType getArgumentType() { return argumentType; } @Inject - public Dispatcher(RpcResourcePool resourcePool, - ComponentId clusterId, + public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus) { - this(resourcePool, new SearchCluster(clusterId.stringValue(), dispatchConfig, nodesConfig, - vipStatus, new RpcPingFactory(resourcePool)), - dispatchConfig); + this(clusterId, dispatchConfig, nodesConfig, vipStatus, + new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads())); + } + private Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, + DispatchNodesConfig nodesConfig, VipStatus vipStatus, RpcClient rpcClient) { + this(clusterId, dispatchConfig, nodesConfig, vipStatus, rpcClient, + new RpcResourcePool(rpcClient, nodesConfig, dispatchConfig.numJrtConnectionsPerNode())); } + private Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, + DispatchNodesConfig nodesConfig, VipStatus vipStatus, + RpcClient rpcClient, RpcResourcePool resourcePool) { + this(new SearchCluster(clusterId.stringValue(), dispatchConfig, nodesConfig, + vipStatus, new RpcPingFactory(resourcePool)), + dispatchConfig, rpcClient, resourcePool); - private Dispatcher(RpcResourcePool resourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig) { - this(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, new RpcInvokerFactory(resourcePool, searchCluster)); } - private static LoadBalancer.Policy toLoadBalancerPolicy(DispatchConfig.DistributionPolicy.Enum policy) { - return switch (policy) { - case ROUNDROBIN: yield LoadBalancer.Policy.ROUNDROBIN; - case BEST_OF_RANDOM_2: yield LoadBalancer.Policy.BEST_OF_RANDOM_2; - case ADAPTIVE,LATENCY_AMORTIZED_OVER_REQUESTS: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_REQUESTS; - case LATENCY_AMORTIZED_OVER_TIME: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME; - }; + private Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, + RpcClient rpcClient, RpcResourcePool rpcResourcePool) { + this(new ClusterMonitor<>(searchCluster, true), + searchCluster, dispatchConfig, + new RpcInvokerFactory(rpcResourcePool, searchCluster), + rpcClient, rpcResourcePool); } /* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ @@ -101,11 +110,22 @@ public class Dispatcher extends AbstractComponent { SearchCluster searchCluster, DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { + this(clusterMonitor, searchCluster, dispatchConfig, invokerFactory, null, null); + } + + private Dispatcher(ClusterMonitor<Node> clusterMonitor, + SearchCluster searchCluster, + DispatchConfig dispatchConfig, + InvokerFactory invokerFactory, + RpcClient rpcClient, + RpcResourcePool rpcResourcePool) { this.searchCluster = searchCluster; this.clusterMonitor = clusterMonitor; this.loadBalancer = new LoadBalancer(searchCluster, toLoadBalancerPolicy(dispatchConfig.distributionPolicy())); this.invokerFactory = invokerFactory; + this.rpcClient = rpcClient; + this.rpcResourcePool = rpcResourcePool; this.maxHitsPerNode = dispatchConfig.maxHitsPerNode(); searchCluster.addMonitoring(clusterMonitor); Thread warmup = new Thread(() -> warmup(dispatchConfig.warmuptime())); @@ -124,6 +144,15 @@ public class Dispatcher extends AbstractComponent { searchCluster.pingIterationCompleted(); } + private static LoadBalancer.Policy toLoadBalancerPolicy(DispatchConfig.DistributionPolicy.Enum policy) { + return switch (policy) { + case ROUNDROBIN: yield LoadBalancer.Policy.ROUNDROBIN; + case BEST_OF_RANDOM_2: yield LoadBalancer.Policy.BEST_OF_RANDOM_2; + case ADAPTIVE,LATENCY_AMORTIZED_OVER_REQUESTS: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_REQUESTS; + case LATENCY_AMORTIZED_OVER_TIME: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME; + }; + } + /** * Will run important code in order to trigger JIT compilation and avoid cold start issues. * Currently warms up lz4 compression code. @@ -142,6 +171,12 @@ public class Dispatcher extends AbstractComponent { // The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. clusterMonitor.shutdown(); invokerFactory.release(); + if (rpcResourcePool != null) { + rpcResourcePool.close(); + } + if (rpcClient != null) { + rpcClient.close(); + } } public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java index 073d10b8f49..762438aa489 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java @@ -18,7 +18,7 @@ import com.yahoo.jrt.Values; * * @author bratseth */ -class RpcClient implements Client { +public class RpcClient implements Client { private final Supervisor supervisor; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index 7ecdb24c211..bbe60d0df23 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -2,8 +2,6 @@ package com.yahoo.search.dispatch.rpc; import com.google.common.collect.ImmutableMap; -import com.yahoo.component.annotation.Inject; -import com.yahoo.component.AbstractComponent; import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; import com.yahoo.compress.Compressor.Compression; @@ -11,7 +9,6 @@ import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.rpc.Client.NodeConnection; -import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; import java.util.ArrayList; @@ -26,7 +23,7 @@ import java.util.concurrent.ThreadLocalRandom; * * @author ollivir */ -public class RpcResourcePool extends AbstractComponent { +public class RpcResourcePool implements AutoCloseable { /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); @@ -35,23 +32,18 @@ public class RpcResourcePool extends AbstractComponent { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools; - private final RpcClient client; RpcResourcePool(Map<Integer, NodeConnection> nodeConnections) { var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection)))); this.nodeConnectionPools = builder.build(); - client = null; } - @Inject - public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) { + public RpcResourcePool(RpcClient client, DispatchNodesConfig nodesConfig, int numConnections) { super(); - client = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()); // Create rpc node connection pools indexed by the node distribution key var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); - var numConnections = dispatchConfig.numJrtConnectionsPerNode(); for (var node : nodesConfig.node()) { var connections = new ArrayList<NodeConnection>(numConnections); for (int i = 0; i < numConnections; i++) { @@ -81,12 +73,8 @@ public class RpcResourcePool extends AbstractComponent { } @Override - public void deconstruct() { - super.deconstruct(); + public void close() { nodeConnectionPools.values().forEach(NodeConnectionPool::release); - if (client != null) { - client.close(); - } } private static class NodeConnectionPool { |