diff options
Diffstat (limited to 'storage/src/vespa/storage/storageserver/communicationmanager.cpp')
-rw-r--r-- | storage/src/vespa/storage/storageserver/communicationmanager.cpp | 124 |
1 files changed, 77 insertions, 47 deletions
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 95ed9188422..bbd4e87cb40 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "communicationmanager.h" #include "rpcrequestwrapper.h" @@ -216,18 +216,21 @@ convert_to_rpc_compression_config(const vespa::config::content::core::StorCommun } -CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) - : StorageLink("Communication manager"), +CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, + const config::ConfigUri& configUri, + const CommunicationManagerConfig& bootstrap_config) + : StorageLink("Communication manager", MsgDownOnFlush::Allowed, MsgUpOnClosed::Disallowed), _component(compReg, "communicationmanager"), _metrics(), _shared_rpc_resources(), // Created upon initial configuration _storage_api_rpc_service(), // (ditto) _cc_rpc_service(), // (ditto) _eventQueue(), + _bootstrap_config(std::make_unique<CommunicationManagerConfig>(bootstrap_config)), _mbus(), _configUri(configUri), _closed(false), - _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()), + _docApiConverter(std::make_shared<PlaceHolderBucketResolver>()), _thread() { _component.registerMetricUpdateHook(*this, 5s); @@ -237,9 +240,13 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co void CommunicationManager::onOpen() { - _configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext()); - _configFetcher->subscribe<vespa::config::content::core::StorCommunicationmanagerConfig>(_configUri.getConfigId(), this); - _configFetcher->start(); + // We have to hold on to the bootstrap config until we reach the open-phase, as the + // actual RPC/mbus endpoints are started at the first config edge. + // Note: this is called as part of synchronous node initialization, which explicitly + // prevents any concurrent reconfiguration prior to opening all storage chain components, + // i.e. there's no risk of on_configure() being called _prior_ to us getting here. + on_configure(*_bootstrap_config); + _bootstrap_config.reset(); _thread = _component.startThread(*this, 60s); if (_shared_rpc_resources) { @@ -275,28 +282,25 @@ CommunicationManager::~CommunicationManager() void CommunicationManager::onClose() { - // Avoid getting config during shutdown - _configFetcher.reset(); - - _closed = true; - - if (_mbus) { - if (_messageBusSession) { - _messageBusSession->close(); - } - } - - // TODO remove? this no longer has any particularly useful semantics + _closed.store(true, std::memory_order_seq_cst); if (_cc_rpc_service) { - _cc_rpc_service->close(); + _cc_rpc_service->close(); // Auto-abort all incoming CC RPC requests from now on } - // TODO do this after we drain queues? + // Sync all RPC threads to ensure that any subsequent RPCs must observe the closed-flags we just set if (_shared_rpc_resources) { - _shared_rpc_resources->shutdown(); + _shared_rpc_resources->sync_all_threads(); + } + + if (_mbus && _messageBusSession) { + // Closing the mbus session unregisters the destination session and syncs the worker + // thread(s), so once this call returns we should not observe further incoming requests + // through this pipeline. Previous messages may already be in flight internally; these + // will be handled by flushing-phases. + _messageBusSession->close(); } - // Stopping pumper thread should stop all incoming messages from being - // processed. + // Stopping internal message dispatch thread should stop all incoming _async_ messages + // from being processed. _Synchronously_ dispatched RPCs are still passing through. if (_thread) { _thread->interrupt(); _eventQueue.signal(); @@ -305,13 +309,12 @@ CommunicationManager::onClose() } // Emptying remaining queued messages - // FIXME but RPC/mbus is already shut down at this point...! Make sure we handle this std::shared_ptr<api::StorageMessage> msg; api::ReturnCode code(api::ReturnCode::ABORTED, "Node shutting down"); while (_eventQueue.size() > 0) { assert(_eventQueue.getNext(msg, 0ms)); if (!msg->getType().isReply()) { - std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(*msg).makeReply()); + std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(*msg).makeReply()); reply->setResult(code); sendReply(reply); } @@ -319,6 +322,29 @@ CommunicationManager::onClose() } void +CommunicationManager::onFlush(bool downwards) +{ + if (downwards) { + // Sync RPC threads once more (with feeling!) to ensure that any closing done by other components + // during the storage chain onClose() is visible to these. + if (_shared_rpc_resources) { + _shared_rpc_resources->sync_all_threads(); + } + // By this point, no inbound RPCs (requests and responses) should be allowed any further down + // than the Bouncer component, where they will be, well, bounced. + } else { + // All components further down the storage chain should now be completely closed + // and flushed, and all message-dispatching threads should have been shut down. + // It's possible that the RPC threads are still butting heads up against the Bouncer + // component, so we conclude the shutdown ceremony by taking down the RPC subsystem. + // This transitively waits for all RPC threads to complete. + if (_shared_rpc_resources) { + _shared_rpc_resources->shutdown(); + } + } +} + +void CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig& cfg) { const bool isDist(_component.getNodeType() == lib::NodeType::DISTRIBUTOR); @@ -330,20 +356,20 @@ CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig } void -CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> config) +CommunicationManager::on_configure(const CommunicationManagerConfig& config) { // Only allow dynamic (live) reconfiguration of message bus limits. if (_mbus) { - configureMessageBusLimits(*config); - if (_mbus->getRPCNetwork().getPort() != config->mbusport) { + configureMessageBusLimits(config); + if (_mbus->getRPCNetwork().getPort() != config.mbusport) { auto m = make_string("mbus port changed from %d to %d. Will conduct a quick, but controlled restart.", - _mbus->getRPCNetwork().getPort(), config->mbusport); + _mbus->getRPCNetwork().getPort(), config.mbusport); LOG(warning, "%s", m.c_str()); _component.requestShutdown(m); } - if (_shared_rpc_resources->listen_port() != config->rpcport) { + if (_shared_rpc_resources->listen_port() != config.rpcport) { auto m = make_string("rpc port changed from %d to %d. Will conduct a quick, but controlled restart.", - _shared_rpc_resources->listen_port(), config->rpcport); + _shared_rpc_resources->listen_port(), config.rpcport); LOG(warning, "%s", m.c_str()); _component.requestShutdown(m); } @@ -353,25 +379,25 @@ CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> conf if (!_configUri.empty()) { LOG(debug, "setting up slobrok config from id: '%s", _configUri.getConfigId().c_str()); mbus::RPCNetworkParams params(_configUri); - params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl); - params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads)); - params.setNumRpcTargets(std::max(1, config->mbus.numRpcTargets)); - params.events_before_wakeup(std::max(1, config->mbus.eventsBeforeWakeup)); - params.setTcpNoDelay(config->mbus.tcpNoDelay); + params.setConnectionExpireSecs(config.mbus.rpctargetcache.ttl); + params.setNumNetworkThreads(std::max(1, config.mbus.numNetworkThreads)); + params.setNumRpcTargets(std::max(1, config.mbus.numRpcTargets)); + params.events_before_wakeup(std::max(1, config.mbus.eventsBeforeWakeup)); + params.setTcpNoDelay(config.mbus.tcpNoDelay); params.required_capabilities(vespalib::net::tls::CapabilitySet::of({ vespalib::net::tls::Capability::content_document_api() })); params.setIdentity(mbus::Identity(_component.getIdentity())); - if (config->mbusport != -1) { - params.setListenPort(config->mbusport); + if (config.mbusport != -1) { + params.setListenPort(config.mbusport); } using CompressionConfig = vespalib::compression::CompressionConfig; CompressionConfig::Type compressionType = CompressionConfig::toType( - CommunicationManagerConfig::Mbus::Compress::getTypeName(config->mbus.compress.type).c_str()); - params.setCompressionConfig(CompressionConfig(compressionType, config->mbus.compress.level, - 90, config->mbus.compress.limit)); + CommunicationManagerConfig::Mbus::Compress::getTypeName(config.mbus.compress.type).c_str()); + params.setCompressionConfig(CompressionConfig(compressionType, config.mbus.compress.level, + 90, config.mbus.compress.limit)); // Configure messagebus here as we for legacy reasons have // config here. @@ -381,16 +407,16 @@ CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> conf params, _configUri); - configureMessageBusLimits(*config); + configureMessageBusLimits(config); } _message_codec_provider = std::make_unique<rpc::MessageCodecProvider>(_component.getTypeRepo()->documentTypeRepo); - _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, - config->rpc.numNetworkThreads, config->rpc.eventsBeforeWakeup); + _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config.rpcport, + config.rpc.numNetworkThreads, config.rpc.eventsBeforeWakeup); _cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources); rpc::StorageApiRpcService::Params rpc_params; - rpc_params.compression_config = convert_to_rpc_compression_config(*config); - rpc_params.num_rpc_targets_per_node = config->rpc.numTargetsPerNode; + rpc_params.compression_config = convert_to_rpc_compression_config(config); + rpc_params.num_rpc_targets_per_node = config.rpc.numTargetsPerNode; _storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>( *this, *_shared_rpc_resources, *_message_codec_provider, rpc_params); @@ -438,11 +464,15 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) } } +// Called directly by RPC threads void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); + // If process is shutting down, msg will be synchronously aborted by the Bouncer component process(msg); } +// Called directly by RPC threads (for incoming CC requests) and by any other request-dispatching +// threads (i.e. calling sendUp) when address resolution fails and an internal error response is generated. void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Enqueued dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); _eventQueue.enqueue(std::move(msg)); |