diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java | 52 |
1 files changed, 35 insertions, 17 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 9b0c79dc2bc..3b686da35f6 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -12,6 +12,7 @@ import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; +import com.yahoo.search.dispatch.rpc.RpcConnectionPool; import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; @@ -55,8 +56,9 @@ public class Dispatcher extends AbstractComponent { /** If set will control computation of how many hits will be fetched from each partition.*/ public static final CompoundName topKProbability = CompoundName.from(DISPATCH + "." + TOP_K_PROBABILITY); + private final InvokerFactoryFactory invokerFactories; private final DispatchConfig dispatchConfig; - private final RpcResourcePool rpcResourcePool; + private final RpcConnectionPool rpcResourcePool; private final SearchCluster searchCluster; private volatile VolatileItems volatileItems; @@ -105,24 +107,42 @@ public class Dispatcher extends AbstractComponent { public static QueryProfileType getArgumentType() { return argumentType; } + private interface InvokerFactoryFactory { + InvokerFactory create(RpcConnectionPool rpcConnectionPool, SearchGroups searchGroups, DispatchConfig dispatchConfig); + } + @Inject - public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, - DispatchNodesConfig nodesConfig, VipStatus vipStatus) { - this.dispatchConfig = dispatchConfig; - rpcResourcePool = new RpcResourcePool(dispatchConfig, nodesConfig); - searchCluster = new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), - toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcResourcePool)); - volatileItems = update(null, new ClusterMonitor<>(searchCluster, true)); + public Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus) { + this(clusterId, dispatchConfig, new RpcResourcePool(dispatchConfig, nodesConfig), nodesConfig, vipStatus, RpcInvokerFactory::new); initialWarmup(dispatchConfig.warmuptime()); } + Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) { + this(dispatchConfig, rpcConnectionPool, + new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), + toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)), + invokerFactories); + } + + Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) { + this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, true), invokerFactories); + } + + Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, + SearchCluster searchCluster, ClusterMonitor<Node> clusterMonitor, InvokerFactoryFactory invokerFactories) { + this.dispatchConfig = dispatchConfig; + this.rpcResourcePool = rpcConnectionPool; + this.searchCluster = searchCluster; + this.invokerFactories = invokerFactories; + this.volatileItems = update(clusterMonitor); + } + /* For simple mocking in tests. Beware that searchCluster is shutdown in deconstruct() */ Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { - this.dispatchConfig = dispatchConfig; - this.rpcResourcePool = null; - this.searchCluster = searchCluster; - this.volatileItems = update(invokerFactory, clusterMonitor); + this(dispatchConfig, null, searchCluster, clusterMonitor, (__, ___, ____) -> invokerFactory); } /** Returns the snapshot of volatile items that need to be kept together, incrementing its reference counter. */ @@ -173,18 +193,16 @@ public class Dispatcher extends AbstractComponent { searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage()); // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. - this.volatileItems = update(null, new ClusterMonitor<>(searchCluster, true)); + this.volatileItems = update(new ClusterMonitor<>(searchCluster, true)); // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to. items.get().clusterMonitor.shutdown(); } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread. } - private VolatileItems update(InvokerFactory invokerFactory, ClusterMonitor clusterMonitor) { + private VolatileItems update(ClusterMonitor<Node> clusterMonitor) { var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), - (invokerFactory == null) - ? new RpcInvokerFactory(rpcResourcePool, searchCluster.groupList(), dispatchConfig) - : invokerFactory, + invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig), clusterMonitor); searchCluster.addMonitoring(clusterMonitor); return items; |