aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-11-22 12:44:40 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-11-23 10:01:17 +0100
commitb8165e0e316527dc956489bc416f9ccb83cf1904 (patch)
treef2242e63ca98c5293768b8a5858d7f605c95d27d /container-search/src/main
parent8ccd836e176f4d1bea05ee835428977c50463e0e (diff)
Only have the DispatchNodesConfig inject into one component.
Let RpcResourcePool/RpcClient be owned by the dispatcher. Step 2 in preparing for smooth handling of content cluster changes.
Diffstat (limited to 'container-search/src/main')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java63
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java18
3 files changed, 53 insertions, 30 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index e4147f6ba14..36c5c8a16fa 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -12,6 +12,7 @@ import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
+import com.yahoo.search.dispatch.rpc.RpcClient;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
@@ -57,6 +58,8 @@ public class Dispatcher extends AbstractComponent {
private final ClusterMonitor<Node> clusterMonitor;
private final LoadBalancer loadBalancer;
private final InvokerFactory invokerFactory;
+ private final RpcResourcePool rpcResourcePool;
+ private final RpcClient rpcClient;
private final int maxHitsPerNode;
private static final QueryProfileType argumentType;
@@ -72,28 +75,34 @@ public class Dispatcher extends AbstractComponent {
public static QueryProfileType getArgumentType() { return argumentType; }
@Inject
- public Dispatcher(RpcResourcePool resourcePool,
- ComponentId clusterId,
+ public Dispatcher(ComponentId clusterId,
DispatchConfig dispatchConfig,
DispatchNodesConfig nodesConfig,
VipStatus vipStatus) {
- this(resourcePool, new SearchCluster(clusterId.stringValue(), dispatchConfig, nodesConfig,
- vipStatus, new RpcPingFactory(resourcePool)),
- dispatchConfig);
+ this(clusterId, dispatchConfig, nodesConfig, vipStatus,
+ new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()));
+ }
+ private Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig,
+ DispatchNodesConfig nodesConfig, VipStatus vipStatus, RpcClient rpcClient) {
+ this(clusterId, dispatchConfig, nodesConfig, vipStatus, rpcClient,
+ new RpcResourcePool(rpcClient, nodesConfig, dispatchConfig.numJrtConnectionsPerNode()));
}
+ private Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig,
+ DispatchNodesConfig nodesConfig, VipStatus vipStatus,
+ RpcClient rpcClient, RpcResourcePool resourcePool) {
+ this(new SearchCluster(clusterId.stringValue(), dispatchConfig, nodesConfig,
+ vipStatus, new RpcPingFactory(resourcePool)),
+ dispatchConfig, rpcClient, resourcePool);
- private Dispatcher(RpcResourcePool resourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig) {
- this(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, new RpcInvokerFactory(resourcePool, searchCluster));
}
- private static LoadBalancer.Policy toLoadBalancerPolicy(DispatchConfig.DistributionPolicy.Enum policy) {
- return switch (policy) {
- case ROUNDROBIN: yield LoadBalancer.Policy.ROUNDROBIN;
- case BEST_OF_RANDOM_2: yield LoadBalancer.Policy.BEST_OF_RANDOM_2;
- case ADAPTIVE,LATENCY_AMORTIZED_OVER_REQUESTS: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_REQUESTS;
- case LATENCY_AMORTIZED_OVER_TIME: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME;
- };
+ private Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig,
+ RpcClient rpcClient, RpcResourcePool rpcResourcePool) {
+ this(new ClusterMonitor<>(searchCluster, true),
+ searchCluster, dispatchConfig,
+ new RpcInvokerFactory(rpcResourcePool, searchCluster),
+ rpcClient, rpcResourcePool);
}
/* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */
@@ -101,11 +110,22 @@ public class Dispatcher extends AbstractComponent {
SearchCluster searchCluster,
DispatchConfig dispatchConfig,
InvokerFactory invokerFactory) {
+ this(clusterMonitor, searchCluster, dispatchConfig, invokerFactory, null, null);
+ }
+
+ private Dispatcher(ClusterMonitor<Node> clusterMonitor,
+ SearchCluster searchCluster,
+ DispatchConfig dispatchConfig,
+ InvokerFactory invokerFactory,
+ RpcClient rpcClient,
+ RpcResourcePool rpcResourcePool) {
this.searchCluster = searchCluster;
this.clusterMonitor = clusterMonitor;
this.loadBalancer = new LoadBalancer(searchCluster, toLoadBalancerPolicy(dispatchConfig.distributionPolicy()));
this.invokerFactory = invokerFactory;
+ this.rpcClient = rpcClient;
+ this.rpcResourcePool = rpcResourcePool;
this.maxHitsPerNode = dispatchConfig.maxHitsPerNode();
searchCluster.addMonitoring(clusterMonitor);
Thread warmup = new Thread(() -> warmup(dispatchConfig.warmuptime()));
@@ -124,6 +144,15 @@ public class Dispatcher extends AbstractComponent {
searchCluster.pingIterationCompleted();
}
+ private static LoadBalancer.Policy toLoadBalancerPolicy(DispatchConfig.DistributionPolicy.Enum policy) {
+ return switch (policy) {
+ case ROUNDROBIN: yield LoadBalancer.Policy.ROUNDROBIN;
+ case BEST_OF_RANDOM_2: yield LoadBalancer.Policy.BEST_OF_RANDOM_2;
+ case ADAPTIVE,LATENCY_AMORTIZED_OVER_REQUESTS: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_REQUESTS;
+ case LATENCY_AMORTIZED_OVER_TIME: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME;
+ };
+ }
+
/**
* Will run important code in order to trigger JIT compilation and avoid cold start issues.
* Currently warms up lz4 compression code.
@@ -142,6 +171,12 @@ public class Dispatcher extends AbstractComponent {
// The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster.
clusterMonitor.shutdown();
invokerFactory.release();
+ if (rpcResourcePool != null) {
+ rpcResourcePool.close();
+ }
+ if (rpcClient != null) {
+ rpcClient.close();
+ }
}
public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
index 073d10b8f49..762438aa489 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
@@ -18,7 +18,7 @@ import com.yahoo.jrt.Values;
*
* @author bratseth
*/
-class RpcClient implements Client {
+public class RpcClient implements Client {
private final Supervisor supervisor;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index 7ecdb24c211..bbe60d0df23 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -2,8 +2,6 @@
package com.yahoo.search.dispatch.rpc;
import com.google.common.collect.ImmutableMap;
-import com.yahoo.component.annotation.Inject;
-import com.yahoo.component.AbstractComponent;
import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.compress.Compressor.Compression;
@@ -11,7 +9,6 @@ import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
-import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
import java.util.ArrayList;
@@ -26,7 +23,7 @@ import java.util.concurrent.ThreadLocalRandom;
*
* @author ollivir
*/
-public class RpcResourcePool extends AbstractComponent {
+public class RpcResourcePool implements AutoCloseable {
/** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
@@ -35,23 +32,18 @@ public class RpcResourcePool extends AbstractComponent {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools;
- private final RpcClient client;
RpcResourcePool(Map<Integer, NodeConnection> nodeConnections) {
var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection))));
this.nodeConnectionPools = builder.build();
- client = null;
}
- @Inject
- public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) {
+ public RpcResourcePool(RpcClient client, DispatchNodesConfig nodesConfig, int numConnections) {
super();
- client = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads());
// Create rpc node connection pools indexed by the node distribution key
var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
- var numConnections = dispatchConfig.numJrtConnectionsPerNode();
for (var node : nodesConfig.node()) {
var connections = new ArrayList<NodeConnection>(numConnections);
for (int i = 0; i < numConnections; i++) {
@@ -81,12 +73,8 @@ public class RpcResourcePool extends AbstractComponent {
}
@Override
- public void deconstruct() {
- super.deconstruct();
+ public void close() {
nodeConnectionPools.values().forEach(NodeConnectionPool::release);
- if (client != null) {
- client.close();
- }
}
private static class NodeConnectionPool {