aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java52
1 files changed, 35 insertions, 17 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 9b0c79dc2bc..3b686da35f6 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -12,6 +12,7 @@ import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
+import com.yahoo.search.dispatch.rpc.RpcConnectionPool;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
@@ -55,8 +56,9 @@ public class Dispatcher extends AbstractComponent {
/** If set will control computation of how many hits will be fetched from each partition.*/
public static final CompoundName topKProbability = CompoundName.from(DISPATCH + "." + TOP_K_PROBABILITY);
+ private final InvokerFactoryFactory invokerFactories;
private final DispatchConfig dispatchConfig;
- private final RpcResourcePool rpcResourcePool;
+ private final RpcConnectionPool rpcResourcePool;
private final SearchCluster searchCluster;
private volatile VolatileItems volatileItems;
@@ -105,24 +107,42 @@ public class Dispatcher extends AbstractComponent {
public static QueryProfileType getArgumentType() { return argumentType; }
+ private interface InvokerFactoryFactory {
+ InvokerFactory create(RpcConnectionPool rpcConnectionPool, SearchGroups searchGroups, DispatchConfig dispatchConfig);
+ }
+
@Inject
- public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig,
- DispatchNodesConfig nodesConfig, VipStatus vipStatus) {
- this.dispatchConfig = dispatchConfig;
- rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig);
- searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
- toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool));
- volatileItems = update(null, new ClusterMonitor<>(searchCluster, true));
+ public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus) {
+ this(clusterId, dispatchConfig, new RpcResourcePool(dispatchConfig, nodesConfig), nodesConfig, vipStatus, RpcInvokerFactory::new);
initialWarmup(dispatchConfig.warmuptime());
}
+ Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) {
+ this(dispatchConfig, rpcConnectionPool,
+ new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
+ toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)),
+ invokerFactories);
+ }
+
+ Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) {
+ this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, true), invokerFactories);
+ }
+
+ Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
+ SearchCluster searchCluster, ClusterMonitor<Node> clusterMonitor, InvokerFactoryFactory invokerFactories) {
+ this.dispatchConfig = dispatchConfig;
+ this.rpcResourcePool = rpcConnectionPool;
+ this.searchCluster = searchCluster;
+ this.invokerFactories = invokerFactories;
+ this.volatileItems = update(clusterMonitor);
+ }
+
/* For simple mocking in tests. Beware that searchCluster is shutdown in deconstruct() */
Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster,
DispatchConfig dispatchConfig, InvokerFactory invokerFactory) {
- this.dispatchConfig = dispatchConfig;
- this.rpcResourcePool = null;
- this.searchCluster = searchCluster;
- this.volatileItems = update(invokerFactory, clusterMonitor);
+ this(dispatchConfig, null, searchCluster, clusterMonitor, (__, ___, ____) -> invokerFactory);
}
/** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */
@@ -173,18 +193,16 @@ public class Dispatcher extends AbstractComponent {
searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage());
// Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this.
- this.volatileItems = update(null, new ClusterMonitor<>(searchCluster, true));
+ this.volatileItems = update(new ClusterMonitor<>(searchCluster, true));
// Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to.
items.get().clusterMonitor.shutdown();
} // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread.
}
- private VolatileItems update(InvokerFactory invokerFactory, ClusterMonitor clusterMonitor) {
+ private VolatileItems update(ClusterMonitor<Node> clusterMonitor) {
var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
- (invokerFactory == null)
- ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig)
- : invokerFactory,
+ invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig),
clusterMonitor);
searchCluster.addMonitoring(clusterMonitor);
return items;