aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-04 17:28:33 +0100
committerGitHub <noreply@github.com>2020-02-04 17:28:33 +0100
commit396a4dbfca5a6b81bd4c3608a0372afb9c084354 (patch)
tree00d8259806e5f89962378d17d01d949edbc9907f
parent7be078bc6c5a8c6624f615fffdb613cf2309387c (diff)
parent7e63c344b701067faabd0da99315399cd529eee7 (diff)
Merge pull request #12059 from vespa-engine/balder/explicit-control-of-monitor-thread-start
Do not start cluster monitor thread in test as it will race with expl…
-rw-r--r--container-search/abi-spec.json8
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java11
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java10
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java42
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java7
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java18
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java30
-rw-r--r--container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java6
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java15
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java8
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java2
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java23
13 files changed, 110 insertions, 86 deletions
diff --git a/container-search/abi-spec.json b/container-search/abi-spec.json
index b5fbe235c43..437db99c45b 100644
--- a/container-search/abi-spec.json
+++ b/container-search/abi-spec.json
@@ -1954,6 +1954,7 @@
"methods": [
"public void <init>(com.yahoo.search.cluster.NodeManager)",
"public void <init>(com.yahoo.search.cluster.NodeManager, boolean)",
+ "public void start()",
"public com.yahoo.search.cluster.MonitorConfiguration getConfiguration()",
"public void add(java.lang.Object, boolean)",
"public com.yahoo.search.cluster.BaseNodeMonitor getNodeMonitor(java.lang.Object)",
@@ -1978,8 +1979,8 @@
"methods": [
"public void <init>(com.yahoo.component.ComponentId, java.util.List, boolean)",
"public void <init>(com.yahoo.component.ComponentId, java.util.List, com.yahoo.search.cluster.Hasher, boolean)",
- "public void <init>(com.yahoo.component.ComponentId, java.util.List, com.yahoo.search.cluster.Hasher, boolean, boolean)",
- "public final void ping(java.lang.Object, java.util.concurrent.Executor)",
+ "protected void <init>(com.yahoo.component.ComponentId, java.util.List, com.yahoo.search.cluster.Hasher, boolean, boolean)",
+ "public final void ping(com.yahoo.search.cluster.ClusterMonitor, java.lang.Object, java.util.concurrent.Executor)",
"protected abstract com.yahoo.prelude.Pong ping(com.yahoo.prelude.Ping, java.lang.Object)",
"protected java.lang.Object getFirstConnection(com.yahoo.search.cluster.Hasher$NodeList, int, int, com.yahoo.search.Query)",
"public final com.yahoo.search.Result search(com.yahoo.search.Query, com.yahoo.search.searchchain.Execution)",
@@ -2074,7 +2075,8 @@
"methods": [
"public abstract void working(java.lang.Object)",
"public abstract void failed(java.lang.Object)",
- "public abstract void ping(java.lang.Object, java.util.concurrent.Executor)",
+ "public void ping(java.lang.Object, java.util.concurrent.Executor)",
+ "public void ping(com.yahoo.search.cluster.ClusterMonitor, java.lang.Object, java.util.concurrent.Executor)",
"public void pingIterationCompleted()"
],
"fields": []
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index d4b6279be89..55f0816514d 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -38,6 +38,9 @@ public class ClusterMonitor<T> {
/** A map from Node to corresponding MonitoredNode */
private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>());
+ /** @deprecated It is not advised to start the monitoring thread in the constructor.
+ * Use ClusterMonitor(NodeManager manager, false) and explicit start(). */
+ @Deprecated
public ClusterMonitor(NodeManager<T> manager) {
this(manager, true);
}
@@ -50,6 +53,12 @@ public class ClusterMonitor<T> {
}
}
+ public void start() {
+ if ( ! monitorThread.isAlive()) {
+ monitorThread.start();
+ }
+ }
+
/** Returns the configuration of this cluster monitor */
public MonitorConfiguration getConfiguration() { return configuration; }
@@ -101,7 +110,7 @@ public class ClusterMonitor<T> {
public void ping(Executor executor) {
for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) {
BaseNodeMonitor<T> monitor= i.next();
- nodeManager.ping(monitor.getNode(), executor); // Cause call to failed or responded
+ nodeManager.ping(this, monitor.getNode(), executor); // Cause call to failed or responded
}
if (closed.get()) return; // Do nothing to change state if close has started.
nodeManager.pingIterationCompleted();
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
index 20f56c86f7b..2d05168731a 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
@@ -58,7 +58,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
this(id, connections, hasher, internal, true);
}
- public ClusterSearcher(ComponentId id, List<T> connections, Hasher<T> hasher, boolean internal, boolean startPingThread) {
+ protected ClusterSearcher(ComponentId id, List<T> connections, Hasher<T> hasher, boolean internal, boolean startPingThread) {
super(id);
this.hasher = hasher;
this.monitor = new ClusterMonitor<>(this, startPingThread);
@@ -70,7 +70,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
/** Pinging a node, called from ClusterMonitor */
@Override
- public final void ping(T p, Executor executor) {
+ public final void ping(ClusterMonitor<T> clusterMonitor, T p, Executor executor) {
log(LogLevel.FINE, "Sending ping to: ", p);
Pinger pinger = new Pinger(p);
FutureTask<Pong> future = new FutureTask<>(pinger);
@@ -80,7 +80,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
Throwable logThrowable = null;
try {
- pong = future.get(monitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS);
+ pong = future.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
pong = new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + p));
logThrowable = e;
@@ -96,10 +96,10 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
future.cancel(true);
if (pong.badResponse()) {
- monitor.failed(p, pong.error().get());
+ clusterMonitor.failed(p, pong.error().get());
log(LogLevel.FINE, "Failed ping - ", pong);
} else {
- monitor.responded(p);
+ clusterMonitor.responded(p);
log(LogLevel.FINE, "Answered ping - ", p);
}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
index 9b20139e3c5..481f1e1b5a5 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
@@ -19,9 +19,21 @@ public interface NodeManager<T> {
/**
* Called when a node should be pinged.
- * This *must* lead to either a call to NodeMonitor.failed or NodeMonitor.responded
+ * This *must* lead to either a call to NodeMonitor.failed or NodeMonitor.responded
+ * @deprecated Use ping(ClusterMonitor clusterMonitor, T node, Executor executor) instead.
*/
- void ping(T node, Executor executor);
+ @Deprecated
+ default void ping(T node, Executor executor) {
+ throw new IllegalStateException("If you have not overrriden ping(ClusterMonitor<T> clusterMonitor, T node, Executor executor), you should at least have overriden this method.");
+ }
+
+ /**
+ * Called when a node should be pinged.
+ * This *must* lead to either a call to ClusterMonitor.failed or ClusterMonitor.responded
+ */
+ default void ping(ClusterMonitor<T> clusterMonitor, T node, Executor executor) {
+ ping(node, executor);
+ }
/** Called right after a ping has been issued to each node. This default implementation does nothing. */
default void pingIterationCompleted() {}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 3fb0059ecb9..91bd5c6da11 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -11,8 +11,10 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
+import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.Node;
@@ -58,6 +60,7 @@ public class Dispatcher extends AbstractComponent {
/** A model of the search cluster this dispatches to */
private final SearchCluster searchCluster;
+ private final ClusterMonitor clusterMonitor;
private final LoadBalancer loadBalancer;
@@ -87,44 +90,43 @@ public class Dispatcher extends AbstractComponent {
ClusterInfoConfig clusterInfoConfig,
VipStatus vipStatus,
Metric metric) {
- this(new SearchCluster(clusterId.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(), vipStatus),
- dispatchConfig,
- metric);
+ this(new RpcResourcePool(dispatchConfig), clusterId, dispatchConfig, clusterInfoConfig, vipStatus, metric);
}
- private Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, Metric metric) {
- this(searchCluster,
- dispatchConfig,
- new RpcInvokerFactory(new RpcResourcePool(dispatchConfig), searchCluster),
- metric);
+ private Dispatcher(RpcResourcePool resourcePool,
+ ComponentId clusterId,
+ DispatchConfig dispatchConfig,
+ ClusterInfoConfig clusterInfoConfig,
+ VipStatus vipStatus,
+ Metric metric) {
+ this(resourcePool, new SearchCluster(clusterId.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(),
+ vipStatus, new RpcPingFactory(resourcePool)),
+ dispatchConfig, metric);
+
}
- /* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */
- protected Dispatcher(SearchCluster searchCluster,
- DispatchConfig dispatchConfig,
- RpcInvokerFactory rcpInvokerFactory,
- Metric metric) {
- this(searchCluster, dispatchConfig, rcpInvokerFactory, rcpInvokerFactory, metric);
+ private Dispatcher(RpcResourcePool resourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig, Metric metric) {
+ this(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, new RpcInvokerFactory(resourcePool, searchCluster), metric);
}
/* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */
- protected Dispatcher(SearchCluster searchCluster,
+ protected Dispatcher(ClusterMonitor clusterMonitor,
+ SearchCluster searchCluster,
DispatchConfig dispatchConfig,
InvokerFactory invokerFactory,
- PingFactory pingFactory,
Metric metric) {
if (dispatchConfig.useMultilevelDispatch())
throw new IllegalArgumentException(searchCluster + " is configured with multilevel dispatch, but this is not supported");
this.searchCluster = searchCluster;
+ this.clusterMonitor = clusterMonitor;
this.loadBalancer = new LoadBalancer(searchCluster,
dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN);
this.invokerFactory = invokerFactory;
this.metric = metric;
this.metricContext = metric.createContext(null);
this.maxHitsPerNode = dispatchConfig.maxHitsPerNode();
-
- searchCluster.startClusterMonitoring(pingFactory);
+ searchCluster.addMonitoring(clusterMonitor);
try {
while ( ! searchCluster.hasInformationAboutAllNodes()) {
Thread.sleep(1);
@@ -139,8 +141,8 @@ public class Dispatcher extends AbstractComponent {
@Override
public void deconstruct() {
- /* The seach cluster must be shutdown first as it uses the invokerfactory. */
- searchCluster.shutDown();
+ /* The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. */
+ clusterMonitor.shutdown();
invokerFactory.release();
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
index 8333080cdf0..a45ec59c3ee 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -24,7 +24,7 @@ import java.util.concurrent.Callable;
/**
* @author ollivir
*/
-public class RpcInvokerFactory extends InvokerFactory implements PingFactory {
+public class RpcInvokerFactory extends InvokerFactory {
/** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */
private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries");
@@ -65,9 +65,4 @@ public class RpcInvokerFactory extends InvokerFactory implements PingFactory {
public void release() {
rpcResourcePool.release();
}
-
- @Override
- public Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler) {
- return new RpcPing(node, monitor, rpcResourcePool, pongHandler);
- }
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java
new file mode 100644
index 00000000000..ac8f0a59c20
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java
@@ -0,0 +1,18 @@
+package com.yahoo.search.dispatch.rpc;
+
+import com.yahoo.search.cluster.ClusterMonitor;
+import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.dispatch.searchcluster.PingFactory;
+import com.yahoo.search.dispatch.searchcluster.Pinger;
+import com.yahoo.search.dispatch.searchcluster.PongHandler;
+
+public class RpcPingFactory implements PingFactory {
+ private final RpcResourcePool rpcResourcePool;
+ public RpcPingFactory(RpcResourcePool rpcResourcePool) {
+ this.rpcResourcePool = rpcResourcePool;
+ }
+ @Override
+ public Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler) {
+ return new RpcPing(node, monitor, rpcResourcePool, pongHandler);
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index cbe24eb6907..d462479226a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -36,9 +36,8 @@ public class SearchCluster implements NodeManager<Node> {
private final ImmutableMap<Integer, Group> groups;
private final ImmutableMultimap<String, Node> nodesByHost;
private final ImmutableList<Group> orderedGroups;
- private final ClusterMonitor<Node> clusterMonitor;
private final VipStatus vipStatus;
- private PingFactory pingFactory;
+ private final PingFactory pingFactory;
private long nextLogTime = 0;
/**
@@ -51,10 +50,12 @@ public class SearchCluster implements NodeManager<Node> {
*/
private final Optional<Node> localCorpusDispatchTarget;
- public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, VipStatus vipStatus) {
+ public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize,
+ VipStatus vipStatus, PingFactory pingFactory) {
this.clusterId = clusterId;
this.dispatchConfig = dispatchConfig;
this.vipStatus = vipStatus;
+ this.pingFactory = pingFactory;
List<Node> nodes = toNodes(dispatchConfig);
this.size = nodes.size();
@@ -77,29 +78,18 @@ public class SearchCluster implements NodeManager<Node> {
this.nodesByHost = nodesByHostBuilder.build();
this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(),
- size,
- containerClusterSize,
- nodesByHost,
- groups);
-
- this.clusterMonitor = new ClusterMonitor<>(this);
- }
-
- public void shutDown() {
- clusterMonitor.shutdown();
+ size,
+ containerClusterSize,
+ nodesByHost,
+ groups);
}
-
- public void startClusterMonitoring(PingFactory pingFactory) {
- this.pingFactory = pingFactory;
-
+ public void addMonitoring(ClusterMonitor clusterMonitor) {
for (var group : orderedGroups) {
for (var node : group.nodes())
clusterMonitor.add(node, true);
}
}
- ClusterMonitor<Node> clusterMonitor() { return clusterMonitor; }
-
private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname,
int searchClusterSize,
int containerClusterSize,
@@ -278,7 +268,7 @@ public class SearchCluster implements NodeManager<Node> {
/** Used by the cluster monitor to manage node status */
@Override
- public void ping(Node node, Executor executor) {
+ public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) {
if (pingFactory == null) return; // not initialized yet
Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor));
diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java
index 440e3b8d78f..d5e43fba92d 100644
--- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java
+++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java
@@ -2,8 +2,10 @@
package com.yahoo.prelude.fastsearch.test;
import com.yahoo.container.handler.VipStatus;
+import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
+import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
@@ -22,7 +24,7 @@ class MockDispatcher extends Dispatcher {
public static MockDispatcher create(List<Node> nodes, RpcResourcePool rpcResourcePool,
int containerClusterSize, VipStatus vipStatus) {
var dispatchConfig = toDispatchConfig(nodes);
- var searchCluster = new SearchCluster("a", dispatchConfig, containerClusterSize, vipStatus);
+ var searchCluster = new SearchCluster("a", dispatchConfig, containerClusterSize, vipStatus, new RpcPingFactory(rpcResourcePool));
return new MockDispatcher(searchCluster, dispatchConfig, rpcResourcePool);
}
@@ -31,7 +33,7 @@ class MockDispatcher extends Dispatcher {
}
private MockDispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcInvokerFactory invokerFactory) {
- super(searchCluster, dispatchConfig, invokerFactory, new MockMetric());
+ super(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, invokerFactory, new MockMetric());
}
static DispatchConfig toDispatchConfig(List<Node> nodes) {
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
index a1ae3b6a19d..5433a28dd6e 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
@@ -1,10 +1,8 @@
// Copyright 2019 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
-import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.prelude.fastsearch.test.MockMetric;
-import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.cluster.ClusterMonitor;
@@ -18,7 +16,6 @@ import org.junit.Test;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
-import java.util.concurrent.Callable;
import static com.yahoo.search.dispatch.MockSearchCluster.createDispatchConfig;
import static org.junit.Assert.assertEquals;
@@ -41,9 +38,10 @@ public class DispatcherTest {
assertEquals(2, nodes.get(0).key());
return true;
});
- Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, invokerFactory, new MockMetric());
+ Dispatcher disp = new Dispatcher(new ClusterMonitor(cl, false), cl, createDispatchConfig(), invokerFactory, new MockMetric());
SearchInvoker invoker = disp.getSearchInvoker(q, null);
invokerFactory.verifyAllEventsProcessed();
+ disp.deconstruct();
}
@Test
@@ -55,9 +53,10 @@ public class DispatcherTest {
}
};
MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (n, a) -> true);
- Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, invokerFactory, new MockMetric());
+ Dispatcher disp = new Dispatcher(new ClusterMonitor(cl, false), cl, createDispatchConfig(), invokerFactory, new MockMetric());
SearchInvoker invoker = disp.getSearchInvoker(new Query(), null);
invokerFactory.verifyAllEventsProcessed();
+ disp.deconstruct();
}
@Test
@@ -71,9 +70,10 @@ public class DispatcherTest {
assertTrue(acceptIncompleteCoverage);
return true;
});
- Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, invokerFactory, new MockMetric());
+ Dispatcher disp = new Dispatcher(new ClusterMonitor(cl, false), cl, createDispatchConfig(), invokerFactory, new MockMetric());
SearchInvoker invoker = disp.getSearchInvoker(new Query(), null);
invokerFactory.verifyAllEventsProcessed();
+ disp.deconstruct();
}
@Test
@@ -82,8 +82,9 @@ public class DispatcherTest {
SearchCluster cl = new MockSearchCluster("1", 2, 1);
MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (n, a) -> false, (n, a) -> false);
- Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, invokerFactory, new MockMetric());
+ Dispatcher disp = new Dispatcher(new ClusterMonitor(cl, false), cl, createDispatchConfig(), invokerFactory, new MockMetric());
disp.getSearchInvoker(new Query(), null);
+ disp.deconstruct();
fail("Expected exception");
}
catch (IllegalStateException e) {
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
index 0496194f8ed..6eedb8239a9 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
@@ -29,7 +29,7 @@ public class LoadBalancerTest {
@Test
public void requireThatLoadBalancerServesSingleNodeSetups() {
Node n1 = new Node(0, "test-node1", 0);
- SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1), 1, null);
+ SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1), 1, null, null);
LoadBalancer lb = new LoadBalancer(cluster, true);
Optional<Group> grp = lb.takeGroup(null);
@@ -43,7 +43,7 @@ public class LoadBalancerTest {
public void requireThatLoadBalancerServesMultiGroupSetups() {
Node n1 = new Node(0, "test-node1", 0);
Node n2 = new Node(1, "test-node2", 1);
- SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), 1, null);
+ SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), 1, null, null);
LoadBalancer lb = new LoadBalancer(cluster, true);
Optional<Group> grp = lb.takeGroup(null);
@@ -59,7 +59,7 @@ public class LoadBalancerTest {
Node n2 = new Node(1, "test-node2", 0);
Node n3 = new Node(0, "test-node3", 1);
Node n4 = new Node(1, "test-node4", 1);
- SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2, n3, n4), 2, null);
+ SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2, n3, n4), 2, null, null);
LoadBalancer lb = new LoadBalancer(cluster, true);
Optional<Group> grp = lb.takeGroup(null);
@@ -70,7 +70,7 @@ public class LoadBalancerTest {
public void requireThatLoadBalancerReturnsDifferentGroups() {
Node n1 = new Node(0, "test-node1", 0);
Node n2 = new Node(1, "test-node2", 1);
- SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), 1, null);
+ SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), 1, null,null);
LoadBalancer lb = new LoadBalancer(cluster, true);
// get first group
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
index a976b287f63..3b4d58cdfc2 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
@@ -30,7 +30,7 @@ public class MockSearchCluster extends SearchCluster {
}
public MockSearchCluster(String clusterId, DispatchConfig dispatchConfig, int groups, int nodesPerGroup) {
- super(clusterId, dispatchConfig, 1, null);
+ super(clusterId, dispatchConfig, 1, null, null);
ImmutableList.Builder<Group> orderedGroupBuilder = ImmutableList.builder();
ImmutableMap.Builder<Integer, Group> groupBuilder = ImmutableMap.builder();
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
index aaef06363a0..766f9ea6c2d 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
@@ -35,6 +35,7 @@ public class SearchClusterTest {
final int nodesPerGroup;
final VipStatus vipStatus;
final SearchCluster searchCluster;
+ final ClusterMonitor clusterMonitor;
final List<AtomicInteger> numDocsPerNode;
List<AtomicInteger> pingCounts;
@@ -58,11 +59,10 @@ public class SearchClusterTest {
numDocsPerNode.add(new AtomicInteger(1));
pingCounts.add(new AtomicInteger(0));
}
- searchCluster = new SearchCluster(clusterId, MockSearchCluster.createDispatchConfig(nodes), nodes.size() / nodesPerGroup, vipStatus);
- }
-
- void startMonitoring() {
- searchCluster.startClusterMonitoring(new Factory(nodesPerGroup, numDocsPerNode, pingCounts));
+ searchCluster = new SearchCluster(clusterId, MockSearchCluster.createDispatchConfig(nodes), nodes.size() / nodesPerGroup,
+ vipStatus, new Factory(nodesPerGroup, numDocsPerNode, pingCounts));
+ clusterMonitor = new ClusterMonitor(searchCluster, false);
+ searchCluster.addMonitoring(clusterMonitor);
}
private int maxPingCount() {
@@ -87,10 +87,10 @@ public class SearchClusterTest {
void waitOneFullPingRound() {
int minPingCount = minPingCount();
- int atLeast = maxPingCount() + 2;
+ int atLeast = maxPingCount() + 1;
while (minPingCount < atLeast) {
ExecutorService executor = Executors.newCachedThreadPool();
- searchCluster.clusterMonitor().ping(executor);
+ clusterMonitor.ping(executor);
executor.shutdown();
try {
boolean completed = executor.awaitTermination(120, TimeUnit.SECONDS);
@@ -107,7 +107,7 @@ public class SearchClusterTest {
@Override
public void close() {
- searchCluster.shutDown();
+ clusterMonitor.shutdown();
}
static class Factory implements PingFactory {
@@ -157,7 +157,6 @@ public class SearchClusterTest {
assertTrue(test.searchCluster.localCorpusDispatchTarget().isEmpty());
assertFalse(test.vipStatus.isInRotation());
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
}
@@ -166,7 +165,6 @@ public class SearchClusterTest {
@Test
public void requireThatZeroDocsAreFine() {
try (State test = new State("cluster.1", 2, "a", "b")) {
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
@@ -188,7 +186,6 @@ public class SearchClusterTest {
assertTrue(test.searchCluster.localCorpusDispatchTarget().isPresent());
assertFalse(test.vipStatus.isInRotation());
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
}
@@ -200,7 +197,6 @@ public class SearchClusterTest {
assertTrue(test.searchCluster.localCorpusDispatchTarget().isPresent());
assertFalse(test.vipStatus.isInRotation());
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
test.numDocsPerNode.get(0).set(-1);
@@ -213,7 +209,6 @@ public class SearchClusterTest {
public void requireThatVipStatusDownWhenLocalIsDown() {
try (State test = new State("cluster.1",1,HostName.getLocalhost(), "b")) {
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
assertTrue(test.searchCluster.localCorpusDispatchTarget().isPresent());
@@ -249,7 +244,6 @@ public class SearchClusterTest {
List<String> nodeNames = generateNodeNames(numGroups, nodesPerGroup);
try (State test = new State("cluster.1", nodesPerGroup, nodeNames)) {
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
assertTrue(test.searchCluster.localCorpusDispatchTarget().isEmpty());
@@ -288,7 +282,6 @@ public class SearchClusterTest {
List<String> nodeNames = generateNodeNames(numGroups, nodesPerGroup);
try (State test = new State("cluster.1", nodesPerGroup, nodeNames)) {
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
assertTrue(test.searchCluster.localCorpusDispatchTarget().isEmpty());