diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo')
7 files changed, 80 insertions, 54 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index d4b6279be89..55f0816514d 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -38,6 +38,9 @@ public class ClusterMonitor<T> { /** A map from Node to corresponding MonitoredNode */ private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>()); + /** @deprecated It is not advised to start the monitoring thread in the constructor. + * Use ClusterMonitor(NodeManager manager, false) and explicit start(). */ + @Deprecated public ClusterMonitor(NodeManager<T> manager) { this(manager, true); } @@ -50,6 +53,12 @@ public class ClusterMonitor<T> { } } + public void start() { + if ( ! monitorThread.isAlive()) { + monitorThread.start(); + } + } + /** Returns the configuration of this cluster monitor */ public MonitorConfiguration getConfiguration() { return configuration; } @@ -101,7 +110,7 @@ public class ClusterMonitor<T> { public void ping(Executor executor) { for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) { BaseNodeMonitor<T> monitor= i.next(); - nodeManager.ping(monitor.getNode(), executor); // Cause call to failed or responded + nodeManager.ping(this, monitor.getNode(), executor); // Cause call to failed or responded } if (closed.get()) return; // Do nothing to change state if close has started. nodeManager.pingIterationCompleted(); diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java index 20f56c86f7b..2d05168731a 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java @@ -58,7 +58,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod this(id, connections, hasher, internal, true); } - public ClusterSearcher(ComponentId id, List<T> connections, Hasher<T> hasher, boolean internal, boolean startPingThread) { + protected ClusterSearcher(ComponentId id, List<T> connections, Hasher<T> hasher, boolean internal, boolean startPingThread) { super(id); this.hasher = hasher; this.monitor = new ClusterMonitor<>(this, startPingThread); @@ -70,7 +70,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod /** Pinging a node, called from ClusterMonitor */ @Override - public final void ping(T p, Executor executor) { + public final void ping(ClusterMonitor<T> clusterMonitor, T p, Executor executor) { log(LogLevel.FINE, "Sending ping to: ", p); Pinger pinger = new Pinger(p); FutureTask<Pong> future = new FutureTask<>(pinger); @@ -80,7 +80,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod Throwable logThrowable = null; try { - pong = future.get(monitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); + pong = future.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { pong = new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + p)); logThrowable = e; @@ -96,10 +96,10 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod future.cancel(true); if (pong.badResponse()) { - monitor.failed(p, pong.error().get()); + clusterMonitor.failed(p, pong.error().get()); log(LogLevel.FINE, "Failed ping - ", pong); } else { - monitor.responded(p); + clusterMonitor.responded(p); log(LogLevel.FINE, "Answered ping - ", p); } diff --git a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java index 9b20139e3c5..481f1e1b5a5 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java @@ -19,9 +19,21 @@ public interface NodeManager<T> { /** * Called when a node should be pinged. - * This *must* lead to either a call to NodeMonitor.failed or NodeMonitor.responded + * This *must* lead to either a call to NodeMonitor.failed or NodeMonitor.responded + * @deprecated Use ping(ClusterMonitor clusterMonitor, T node, Executor executor) instead. */ - void ping(T node, Executor executor); + @Deprecated + default void ping(T node, Executor executor) { + throw new IllegalStateException("If you have not overrriden ping(ClusterMonitor<T> clusterMonitor, T node, Executor executor), you should at least have overriden this method."); + } + + /** + * Called when a node should be pinged. + * This *must* lead to either a call to ClusterMonitor.failed or ClusterMonitor.responded + */ + default void ping(ClusterMonitor<T> clusterMonitor, T node, Executor executor) { + ping(node, executor); + } /** Called right after a ping has been issued to each node. This default implementation does nothing. */ default void pingIterationCompleted() {} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 3fb0059ecb9..91bd5c6da11 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -11,8 +11,10 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; +import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; +import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; import com.yahoo.search.dispatch.searchcluster.Group; import com.yahoo.search.dispatch.searchcluster.Node; @@ -58,6 +60,7 @@ public class Dispatcher extends AbstractComponent { /** A model of the search cluster this dispatches to */ private final SearchCluster searchCluster; + private final ClusterMonitor clusterMonitor; private final LoadBalancer loadBalancer; @@ -87,44 +90,43 @@ public class Dispatcher extends AbstractComponent { ClusterInfoConfig clusterInfoConfig, VipStatus vipStatus, Metric metric) { - this(new SearchCluster(clusterId.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(), vipStatus), - dispatchConfig, - metric); + this(new RpcResourcePool(dispatchConfig), clusterId, dispatchConfig, clusterInfoConfig, vipStatus, metric); } - private Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, Metric metric) { - this(searchCluster, - dispatchConfig, - new RpcInvokerFactory(new RpcResourcePool(dispatchConfig), searchCluster), - metric); + private Dispatcher(RpcResourcePool resourcePool, + ComponentId clusterId, + DispatchConfig dispatchConfig, + ClusterInfoConfig clusterInfoConfig, + VipStatus vipStatus, + Metric metric) { + this(resourcePool, new SearchCluster(clusterId.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(), + vipStatus, new RpcPingFactory(resourcePool)), + dispatchConfig, metric); + } - /* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ - protected Dispatcher(SearchCluster searchCluster, - DispatchConfig dispatchConfig, - RpcInvokerFactory rcpInvokerFactory, - Metric metric) { - this(searchCluster, dispatchConfig, rcpInvokerFactory, rcpInvokerFactory, metric); + private Dispatcher(RpcResourcePool resourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig, Metric metric) { + this(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, new RpcInvokerFactory(resourcePool, searchCluster), metric); } /* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ - protected Dispatcher(SearchCluster searchCluster, + protected Dispatcher(ClusterMonitor clusterMonitor, + SearchCluster searchCluster, DispatchConfig dispatchConfig, InvokerFactory invokerFactory, - PingFactory pingFactory, Metric metric) { if (dispatchConfig.useMultilevelDispatch()) throw new IllegalArgumentException(searchCluster + " is configured with multilevel dispatch, but this is not supported"); this.searchCluster = searchCluster; + this.clusterMonitor = clusterMonitor; this.loadBalancer = new LoadBalancer(searchCluster, dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN); this.invokerFactory = invokerFactory; this.metric = metric; this.metricContext = metric.createContext(null); this.maxHitsPerNode = dispatchConfig.maxHitsPerNode(); - - searchCluster.startClusterMonitoring(pingFactory); + searchCluster.addMonitoring(clusterMonitor); try { while ( ! searchCluster.hasInformationAboutAllNodes()) { Thread.sleep(1); @@ -139,8 +141,8 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { - /* The seach cluster must be shutdown first as it uses the invokerfactory. */ - searchCluster.shutDown(); + /* The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. */ + clusterMonitor.shutdown(); invokerFactory.release(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index 8333080cdf0..a45ec59c3ee 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -24,7 +24,7 @@ import java.util.concurrent.Callable; /** * @author ollivir */ -public class RpcInvokerFactory extends InvokerFactory implements PingFactory { +public class RpcInvokerFactory extends InvokerFactory { /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */ private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries"); @@ -65,9 +65,4 @@ public class RpcInvokerFactory extends InvokerFactory implements PingFactory { public void release() { rpcResourcePool.release(); } - - @Override - public Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler) { - return new RpcPing(node, monitor, rpcResourcePool, pongHandler); - } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java new file mode 100644 index 00000000000..ac8f0a59c20 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java @@ -0,0 +1,18 @@ +package com.yahoo.search.dispatch.rpc; + +import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.PingFactory; +import com.yahoo.search.dispatch.searchcluster.Pinger; +import com.yahoo.search.dispatch.searchcluster.PongHandler; + +public class RpcPingFactory implements PingFactory { + private final RpcResourcePool rpcResourcePool; + public RpcPingFactory(RpcResourcePool rpcResourcePool) { + this.rpcResourcePool = rpcResourcePool; + } + @Override + public Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler) { + return new RpcPing(node, monitor, rpcResourcePool, pongHandler); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index cbe24eb6907..d462479226a 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -36,9 +36,8 @@ public class SearchCluster implements NodeManager<Node> { private final ImmutableMap<Integer, Group> groups; private final ImmutableMultimap<String, Node> nodesByHost; private final ImmutableList<Group> orderedGroups; - private final ClusterMonitor<Node> clusterMonitor; private final VipStatus vipStatus; - private PingFactory pingFactory; + private final PingFactory pingFactory; private long nextLogTime = 0; /** @@ -51,10 +50,12 @@ public class SearchCluster implements NodeManager<Node> { */ private final Optional<Node> localCorpusDispatchTarget; - public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, VipStatus vipStatus) { + public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, + VipStatus vipStatus, PingFactory pingFactory) { this.clusterId = clusterId; this.dispatchConfig = dispatchConfig; this.vipStatus = vipStatus; + this.pingFactory = pingFactory; List<Node> nodes = toNodes(dispatchConfig); this.size = nodes.size(); @@ -77,29 +78,18 @@ public class SearchCluster implements NodeManager<Node> { this.nodesByHost = nodesByHostBuilder.build(); this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), - size, - containerClusterSize, - nodesByHost, - groups); - - this.clusterMonitor = new ClusterMonitor<>(this); - } - - public void shutDown() { - clusterMonitor.shutdown(); + size, + containerClusterSize, + nodesByHost, + groups); } - - public void startClusterMonitoring(PingFactory pingFactory) { - this.pingFactory = pingFactory; - + public void addMonitoring(ClusterMonitor clusterMonitor) { for (var group : orderedGroups) { for (var node : group.nodes()) clusterMonitor.add(node, true); } } - ClusterMonitor<Node> clusterMonitor() { return clusterMonitor; } - private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname, int searchClusterSize, int containerClusterSize, @@ -278,7 +268,7 @@ public class SearchCluster implements NodeManager<Node> { /** Used by the cluster monitor to manage node status */ @Override - public void ping(Node node, Executor executor) { + public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) { if (pingFactory == null) return; // not initialized yet Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor)); |