summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/prelude/cluster
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-09-18 07:04:43 +0200
committerGitHub <noreply@github.com>2019-09-18 07:04:43 +0200
commitccabf1c7454994aa2534e8c82abc31fdacd326aa (patch)
tree463ab5145edf4a26243cc4a92d04bfb82a3c1580 /container-search/src/main/java/com/yahoo/prelude/cluster
parentbd6c6f09f32005ddb63b3f452ad1ac94709681a1 (diff)
Revert "Revert "Revert "Balder/no more fs4 dispatching from fastsearcher"""
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/cluster')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java161
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java122
2 files changed, 277 insertions, 6 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java
new file mode 100644
index 00000000000..c075a0f842b
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java
@@ -0,0 +1,161 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.prelude.cluster;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.yahoo.component.provider.Freezable;
+import com.yahoo.container.handler.VipStatus;
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
+import com.yahoo.search.result.ErrorMessage;
+
+/**
+ * Monitors of a cluster of remote nodes. The monitor uses an internal thread
+ * for node monitoring.
+ *
+ * @author bratseth
+ * @author Steinar Knutsen
+ */
+public class ClusterMonitor implements Runnable, Freezable {
+
+ // The ping thread wil start using the system, but we cannot be guaranteed that all components
+ // in the system is up. As a workaround for not being able to find out when the system
+ // is ready to be used, we wait some time before starting the ping thread
+ private static final int pingThreadInitialDelayMs = 3000;
+
+ private final MonitorConfiguration configuration;
+
+ private final static Logger log = Logger.getLogger(ClusterMonitor.class.getName());
+
+ private final ClusterSearcher nodeManager;
+
+ private final Optional<VipStatus> vipStatus;
+
+ /** A map from Node to corresponding MonitoredNode */
+ private final Map<VespaBackEndSearcher, NodeMonitor> nodeMonitors = new java.util.IdentityHashMap<>();
+
+ private ScheduledFuture<?> future;
+
+ private boolean isFrozen = false;
+
+ ClusterMonitor(ClusterSearcher manager, QrMonitorConfig monitorConfig, Optional<VipStatus> vipStatus) {
+ configuration = new MonitorConfiguration(monitorConfig);
+ nodeManager = manager;
+ this.vipStatus = vipStatus;
+ log.fine("checkInterval is " + configuration.getCheckInterval() + " ms");
+ }
+
+ /** Returns the configuration of this cluster monitor */
+ MonitorConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ void startPingThread() {
+ if ( ! isFrozen())
+ throw new IllegalStateException("Do not start the monitoring thread before the set of " +
+ "nodes to monitor is complete/the ClusterMonitor is frozen.");
+ future = nodeManager.getScheduledExecutor().scheduleAtFixedRate(this, pingThreadInitialDelayMs, configuration.getCheckInterval(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Adds a new node for monitoring.
+ */
+ void add(VespaBackEndSearcher node) {
+ if (isFrozen())
+ throw new IllegalStateException("Can not add new nodes after ClusterMonitor has been frozen.");
+ nodeMonitors.put(node, new NodeMonitor(node));
+ updateVipStatus();
+ }
+
+ /** Called from ClusterSearcher/NodeManager when a node failed */
+ void failed(VespaBackEndSearcher node, ErrorMessage error) {
+ NodeMonitor monitor = nodeMonitors.get(node);
+ boolean wasWorking = monitor.isWorking();
+ monitor.failed(error);
+ if (wasWorking && !monitor.isWorking()) {
+ log.info("Failed monitoring node '" + node + "' due to '" + error);
+ nodeManager.failed(node);
+ }
+ updateVipStatus();
+ }
+
+ /** Called when a node responded */
+ void responded(VespaBackEndSearcher node, boolean hasSearchNodesOnline) {
+ NodeMonitor monitor = nodeMonitors.get(node);
+ boolean wasFailing = !monitor.isWorking();
+ monitor.responded(hasSearchNodesOnline);
+ if (wasFailing && monitor.isWorking()) {
+ log.info("Failed node '" + node + "' started working again.");
+ nodeManager.working(node);
+ }
+ updateVipStatus();
+ }
+
+ private void updateVipStatus() {
+ if ( ! vipStatus.isPresent()) return;
+ if ( ! hasInformationAboutAllNodes()) return;
+
+ if (hasWorkingNodesWithDocumentsOnline()) {
+ vipStatus.get().addToRotation(nodeManager.getId().stringValue());
+ } else {
+ vipStatus.get().removeFromRotation(nodeManager.getId().stringValue());
+ }
+ }
+
+ private boolean hasInformationAboutAllNodes() {
+ for (NodeMonitor monitor : nodeMonitors.values()) {
+ if ( ! monitor.statusIsKnown())
+ return false;
+ }
+ return true;
+ }
+
+ private boolean hasWorkingNodesWithDocumentsOnline() {
+ for (NodeMonitor node : nodeMonitors.values()) {
+ if (node.isWorking() && node.searchNodesOnline())
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Ping all nodes which needs pinging to discover state changes
+ */
+ private void ping() throws InterruptedException {
+ for (NodeMonitor monitor : nodeMonitors.values()) {
+ nodeManager.ping(monitor.getNode());
+ }
+ }
+
+ @Override
+ public void run() {
+ log.finest("Activating ping");
+ try {
+ ping();
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Error in monitor thread", e);
+ }
+ }
+
+ public void shutdown() {
+ if (future != null) {
+ future.cancel(true);
+ }
+ }
+
+ @Override
+ public void freeze() {
+ isFrozen = true;
+
+ }
+
+ @Override
+ public boolean isFrozen() {
+ return isFrozen;
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
index 0780d5e9d65..4ffcc0a4330 100644
--- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java
@@ -2,13 +2,20 @@
package com.yahoo.prelude.cluster;
import com.yahoo.cloud.config.ClusterInfoConfig;
+import com.yahoo.collections.Tuple2;
import com.yahoo.component.ComponentId;
+import com.yahoo.component.chain.Chain;
import com.yahoo.component.chain.dependencies.After;
+import com.yahoo.concurrent.Receiver;
+import com.yahoo.concurrent.Receiver.MessageState;
import com.yahoo.container.QrSearchersConfig;
import com.yahoo.container.handler.VipStatus;
+import com.yahoo.fs4.mplex.Backend;
import com.yahoo.jdisc.Metric;
import com.yahoo.net.HostName;
import com.yahoo.prelude.IndexFacts;
+import com.yahoo.prelude.Ping;
+import com.yahoo.prelude.Pong;
import com.yahoo.prelude.fastsearch.ClusterParams;
import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
@@ -39,7 +46,11 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Logger;
import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.STREAMING;
@@ -53,6 +64,10 @@ import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.S
@After("*")
public class ClusterSearcher extends Searcher {
+ private final static Logger log = Logger.getLogger(ClusterSearcher.class.getName());
+
+ private final ClusterMonitor monitor;
+
private final Value cacheHitRatio;
private final String clusterModelName;
@@ -63,6 +78,8 @@ public class ClusterSearcher extends Searcher {
// Mapping from rank profile names to document types containing them
private final Map<String, Set<String>> rankProfiles = new HashMap<>();
+ private final FS4ResourcePool fs4ResourcePool;
+
private final long maxQueryTimeout; // in milliseconds
private final static long DEFAULT_MAX_QUERY_TIMEOUT = 600000L;
@@ -71,6 +88,7 @@ public class ClusterSearcher extends Searcher {
private VespaBackEndSearcher server = null;
+
/**
* Creates a new ClusterSearcher.
*/
@@ -78,6 +96,7 @@ public class ClusterSearcher extends Searcher {
QrSearchersConfig qrsConfig,
ClusterConfig clusterConfig,
DocumentdbInfoConfig documentDbConfig,
+ QrMonitorConfig monitorConfig,
DispatchConfig dispatchConfig,
ClusterInfoConfig clusterInfoConfig,
Statistics manager,
@@ -85,8 +104,13 @@ public class ClusterSearcher extends Searcher {
FS4ResourcePool fs4ResourcePool,
VipStatus vipStatus) {
super(id);
+ this.fs4ResourcePool = fs4ResourcePool;
+
+ Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, fs4ResourcePool, clusterInfoConfig.nodeCount(), vipStatus, metric);
- Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(), vipStatus, metric);
+ monitor = (dispatcher.searchCluster().directDispatchTarget().isPresent()) // dispatcher should decide vip status instead
+ ? new ClusterMonitor(this, monitorConfig, Optional.empty())
+ : new ClusterMonitor(this, monitorConfig, Optional.of(vipStatus));
int searchClusterIndex = clusterConfig.clusterId();
clusterModelName = clusterConfig.clusterName();
@@ -124,8 +148,9 @@ public class ClusterSearcher extends Searcher {
for (int dispatcherIndex = 0; dispatcherIndex < searchClusterConfig.dispatcher().size(); dispatcherIndex++) {
try {
if ( ! isRemote(searchClusterConfig.dispatcher(dispatcherIndex).host())) {
- FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool.getServerId(), docSumParams,
- documentDbConfig, dispatcher, dispatcherIndex);
+ Backend dispatchBackend = createBackend(searchClusterConfig.dispatcher(dispatcherIndex));
+ FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool, docSumParams,
+ documentDbConfig, dispatchBackend, dispatcher, dispatcherIndex);
addBackendSearcher(searcher);
}
} catch (UnknownHostException e) {
@@ -137,6 +162,8 @@ public class ClusterSearcher extends Searcher {
if ( server == null ) {
throw new IllegalStateException("ClusterSearcher should have a top level dispatch.");
}
+ monitor.freeze();
+ monitor.startPingThread();
}
private static QrSearchersConfig.Searchcluster getSearchClusterConfigFromClusterName(QrSearchersConfig config, String name) {
@@ -162,14 +189,15 @@ public class ClusterSearcher extends Searcher {
}
private static FastSearcher searchDispatch(int searchclusterIndex,
- String serverId,
+ FS4ResourcePool fs4ResourcePool,
SummaryParameters docSumParams,
DocumentdbInfoConfig documentdbInfoConfig,
+ Backend backend,
Dispatcher dispatcher,
int dispatcherIndex)
{
ClusterParams clusterParams = makeClusterParams(searchclusterIndex, dispatcherIndex);
- return new FastSearcher(serverId, dispatcher, docSumParams, clusterParams, documentdbInfoConfig);
+ return new FastSearcher(backend, fs4ResourcePool, dispatcher, docSumParams, clusterParams, documentdbInfoConfig);
}
private static VdsStreamingSearcher vdsCluster(String serverId,
@@ -194,14 +222,25 @@ public class ClusterSearcher extends Searcher {
/** Do not use, for internal testing purposes only. **/
ClusterSearcher(Set<String> documentTypes) {
this.documentTypes = documentTypes;
+ monitor = new ClusterMonitor(this, new QrMonitorConfig(new QrMonitorConfig.Builder()), Optional.of(new VipStatus()));
cacheHitRatio = new Value("com.yahoo.prelude.cluster.ClusterSearcher.ClusterSearcher().dummy",
Statistics.nullImplementation, new Value.Parameters());
clusterModelName = "testScenario";
+ fs4ResourcePool = null;
maxQueryTimeout = DEFAULT_MAX_QUERY_TIMEOUT;
maxQueryCacheTimeout = DEFAULT_MAX_QUERY_CACHE_TIMEOUT;
}
+ private Backend createBackend(QrSearchersConfig.Searchcluster.Dispatcher disp) {
+ return fs4ResourcePool.getBackend(disp.host(), disp.port());
+ }
+
+ ClusterMonitor getMonitor() {
+ return monitor;
+ }
+
void addBackendSearcher(VespaBackEndSearcher searcher) {
+ monitor.add(searcher);
server = searcher;
}
@@ -440,6 +479,77 @@ public class ClusterSearcher extends Searcher {
cacheHitRatio.put(0.0);
}
+ /** NodeManager method, called from ClusterMonitor. */
+ void working(VespaBackEndSearcher node) {
+ server = node;
+ }
+
+ /** Called from ClusterMonitor. */
+ void failed(VespaBackEndSearcher node) {
+ server = null;
+ }
+
+ /**
+ * Pinging a node, called from ClusterMonitor.
+ */
+ void ping(VespaBackEndSearcher node) throws InterruptedException {
+ log.fine("Sending ping to: " + node);
+ Pinger pinger = new Pinger(node);
+
+ getExecutor().execute(pinger);
+ Pong pong = pinger.getPong(); // handles timeout
+ if (pong == null) {
+ monitor.failed(node, ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out."));
+ } else if (pong.badResponse()) {
+ monitor.failed(node, pong.getError(0));
+ } else {
+ monitor.responded(node, backendCanServeDocuments(pong));
+ }
+ }
+
+ private boolean backendCanServeDocuments(Pong pong) {
+ if ( ! pong.activeNodes().isPresent()) return true; // no information; assume true
+ return pong.activeNodes().get() > 0;
+ }
+
@Override
- public void deconstruct() { }
+ public void deconstruct() {
+ monitor.shutdown();
+ }
+
+ ExecutorService getExecutor() {
+ return fs4ResourcePool.getExecutor();
+ }
+
+ ScheduledExecutorService getScheduledExecutor() {
+ return fs4ResourcePool.getScheduledExecutor();
+ }
+
+ private class Pinger implements Runnable {
+
+ private final Searcher searcher;
+ private final Ping pingChallenge = new Ping(monitor.getConfiguration().getRequestTimeout());
+ private final Receiver<Pong> pong = new Receiver<>();
+
+ Pinger(final Searcher searcher) {
+ this.searcher = searcher;
+ }
+
+ @Override
+ public void run() {
+ pong.put(createExecution().ping(pingChallenge));
+ }
+
+ private Execution createExecution() {
+ return new Execution(new Chain<>(searcher),
+ new Execution.Context(null, null, null, null, null));
+ }
+
+ public Pong getPong() throws InterruptedException {
+ Tuple2<MessageState, Pong> reply = pong.get(pingChallenge.getTimeout() + 150);
+ return (reply.first != MessageState.VALID) ? null : reply.second;
+ }
+
+ }
+
}