summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-04 09:43:18 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-02-04 09:43:42 +0000
commit63e8ab9d534488434ca3e25169ad5d22af46aded (patch)
treee1a966951d38d10adf3023b002c3d64f0d91dec3 /container-search
parent14e5202f2c270d1230596d7aceb8c96efada641c (diff)
Decouple so ClusterMonitor is on the outside of the searchcluster and can be provided.
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java10
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java21
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java30
-rw-r--r--container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java3
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java12
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java18
8 files changed, 55 insertions, 57 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index d871a256a62..74a007d35d3 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -110,7 +110,7 @@ public class ClusterMonitor<T> {
public void ping(Executor executor) {
for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) {
BaseNodeMonitor<T> monitor= i.next();
- nodeManager.ping(monitor.getNode(), executor); // Cause call to failed or responded
+ nodeManager.ping(this, monitor.getNode(), executor); // Cause call to failed or responded
}
if (closed.get()) return; // Do nothing to change state if close has started.
nodeManager.pingIterationCompleted();
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
index 20f56c86f7b..2d05168731a 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
@@ -58,7 +58,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
this(id, connections, hasher, internal, true);
}
- public ClusterSearcher(ComponentId id, List<T> connections, Hasher<T> hasher, boolean internal, boolean startPingThread) {
+ protected ClusterSearcher(ComponentId id, List<T> connections, Hasher<T> hasher, boolean internal, boolean startPingThread) {
super(id);
this.hasher = hasher;
this.monitor = new ClusterMonitor<>(this, startPingThread);
@@ -70,7 +70,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
/** Pinging a node, called from ClusterMonitor */
@Override
- public final void ping(T p, Executor executor) {
+ public final void ping(ClusterMonitor<T> clusterMonitor, T p, Executor executor) {
log(LogLevel.FINE, "Sending ping to: ", p);
Pinger pinger = new Pinger(p);
FutureTask<Pong> future = new FutureTask<>(pinger);
@@ -80,7 +80,7 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
Throwable logThrowable = null;
try {
- pong = future.get(monitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS);
+ pong = future.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
pong = new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + p));
logThrowable = e;
@@ -96,10 +96,10 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod
future.cancel(true);
if (pong.badResponse()) {
- monitor.failed(p, pong.error().get());
+ clusterMonitor.failed(p, pong.error().get());
log(LogLevel.FINE, "Failed ping - ", pong);
} else {
- monitor.responded(p);
+ clusterMonitor.responded(p);
log(LogLevel.FINE, "Answered ping - ", p);
}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
index 9b20139e3c5..ffd26e3088d 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
@@ -19,9 +19,21 @@ public interface NodeManager<T> {
/**
* Called when a node should be pinged.
- * This *must* lead to either a call to NodeMonitor.failed or NodeMonitor.responded
+ * This *must* lead to either a call to NodeMonitor.failed or NodeMonitor.responded
+ * @deprecated Use ping(ClusterMonitor<T> clusterMonitor, T node, Executor executor) instead.
*/
- void ping(T node, Executor executor);
+ @Deprecated
+ default void ping(T node, Executor executor) {
+ throw new IllegalStateException("If you have not overrriden ping(ClusterMonitor<T> clusterMonitor, T node, Executor executor), you should at least have overriden this method.");
+ }
+
+ /**
+ * Called when a node should be pinged.
+ * This *must* lead to either a call to ClusterMonitor.failed or ClusterMonitor.responded
+ */
+ default void ping(ClusterMonitor<T> clusterMonitor, T node, Executor executor) {
+ ping(node, executor);
+ }
/** Called right after a ping has been issued to each node. This default implementation does nothing. */
default void pingIterationCompleted() {}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 1f31d807024..91bd5c6da11 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -11,6 +11,7 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
@@ -59,6 +60,7 @@ public class Dispatcher extends AbstractComponent {
/** A model of the search cluster this dispatches to */
private final SearchCluster searchCluster;
+ private final ClusterMonitor clusterMonitor;
private final LoadBalancer loadBalancer;
@@ -97,20 +99,19 @@ public class Dispatcher extends AbstractComponent {
ClusterInfoConfig clusterInfoConfig,
VipStatus vipStatus,
Metric metric) {
- this(resourcePool, new SearchCluster(clusterId.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(), vipStatus, new RpcPingFactory(resourcePool)),
+ this(resourcePool, new SearchCluster(clusterId.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(),
+ vipStatus, new RpcPingFactory(resourcePool)),
dispatchConfig, metric);
}
private Dispatcher(RpcResourcePool resourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig, Metric metric) {
- this(searchCluster,
- dispatchConfig,
- new RpcInvokerFactory(resourcePool, searchCluster),
- metric);
+ this(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, new RpcInvokerFactory(resourcePool, searchCluster), metric);
}
/* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */
- protected Dispatcher(SearchCluster searchCluster,
+ protected Dispatcher(ClusterMonitor clusterMonitor,
+ SearchCluster searchCluster,
DispatchConfig dispatchConfig,
InvokerFactory invokerFactory,
Metric metric) {
@@ -118,14 +119,14 @@ public class Dispatcher extends AbstractComponent {
throw new IllegalArgumentException(searchCluster + " is configured with multilevel dispatch, but this is not supported");
this.searchCluster = searchCluster;
+ this.clusterMonitor = clusterMonitor;
this.loadBalancer = new LoadBalancer(searchCluster,
dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN);
this.invokerFactory = invokerFactory;
this.metric = metric;
this.metricContext = metric.createContext(null);
this.maxHitsPerNode = dispatchConfig.maxHitsPerNode();
-
- searchCluster.startClusterMonitoring(true);
+ searchCluster.addMonitoring(clusterMonitor);
try {
while ( ! searchCluster.hasInformationAboutAllNodes()) {
Thread.sleep(1);
@@ -140,8 +141,8 @@ public class Dispatcher extends AbstractComponent {
@Override
public void deconstruct() {
- /* The seach cluster must be shutdown first as it uses the invokerfactory. */
- searchCluster.shutDown();
+ /* The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. */
+ clusterMonitor.shutdown();
invokerFactory.release();
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 54da33ac6dc..d462479226a 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -36,7 +36,6 @@ public class SearchCluster implements NodeManager<Node> {
private final ImmutableMap<Integer, Group> groups;
private final ImmutableMultimap<String, Node> nodesByHost;
private final ImmutableList<Group> orderedGroups;
- private final ClusterMonitor<Node> clusterMonitor;
private final VipStatus vipStatus;
private final PingFactory pingFactory;
private long nextLogTime = 0;
@@ -51,7 +50,8 @@ public class SearchCluster implements NodeManager<Node> {
*/
private final Optional<Node> localCorpusDispatchTarget;
- public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, VipStatus vipStatus, PingFactory pingFactory) {
+ public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize,
+ VipStatus vipStatus, PingFactory pingFactory) {
this.clusterId = clusterId;
this.dispatchConfig = dispatchConfig;
this.vipStatus = vipStatus;
@@ -78,30 +78,18 @@ public class SearchCluster implements NodeManager<Node> {
this.nodesByHost = nodesByHostBuilder.build();
this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(),
- size,
- containerClusterSize,
- nodesByHost,
- groups);
-
- this.clusterMonitor = new ClusterMonitor<>(this, false);
+ size,
+ containerClusterSize,
+ nodesByHost,
+ groups);
+ }
+ public void addMonitoring(ClusterMonitor clusterMonitor) {
for (var group : orderedGroups) {
for (var node : group.nodes())
clusterMonitor.add(node, true);
}
}
- public void shutDown() {
- clusterMonitor.shutdown();
- }
-
- public void startClusterMonitoring(boolean startPingThread) {
- if (startPingThread) {
- clusterMonitor.start();
- }
- }
-
- ClusterMonitor<Node> clusterMonitor() { return clusterMonitor; }
-
private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname,
int searchClusterSize,
int containerClusterSize,
@@ -280,7 +268,7 @@ public class SearchCluster implements NodeManager<Node> {
/** Used by the cluster monitor to manage node status */
@Override
- public void ping(Node node, Executor executor) {
+ public void ping(ClusterMonitor clusterMonitor, Node node, Executor executor) {
if (pingFactory == null) return; // not initialized yet
Pinger pinger = pingFactory.createPinger(node, clusterMonitor, new PongCallback(node, clusterMonitor));
diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java
index 4b65e26c6c2..d5e43fba92d 100644
--- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java
+++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java
@@ -2,6 +2,7 @@
package com.yahoo.prelude.fastsearch.test;
import com.yahoo.container.handler.VipStatus;
+import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
@@ -32,7 +33,7 @@ class MockDispatcher extends Dispatcher {
}
private MockDispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcInvokerFactory invokerFactory) {
- super(searchCluster, dispatchConfig, invokerFactory, new MockMetric());
+ super(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, invokerFactory, new MockMetric());
}
static DispatchConfig toDispatchConfig(List<Node> nodes) {
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
index dad73b9b8a9..5433a28dd6e 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
@@ -38,9 +38,10 @@ public class DispatcherTest {
assertEquals(2, nodes.get(0).key());
return true;
});
- Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, new MockMetric());
+ Dispatcher disp = new Dispatcher(new ClusterMonitor(cl, false), cl, createDispatchConfig(), invokerFactory, new MockMetric());
SearchInvoker invoker = disp.getSearchInvoker(q, null);
invokerFactory.verifyAllEventsProcessed();
+ disp.deconstruct();
}
@Test
@@ -52,9 +53,10 @@ public class DispatcherTest {
}
};
MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (n, a) -> true);
- Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, new MockMetric());
+ Dispatcher disp = new Dispatcher(new ClusterMonitor(cl, false), cl, createDispatchConfig(), invokerFactory, new MockMetric());
SearchInvoker invoker = disp.getSearchInvoker(new Query(), null);
invokerFactory.verifyAllEventsProcessed();
+ disp.deconstruct();
}
@Test
@@ -68,9 +70,10 @@ public class DispatcherTest {
assertTrue(acceptIncompleteCoverage);
return true;
});
- Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, new MockMetric());
+ Dispatcher disp = new Dispatcher(new ClusterMonitor(cl, false), cl, createDispatchConfig(), invokerFactory, new MockMetric());
SearchInvoker invoker = disp.getSearchInvoker(new Query(), null);
invokerFactory.verifyAllEventsProcessed();
+ disp.deconstruct();
}
@Test
@@ -79,8 +82,9 @@ public class DispatcherTest {
SearchCluster cl = new MockSearchCluster("1", 2, 1);
MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (n, a) -> false, (n, a) -> false);
- Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, new MockMetric());
+ Dispatcher disp = new Dispatcher(new ClusterMonitor(cl, false), cl, createDispatchConfig(), invokerFactory, new MockMetric());
disp.getSearchInvoker(new Query(), null);
+ disp.deconstruct();
fail("Expected exception");
}
catch (IllegalStateException e) {
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
index 370b4def2c0..766f9ea6c2d 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
@@ -35,6 +35,7 @@ public class SearchClusterTest {
final int nodesPerGroup;
final VipStatus vipStatus;
final SearchCluster searchCluster;
+ final ClusterMonitor clusterMonitor;
final List<AtomicInteger> numDocsPerNode;
List<AtomicInteger> pingCounts;
@@ -60,10 +61,8 @@ public class SearchClusterTest {
}
searchCluster = new SearchCluster(clusterId, MockSearchCluster.createDispatchConfig(nodes), nodes.size() / nodesPerGroup,
vipStatus, new Factory(nodesPerGroup, numDocsPerNode, pingCounts));
- }
-
- void startMonitoring() {
- searchCluster.startClusterMonitoring(false);
+ clusterMonitor = new ClusterMonitor(searchCluster, false);
+ searchCluster.addMonitoring(clusterMonitor);
}
private int maxPingCount() {
@@ -91,7 +90,7 @@ public class SearchClusterTest {
int atLeast = maxPingCount() + 1;
while (minPingCount < atLeast) {
ExecutorService executor = Executors.newCachedThreadPool();
- searchCluster.clusterMonitor().ping(executor);
+ clusterMonitor.ping(executor);
executor.shutdown();
try {
boolean completed = executor.awaitTermination(120, TimeUnit.SECONDS);
@@ -108,7 +107,7 @@ public class SearchClusterTest {
@Override
public void close() {
- searchCluster.shutDown();
+ clusterMonitor.shutdown();
}
static class Factory implements PingFactory {
@@ -158,7 +157,6 @@ public class SearchClusterTest {
assertTrue(test.searchCluster.localCorpusDispatchTarget().isEmpty());
assertFalse(test.vipStatus.isInRotation());
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
}
@@ -167,7 +165,6 @@ public class SearchClusterTest {
@Test
public void requireThatZeroDocsAreFine() {
try (State test = new State("cluster.1", 2, "a", "b")) {
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
@@ -189,7 +186,6 @@ public class SearchClusterTest {
assertTrue(test.searchCluster.localCorpusDispatchTarget().isPresent());
assertFalse(test.vipStatus.isInRotation());
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
}
@@ -201,7 +197,6 @@ public class SearchClusterTest {
assertTrue(test.searchCluster.localCorpusDispatchTarget().isPresent());
assertFalse(test.vipStatus.isInRotation());
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
test.numDocsPerNode.get(0).set(-1);
@@ -214,7 +209,6 @@ public class SearchClusterTest {
public void requireThatVipStatusDownWhenLocalIsDown() {
try (State test = new State("cluster.1",1,HostName.getLocalhost(), "b")) {
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
assertTrue(test.searchCluster.localCorpusDispatchTarget().isPresent());
@@ -250,7 +244,6 @@ public class SearchClusterTest {
List<String> nodeNames = generateNodeNames(numGroups, nodesPerGroup);
try (State test = new State("cluster.1", nodesPerGroup, nodeNames)) {
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
assertTrue(test.searchCluster.localCorpusDispatchTarget().isEmpty());
@@ -289,7 +282,6 @@ public class SearchClusterTest {
List<String> nodeNames = generateNodeNames(numGroups, nodesPerGroup);
try (State test = new State("cluster.1", nodesPerGroup, nodeNames)) {
- test.startMonitoring();
test.waitOneFullPingRound();
assertTrue(test.vipStatus.isInRotation());
assertTrue(test.searchCluster.localCorpusDispatchTarget().isEmpty());