diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-09-18 07:04:43 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-18 07:04:43 +0200 |
commit | ccabf1c7454994aa2534e8c82abc31fdacd326aa (patch) | |
tree | 463ab5145edf4a26243cc4a92d04bfb82a3c1580 /container-search/src/main/java/com/yahoo/prelude/cluster | |
parent | bd6c6f09f32005ddb63b3f452ad1ac94709681a1 (diff) |
Revert "Revert "Revert "Balder/no more fs4 dispatching from fastsearcher"""
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/cluster')
-rw-r--r-- | container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java | 161 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java | 122 |
2 files changed, 277 insertions, 6 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java new file mode 100644 index 00000000000..c075a0f842b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java @@ -0,0 +1,161 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.cluster; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.yahoo.component.provider.Freezable; +import com.yahoo.container.handler.VipStatus; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; +import com.yahoo.search.result.ErrorMessage; + +/** + * Monitors of a cluster of remote nodes. The monitor uses an internal thread + * for node monitoring. + * + * @author bratseth + * @author Steinar Knutsen + */ +public class ClusterMonitor implements Runnable, Freezable { + + // The ping thread wil start using the system, but we cannot be guaranteed that all components + // in the system is up. As a workaround for not being able to find out when the system + // is ready to be used, we wait some time before starting the ping thread + private static final int pingThreadInitialDelayMs = 3000; + + private final MonitorConfiguration configuration; + + private final static Logger log = Logger.getLogger(ClusterMonitor.class.getName()); + + private final ClusterSearcher nodeManager; + + private final Optional<VipStatus> vipStatus; + + /** A map from Node to corresponding MonitoredNode */ + private final Map<VespaBackEndSearcher, NodeMonitor> nodeMonitors = new java.util.IdentityHashMap<>(); + + private ScheduledFuture<?> future; + + private boolean isFrozen = false; + + ClusterMonitor(ClusterSearcher manager, QrMonitorConfig monitorConfig, Optional<VipStatus> vipStatus) { + configuration = new MonitorConfiguration(monitorConfig); + nodeManager = manager; + this.vipStatus = vipStatus; + log.fine("checkInterval is " + configuration.getCheckInterval() + " ms"); + } + + /** Returns the configuration of this cluster monitor */ + MonitorConfiguration getConfiguration() { + return configuration; + } + + void startPingThread() { + if ( ! isFrozen()) + throw new IllegalStateException("Do not start the monitoring thread before the set of " + + "nodes to monitor is complete/the ClusterMonitor is frozen."); + future = nodeManager.getScheduledExecutor().scheduleAtFixedRate(this, pingThreadInitialDelayMs, configuration.getCheckInterval(), TimeUnit.MILLISECONDS); + } + + /** + * Adds a new node for monitoring. + */ + void add(VespaBackEndSearcher node) { + if (isFrozen()) + throw new IllegalStateException("Can not add new nodes after ClusterMonitor has been frozen."); + nodeMonitors.put(node, new NodeMonitor(node)); + updateVipStatus(); + } + + /** Called from ClusterSearcher/NodeManager when a node failed */ + void failed(VespaBackEndSearcher node, ErrorMessage error) { + NodeMonitor monitor = nodeMonitors.get(node); + boolean wasWorking = monitor.isWorking(); + monitor.failed(error); + if (wasWorking && !monitor.isWorking()) { + log.info("Failed monitoring node '" + node + "' due to '" + error); + nodeManager.failed(node); + } + updateVipStatus(); + } + + /** Called when a node responded */ + void responded(VespaBackEndSearcher node, boolean hasSearchNodesOnline) { + NodeMonitor monitor = nodeMonitors.get(node); + boolean wasFailing = !monitor.isWorking(); + monitor.responded(hasSearchNodesOnline); + if (wasFailing && monitor.isWorking()) { + log.info("Failed node '" + node + "' started working again."); + nodeManager.working(node); + } + updateVipStatus(); + } + + private void updateVipStatus() { + if ( ! vipStatus.isPresent()) return; + if ( ! hasInformationAboutAllNodes()) return; + + if (hasWorkingNodesWithDocumentsOnline()) { + vipStatus.get().addToRotation(nodeManager.getId().stringValue()); + } else { + vipStatus.get().removeFromRotation(nodeManager.getId().stringValue()); + } + } + + private boolean hasInformationAboutAllNodes() { + for (NodeMonitor monitor : nodeMonitors.values()) { + if ( ! monitor.statusIsKnown()) + return false; + } + return true; + } + + private boolean hasWorkingNodesWithDocumentsOnline() { + for (NodeMonitor node : nodeMonitors.values()) { + if (node.isWorking() && node.searchNodesOnline()) + return true; + } + return false; + } + + /** + * Ping all nodes which needs pinging to discover state changes + */ + private void ping() throws InterruptedException { + for (NodeMonitor monitor : nodeMonitors.values()) { + nodeManager.ping(monitor.getNode()); + } + } + + @Override + public void run() { + log.finest("Activating ping"); + try { + ping(); + } catch (Exception e) { + log.log(Level.WARNING, "Error in monitor thread", e); + } + } + + public void shutdown() { + if (future != null) { + future.cancel(true); + } + } + + @Override + public void freeze() { + isFrozen = true; + + } + + @Override + public boolean isFrozen() { + return isFrozen; + } + +} diff --git a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java index 0780d5e9d65..4ffcc0a4330 100644 --- a/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterSearcher.java @@ -2,13 +2,20 @@ package com.yahoo.prelude.cluster; import com.yahoo.cloud.config.ClusterInfoConfig; +import com.yahoo.collections.Tuple2; import com.yahoo.component.ComponentId; +import com.yahoo.component.chain.Chain; import com.yahoo.component.chain.dependencies.After; +import com.yahoo.concurrent.Receiver; +import com.yahoo.concurrent.Receiver.MessageState; import com.yahoo.container.QrSearchersConfig; import com.yahoo.container.handler.VipStatus; +import com.yahoo.fs4.mplex.Backend; import com.yahoo.jdisc.Metric; import com.yahoo.net.HostName; import com.yahoo.prelude.IndexFacts; +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; import com.yahoo.prelude.fastsearch.ClusterParams; import com.yahoo.prelude.fastsearch.DocumentdbInfoConfig; import com.yahoo.prelude.fastsearch.FS4ResourcePool; @@ -39,7 +46,11 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.logging.Logger; import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.STREAMING; @@ -53,6 +64,10 @@ import static com.yahoo.container.QrSearchersConfig.Searchcluster.Indexingmode.S @After("*") public class ClusterSearcher extends Searcher { + private final static Logger log = Logger.getLogger(ClusterSearcher.class.getName()); + + private final ClusterMonitor monitor; + private final Value cacheHitRatio; private final String clusterModelName; @@ -63,6 +78,8 @@ public class ClusterSearcher extends Searcher { // Mapping from rank profile names to document types containing them private final Map<String, Set<String>> rankProfiles = new HashMap<>(); + private final FS4ResourcePool fs4ResourcePool; + private final long maxQueryTimeout; // in milliseconds private final static long DEFAULT_MAX_QUERY_TIMEOUT = 600000L; @@ -71,6 +88,7 @@ public class ClusterSearcher extends Searcher { private VespaBackEndSearcher server = null; + /** * Creates a new ClusterSearcher. */ @@ -78,6 +96,7 @@ public class ClusterSearcher extends Searcher { QrSearchersConfig qrsConfig, ClusterConfig clusterConfig, DocumentdbInfoConfig documentDbConfig, + QrMonitorConfig monitorConfig, DispatchConfig dispatchConfig, ClusterInfoConfig clusterInfoConfig, Statistics manager, @@ -85,8 +104,13 @@ public class ClusterSearcher extends Searcher { FS4ResourcePool fs4ResourcePool, VipStatus vipStatus) { super(id); + this.fs4ResourcePool = fs4ResourcePool; + + Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, fs4ResourcePool, clusterInfoConfig.nodeCount(), vipStatus, metric); - Dispatcher dispatcher = Dispatcher.create(id.stringValue(), dispatchConfig, clusterInfoConfig.nodeCount(), vipStatus, metric); + monitor = (dispatcher.searchCluster().directDispatchTarget().isPresent()) // dispatcher should decide vip status instead + ? new ClusterMonitor(this, monitorConfig, Optional.empty()) + : new ClusterMonitor(this, monitorConfig, Optional.of(vipStatus)); int searchClusterIndex = clusterConfig.clusterId(); clusterModelName = clusterConfig.clusterName(); @@ -124,8 +148,9 @@ public class ClusterSearcher extends Searcher { for (int dispatcherIndex = 0; dispatcherIndex < searchClusterConfig.dispatcher().size(); dispatcherIndex++) { try { if ( ! isRemote(searchClusterConfig.dispatcher(dispatcherIndex).host())) { - FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool.getServerId(), docSumParams, - documentDbConfig, dispatcher, dispatcherIndex); + Backend dispatchBackend = createBackend(searchClusterConfig.dispatcher(dispatcherIndex)); + FastSearcher searcher = searchDispatch(searchClusterIndex, fs4ResourcePool, docSumParams, + documentDbConfig, dispatchBackend, dispatcher, dispatcherIndex); addBackendSearcher(searcher); } } catch (UnknownHostException e) { @@ -137,6 +162,8 @@ public class ClusterSearcher extends Searcher { if ( server == null ) { throw new IllegalStateException("ClusterSearcher should have a top level dispatch."); } + monitor.freeze(); + monitor.startPingThread(); } private static QrSearchersConfig.Searchcluster getSearchClusterConfigFromClusterName(QrSearchersConfig config, String name) { @@ -162,14 +189,15 @@ public class ClusterSearcher extends Searcher { } private static FastSearcher searchDispatch(int searchclusterIndex, - String serverId, + FS4ResourcePool fs4ResourcePool, SummaryParameters docSumParams, DocumentdbInfoConfig documentdbInfoConfig, + Backend backend, Dispatcher dispatcher, int dispatcherIndex) { ClusterParams clusterParams = makeClusterParams(searchclusterIndex, dispatcherIndex); - return new FastSearcher(serverId, dispatcher, docSumParams, clusterParams, documentdbInfoConfig); + return new FastSearcher(backend, fs4ResourcePool, dispatcher, docSumParams, clusterParams, documentdbInfoConfig); } private static VdsStreamingSearcher vdsCluster(String serverId, @@ -194,14 +222,25 @@ public class ClusterSearcher extends Searcher { /** Do not use, for internal testing purposes only. **/ ClusterSearcher(Set<String> documentTypes) { this.documentTypes = documentTypes; + monitor = new ClusterMonitor(this, new QrMonitorConfig(new QrMonitorConfig.Builder()), Optional.of(new VipStatus())); cacheHitRatio = new Value("com.yahoo.prelude.cluster.ClusterSearcher.ClusterSearcher().dummy", Statistics.nullImplementation, new Value.Parameters()); clusterModelName = "testScenario"; + fs4ResourcePool = null; maxQueryTimeout = DEFAULT_MAX_QUERY_TIMEOUT; maxQueryCacheTimeout = DEFAULT_MAX_QUERY_CACHE_TIMEOUT; } + private Backend createBackend(QrSearchersConfig.Searchcluster.Dispatcher disp) { + return fs4ResourcePool.getBackend(disp.host(), disp.port()); + } + + ClusterMonitor getMonitor() { + return monitor; + } + void addBackendSearcher(VespaBackEndSearcher searcher) { + monitor.add(searcher); server = searcher; } @@ -440,6 +479,77 @@ public class ClusterSearcher extends Searcher { cacheHitRatio.put(0.0); } + /** NodeManager method, called from ClusterMonitor. */ + void working(VespaBackEndSearcher node) { + server = node; + } + + /** Called from ClusterMonitor. */ + void failed(VespaBackEndSearcher node) { + server = null; + } + + /** + * Pinging a node, called from ClusterMonitor. + */ + void ping(VespaBackEndSearcher node) throws InterruptedException { + log.fine("Sending ping to: " + node); + Pinger pinger = new Pinger(node); + + getExecutor().execute(pinger); + Pong pong = pinger.getPong(); // handles timeout + if (pong == null) { + monitor.failed(node, ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out.")); + } else if (pong.badResponse()) { + monitor.failed(node, pong.getError(0)); + } else { + monitor.responded(node, backendCanServeDocuments(pong)); + } + } + + private boolean backendCanServeDocuments(Pong pong) { + if ( ! pong.activeNodes().isPresent()) return true; // no information; assume true + return pong.activeNodes().get() > 0; + } + @Override - public void deconstruct() { } + public void deconstruct() { + monitor.shutdown(); + } + + ExecutorService getExecutor() { + return fs4ResourcePool.getExecutor(); + } + + ScheduledExecutorService getScheduledExecutor() { + return fs4ResourcePool.getScheduledExecutor(); + } + + private class Pinger implements Runnable { + + private final Searcher searcher; + private final Ping pingChallenge = new Ping(monitor.getConfiguration().getRequestTimeout()); + private final Receiver<Pong> pong = new Receiver<>(); + + Pinger(final Searcher searcher) { + this.searcher = searcher; + } + + @Override + public void run() { + pong.put(createExecution().ping(pingChallenge)); + } + + private Execution createExecution() { + return new Execution(new Chain<>(searcher), + new Execution.Context(null, null, null, null, null)); + } + + public Pong getPong() throws InterruptedException { + Tuple2<MessageState, Pong> reply = pong.get(pingChallenge.getTimeout() + 150); + return (reply.first != MessageState.VALID) ? null : reply.second; + } + + } + } |