aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/storageserver/communicationmanager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/vespa/storage/storageserver/communicationmanager.cpp')
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp124
1 files changed, 77 insertions, 47 deletions
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 95ed9188422..bbd4e87cb40 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -1,4 +1,4 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "communicationmanager.h"
#include "rpcrequestwrapper.h"
@@ -216,18 +216,21 @@ convert_to_rpc_compression_config(const vespa::config::content::core::StorCommun
}
-CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri)
- : StorageLink("Communication manager"),
+CommunicationManager::CommunicationManager(StorageComponentRegister& compReg,
+ const config::ConfigUri& configUri,
+ const CommunicationManagerConfig& bootstrap_config)
+ : StorageLink("Communication manager", MsgDownOnFlush::Allowed, MsgUpOnClosed::Disallowed),
_component(compReg, "communicationmanager"),
_metrics(),
_shared_rpc_resources(), // Created upon initial configuration
_storage_api_rpc_service(), // (ditto)
_cc_rpc_service(), // (ditto)
_eventQueue(),
+ _bootstrap_config(std::make_unique<CommunicationManagerConfig>(bootstrap_config)),
_mbus(),
_configUri(configUri),
_closed(false),
- _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()),
+ _docApiConverter(std::make_shared<PlaceHolderBucketResolver>()),
_thread()
{
_component.registerMetricUpdateHook(*this, 5s);
@@ -237,9 +240,13 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co
void
CommunicationManager::onOpen()
{
- _configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext());
- _configFetcher->subscribe<vespa::config::content::core::StorCommunicationmanagerConfig>(_configUri.getConfigId(), this);
- _configFetcher->start();
+ // We have to hold on to the bootstrap config until we reach the open-phase, as the
+ // actual RPC/mbus endpoints are started at the first config edge.
+ // Note: this is called as part of synchronous node initialization, which explicitly
+ // prevents any concurrent reconfiguration prior to opening all storage chain components,
+ // i.e. there's no risk of on_configure() being called _prior_ to us getting here.
+ on_configure(*_bootstrap_config);
+ _bootstrap_config.reset();
_thread = _component.startThread(*this, 60s);
if (_shared_rpc_resources) {
@@ -275,28 +282,25 @@ CommunicationManager::~CommunicationManager()
void
CommunicationManager::onClose()
{
- // Avoid getting config during shutdown
- _configFetcher.reset();
-
- _closed = true;
-
- if (_mbus) {
- if (_messageBusSession) {
- _messageBusSession->close();
- }
- }
-
- // TODO remove? this no longer has any particularly useful semantics
+ _closed.store(true, std::memory_order_seq_cst);
if (_cc_rpc_service) {
- _cc_rpc_service->close();
+ _cc_rpc_service->close(); // Auto-abort all incoming CC RPC requests from now on
}
- // TODO do this after we drain queues?
+ // Sync all RPC threads to ensure that any subsequent RPCs must observe the closed-flags we just set
if (_shared_rpc_resources) {
- _shared_rpc_resources->shutdown();
+ _shared_rpc_resources->sync_all_threads();
+ }
+
+ if (_mbus && _messageBusSession) {
+ // Closing the mbus session unregisters the destination session and syncs the worker
+ // thread(s), so once this call returns we should not observe further incoming requests
+ // through this pipeline. Previous messages may already be in flight internally; these
+ // will be handled by flushing-phases.
+ _messageBusSession->close();
}
- // Stopping pumper thread should stop all incoming messages from being
- // processed.
+ // Stopping internal message dispatch thread should stop all incoming _async_ messages
+ // from being processed. _Synchronously_ dispatched RPCs are still passing through.
if (_thread) {
_thread->interrupt();
_eventQueue.signal();
@@ -305,13 +309,12 @@ CommunicationManager::onClose()
}
// Emptying remaining queued messages
- // FIXME but RPC/mbus is already shut down at this point...! Make sure we handle this
std::shared_ptr<api::StorageMessage> msg;
api::ReturnCode code(api::ReturnCode::ABORTED, "Node shutting down");
while (_eventQueue.size() > 0) {
assert(_eventQueue.getNext(msg, 0ms));
if (!msg->getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(*msg).makeReply());
+ std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(*msg).makeReply());
reply->setResult(code);
sendReply(reply);
}
@@ -319,6 +322,29 @@ CommunicationManager::onClose()
}
void
+CommunicationManager::onFlush(bool downwards)
+{
+ if (downwards) {
+ // Sync RPC threads once more (with feeling!) to ensure that any closing done by other components
+ // during the storage chain onClose() is visible to the RPC threads.
+ if (_shared_rpc_resources) {
+ _shared_rpc_resources->sync_all_threads();
+ }
+ // By this point, no inbound RPCs (requests and responses) should be allowed any further down
+ // than the Bouncer component, where they will be, well, bounced.
+ } else {
+ // All components further down the storage chain should now be completely closed
+ // and flushed, and all message-dispatching threads should have been shut down.
+ // It's possible that the RPC threads are still butting up against the Bouncer
+ // component, so we conclude the shutdown ceremony by taking down the RPC subsystem.
+ // This transitively waits for all RPC threads to complete.
+ if (_shared_rpc_resources) {
+ _shared_rpc_resources->shutdown();
+ }
+ }
+}
+
+void
CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig& cfg)
{
const bool isDist(_component.getNodeType() == lib::NodeType::DISTRIBUTOR);
@@ -330,20 +356,20 @@ CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig
}
void
-CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> config)
+CommunicationManager::on_configure(const CommunicationManagerConfig& config)
{
// Only allow dynamic (live) reconfiguration of message bus limits.
if (_mbus) {
- configureMessageBusLimits(*config);
- if (_mbus->getRPCNetwork().getPort() != config->mbusport) {
+ configureMessageBusLimits(config);
+ if (_mbus->getRPCNetwork().getPort() != config.mbusport) {
auto m = make_string("mbus port changed from %d to %d. Will conduct a quick, but controlled restart.",
- _mbus->getRPCNetwork().getPort(), config->mbusport);
+ _mbus->getRPCNetwork().getPort(), config.mbusport);
LOG(warning, "%s", m.c_str());
_component.requestShutdown(m);
}
- if (_shared_rpc_resources->listen_port() != config->rpcport) {
+ if (_shared_rpc_resources->listen_port() != config.rpcport) {
auto m = make_string("rpc port changed from %d to %d. Will conduct a quick, but controlled restart.",
- _shared_rpc_resources->listen_port(), config->rpcport);
+ _shared_rpc_resources->listen_port(), config.rpcport);
LOG(warning, "%s", m.c_str());
_component.requestShutdown(m);
}
@@ -353,25 +379,25 @@ CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> conf
if (!_configUri.empty()) {
LOG(debug, "setting up slobrok config from id: '%s", _configUri.getConfigId().c_str());
mbus::RPCNetworkParams params(_configUri);
- params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl);
- params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads));
- params.setNumRpcTargets(std::max(1, config->mbus.numRpcTargets));
- params.events_before_wakeup(std::max(1, config->mbus.eventsBeforeWakeup));
- params.setTcpNoDelay(config->mbus.tcpNoDelay);
+ params.setConnectionExpireSecs(config.mbus.rpctargetcache.ttl);
+ params.setNumNetworkThreads(std::max(1, config.mbus.numNetworkThreads));
+ params.setNumRpcTargets(std::max(1, config.mbus.numRpcTargets));
+ params.events_before_wakeup(std::max(1, config.mbus.eventsBeforeWakeup));
+ params.setTcpNoDelay(config.mbus.tcpNoDelay);
params.required_capabilities(vespalib::net::tls::CapabilitySet::of({
vespalib::net::tls::Capability::content_document_api()
}));
params.setIdentity(mbus::Identity(_component.getIdentity()));
- if (config->mbusport != -1) {
- params.setListenPort(config->mbusport);
+ if (config.mbusport != -1) {
+ params.setListenPort(config.mbusport);
}
using CompressionConfig = vespalib::compression::CompressionConfig;
CompressionConfig::Type compressionType = CompressionConfig::toType(
- CommunicationManagerConfig::Mbus::Compress::getTypeName(config->mbus.compress.type).c_str());
- params.setCompressionConfig(CompressionConfig(compressionType, config->mbus.compress.level,
- 90, config->mbus.compress.limit));
+ CommunicationManagerConfig::Mbus::Compress::getTypeName(config.mbus.compress.type).c_str());
+ params.setCompressionConfig(CompressionConfig(compressionType, config.mbus.compress.level,
+ 90, config.mbus.compress.limit));
// Configure messagebus here as we for legacy reasons have
// config here.
@@ -381,16 +407,16 @@ CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> conf
params,
_configUri);
- configureMessageBusLimits(*config);
+ configureMessageBusLimits(config);
}
_message_codec_provider = std::make_unique<rpc::MessageCodecProvider>(_component.getTypeRepo()->documentTypeRepo);
- _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport,
- config->rpc.numNetworkThreads, config->rpc.eventsBeforeWakeup);
+ _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config.rpcport,
+ config.rpc.numNetworkThreads, config.rpc.eventsBeforeWakeup);
_cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources);
rpc::StorageApiRpcService::Params rpc_params;
- rpc_params.compression_config = convert_to_rpc_compression_config(*config);
- rpc_params.num_rpc_targets_per_node = config->rpc.numTargetsPerNode;
+ rpc_params.compression_config = convert_to_rpc_compression_config(config);
+ rpc_params.num_rpc_targets_per_node = config.rpc.numTargetsPerNode;
_storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>(
*this, *_shared_rpc_resources, *_message_codec_provider, rpc_params);
@@ -438,11 +464,15 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg)
}
}
+// Called directly by RPC threads
void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) {
LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
+ // If the process is shutting down, msg will be synchronously aborted by the Bouncer component
process(msg);
}
+// Called directly by RPC threads (for incoming CC requests) and by any other request-dispatching
+// threads (i.e. calling sendUp) when address resolution fails and an internal error response is generated.
void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) {
LOG(spam, "Enqueued dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
_eventQueue.enqueue(std::move(msg));