diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-10-10 13:05:33 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-10 13:05:33 +0200 |
commit | 2f635d64e4ba166db3b15854cc0339e0bcdb2f3d (patch) | |
tree | 1a461ea9fc22418035d4d6fcc77174ce7f453ee3 /storage | |
parent | f606c6d91246e2f029da40c66a1f7ad827c7c14a (diff) | |
parent | 1689de5f1f2b3a1adb55d60b17156deb2ab72281 (diff) |
Merge pull request #28825 from vespa-engine/vekterli/ensure-internal-messages-flushed-prior-to-rpc-shutdown
Ensure internal messages are flushed before shutting down RPC subsystem
Diffstat (limited to 'storage')
9 files changed, 164 insertions, 87 deletions
diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp index c41696e1a02..5b7d279537e 100644 --- a/storage/src/tests/storageserver/bouncertest.cpp +++ b/storage/src/tests/storageserver/bouncertest.cpp @@ -51,9 +51,10 @@ struct BouncerTest : public Test { api::Timestamp timestamp, document::BucketSpace bucketSpace); - void expectMessageBouncedWithRejection(); - void expectMessageBouncedWithAbort(); - void expectMessageNotBounced(); + void expectMessageBouncedWithRejection() const; + void expect_message_bounced_with_node_down_abort() const; + void expect_message_bounced_with_shutdown_abort() const; + void expectMessageNotBounced() const; }; BouncerTest::BouncerTest() @@ -181,7 +182,7 @@ TEST_F(BouncerTest, allow_notify_bucket_change_even_when_distributor_down) { } void -BouncerTest::expectMessageBouncedWithRejection() +BouncerTest::expectMessageBouncedWithRejection() const { ASSERT_EQ(1, _upper->getNumReplies()); EXPECT_EQ(0, _upper->getNumCommands()); @@ -191,7 +192,7 @@ BouncerTest::expectMessageBouncedWithRejection() } void -BouncerTest::expectMessageBouncedWithAbort() +BouncerTest::expect_message_bounced_with_node_down_abort() const { ASSERT_EQ(1, _upper->getNumReplies()); EXPECT_EQ(0, _upper->getNumCommands()); @@ -204,7 +205,17 @@ BouncerTest::expectMessageBouncedWithAbort() } void -BouncerTest::expectMessageNotBounced() +BouncerTest::expect_message_bounced_with_shutdown_abort() const +{ + ASSERT_EQ(1, _upper->getNumReplies()); + EXPECT_EQ(0, _upper->getNumCommands()); + auto& reply = dynamic_cast<api::StorageReply&>(*_upper->getReply(0)); + EXPECT_EQ(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down"), reply.getResult()); + EXPECT_EQ(0, _lower->getNumCommands()); +} + +void +BouncerTest::expectMessageNotBounced() const { EXPECT_EQ(size_t(0), _upper->getNumReplies()); EXPECT_EQ(size_t(1), _lower->getNumCommands()); @@ -296,7 +307,7 @@ TEST_F(BouncerTest, abort_request_when_derived_bucket_space_node_state_is_marked auto state = makeClusterStateBundle("distributor:3 storage:3", {{ document::FixedBucketSpaces::default_space(), "distributor:3 storage:3 .2.s:d" }}); _node->getNodeStateUpdater().setClusterStateBundle(state); _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space())); - expectMessageBouncedWithAbort(); + expect_message_bounced_with_node_down_abort(); EXPECT_EQ(1, _manager->metrics().unavailable_node_aborts.getValue()); _upper->reset(); @@ -362,5 +373,23 @@ TEST_F(BouncerTest, operation_with_sufficient_bucket_bits_is_not_rejected) { expectMessageNotBounced(); } +TEST_F(BouncerTest, requests_are_rejected_after_close) { + _manager->close(); + _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space())); + expect_message_bounced_with_shutdown_abort(); +} + +TEST_F(BouncerTest, replies_are_swallowed_after_close) { + _manager->close(); + auto req = createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space()); + auto reply = req->makeReply(); + _upper->sendDown(std::move(reply)); + + EXPECT_EQ(0, _upper->getNumCommands()); + EXPECT_EQ(0, _upper->getNumReplies()); + EXPECT_EQ(0, _lower->getNumCommands()); + EXPECT_EQ(0, _lower->getNumReplies()); +} + } // storage diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index beccd605650..feed32f9b94 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -281,15 +281,14 @@ Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, vespalib::duration tim void Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) { - { - std::lock_guard sync(_lock); - _queue.emplace(std::move(msg)); - } + std::lock_guard sync(_lock); + _queue.emplace(std::move(msg)); _cond.notify_one(); } void Queue::signal() { + std::lock_guard sync(_lock); _cond.notify_one(); } diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp index 404058325b9..fa98afe9489 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.cpp +++ b/storage/src/vespa/storage/storageserver/bouncer.cpp @@ -30,19 +30,19 @@ Bouncer::Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & co _derivedNodeStates(), _clusterState(&lib::State::UP), _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), - _metrics(std::make_unique<BouncerMetrics>()) + _metrics(std::make_unique<BouncerMetrics>()), + _closed(false) { _component.getStateUpdater().addStateListener(*this); _component.registerMetric(*_metrics); // Register for config. Normally not critical, so catching config // exception allowing program to continue if missing/faulty config. - try{ + try { if (!configUri.empty()) { _configFetcher->subscribe<vespa::config::content::core::StorBouncerConfig>(configUri.getConfigId(), this); _configFetcher->start(); } else { - LOG(info, "No config id specified. Using defaults rather than " - "config"); + LOG(info, "No config id specified. Using defaults rather than config"); } } catch (config::InvalidConfigException& e) { LOG(info, "Bouncer failed to load config '%s'. This " @@ -70,6 +70,8 @@ Bouncer::onClose() { _configFetcher->close(); _component.getStateUpdater().removeStateListener(*this); + std::lock_guard guard(_lock); + _closed = true; } void @@ -86,8 +88,7 @@ const BouncerMetrics& Bouncer::metrics() const noexcept { } void -Bouncer::validateConfig( - const vespa::config::content::core::StorBouncerConfig& newConfig) const +Bouncer::validateConfig(const vespa::config::content::core::StorBouncerConfig& newConfig) const { if (newConfig.feedRejectionPriorityThreshold != -1) { if (newConfig.feedRejectionPriorityThreshold @@ -112,12 +113,11 @@ void Bouncer::append_node_identity(std::ostream& target_stream) const { } void -Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, - const lib::State& state) +Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, const lib::State& state) { // If we're not up or retired, fail due to this nodes state. std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + static_cast<api::StorageCommand&>(msg).makeReply()); std::ostringstream ost; ost << "We don't allow command of type " << msg.getType() << " when node is in state " << state.toString(true); @@ -128,8 +128,7 @@ Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, } void -Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, - int maxClockSkewInSeconds) +Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, int maxClockSkewInSeconds) { auto& as_cmd = dynamic_cast<api::StorageCommand&>(msg); std::ostringstream ost; @@ -140,7 +139,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, as_cmd.getSourceIndex(), ost.str().c_str()); _metrics->clock_skew_aborts.inc(); - std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply().release()); + std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str())); sendUp(reply); } @@ -148,8 +147,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, void Bouncer::abortCommandDueToClusterDown(api::StorageMessage& msg, const lib::State& cluster_state) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply()); std::ostringstream ost; ost << "We don't allow external load while cluster is in state " << cluster_state.toString(true); @@ -172,35 +170,35 @@ uint64_t Bouncer::extractMutationTimestampIfAny(const api::StorageMessage& msg) { switch (msg.getType().getId()) { - case api::MessageType::PUT_ID: - return static_cast<const api::PutCommand&>(msg).getTimestamp(); - case api::MessageType::REMOVE_ID: - return static_cast<const api::RemoveCommand&>(msg).getTimestamp(); - case api::MessageType::UPDATE_ID: - return static_cast<const api::UpdateCommand&>(msg).getTimestamp(); - default: - return 0; + case api::MessageType::PUT_ID: + return static_cast<const api::PutCommand&>(msg).getTimestamp(); + case api::MessageType::REMOVE_ID: + return static_cast<const api::RemoveCommand&>(msg).getTimestamp(); + case api::MessageType::UPDATE_ID: + return static_cast<const api::UpdateCommand&>(msg).getTimestamp(); + default: + return 0; } } bool -Bouncer::isExternalLoad(const api::MessageType& type) const noexcept +Bouncer::isExternalLoad(const api::MessageType& type) noexcept { switch (type.getId()) { - case api::MessageType::PUT_ID: - case api::MessageType::REMOVE_ID: - case api::MessageType::UPDATE_ID: - case api::MessageType::GET_ID: - case api::MessageType::VISITOR_CREATE_ID: - case api::MessageType::STATBUCKET_ID: - return true; - default: - return false; + case api::MessageType::PUT_ID: + case api::MessageType::REMOVE_ID: + case api::MessageType::UPDATE_ID: + case api::MessageType::GET_ID: + case api::MessageType::VISITOR_CREATE_ID: + case api::MessageType::STATBUCKET_ID: + return true; + default: + return false; } } bool -Bouncer::isExternalWriteOperation(const api::MessageType& type) const noexcept { +Bouncer::isExternalWriteOperation(const api::MessageType& type) noexcept { switch (type.getId()) { case api::MessageType::PUT_ID: case api::MessageType::REMOVE_ID: @@ -216,8 +214,7 @@ Bouncer::rejectDueToInsufficientPriority( api::StorageMessage& msg, api::StorageMessage::Priority feedPriorityLowerBound) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply()); std::ostringstream ost; ost << "Operation priority (" << int(msg.getPriority()) << ") is lower than currently configured threshold (" @@ -231,8 +228,7 @@ Bouncer::rejectDueToInsufficientPriority( void Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) { - std::shared_ptr<api::StorageReply> reply( - dynamic_cast<api::StorageCommand&>(msg).makeReply()); + std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, vespalib::make_string("Operation bucket %s has too few bits used (%u < minimum of %u)", msg.getBucketId().toString().c_str(), @@ -241,31 +237,22 @@ Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) { sendUp(reply); } +void +Bouncer::reject_due_to_node_shutdown(api::StorageMessage& msg) { + std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply()); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down")); + sendUp(reply); +} + bool Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg) { - const api::MessageType& type(msg->getType()); - // All replies can come in. - if (type.isReply()) { - return false; - } - - switch (type.getId()) { - case api::MessageType::SETNODESTATE_ID: - case api::MessageType::GETNODESTATE_ID: - case api::MessageType::SETSYSTEMSTATE_ID: - case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: - case api::MessageType::NOTIFYBUCKETCHANGE_ID: - // state commands are always ok - return false; - default: - break; - } const lib::State* state; int maxClockSkewInSeconds; bool isInAvailableState; bool abortLoadWhenClusterDown; - const lib::State *cluster_state; + bool closed; + const lib::State* cluster_state; int feedPriorityLowerBound; { std::lock_guard lock(_lock); @@ -275,7 +262,34 @@ Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg) cluster_state = _clusterState; isInAvailableState = state->oneOf(_config->stopAllLoadWhenNodestateNotIn.c_str()); feedPriorityLowerBound = _config->feedRejectionPriorityThreshold; + closed = _closed; + } + const api::MessageType& type = msg->getType(); + // If the node is shutting down, we want to prevent _any_ messages from reaching + // components further down the call chain. This means this case must be handled + // _before_ any logic that explicitly allows through certain message types. + if (closed) [[unlikely]] { + if (!type.isReply()) { + reject_due_to_node_shutdown(*msg); + } // else: swallow all replies + return true; } + // All replies can come in. + if (type.isReply()) { + return false; + } + switch (type.getId()) { + case api::MessageType::SETNODESTATE_ID: + case api::MessageType::GETNODESTATE_ID: + case api::MessageType::SETSYSTEMSTATE_ID: + case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: + case api::MessageType::NOTIFYBUCKETCHANGE_ID: + // state commands are always ok + return false; + default: + break; + } + // Special case for point lookup Gets while node is in maintenance mode // to allow reads to complete during two-phase cluster state transitions if ((*state == lib::State::MAINTENANCE) && (type.getId() == api::MessageType::GET_ID) && clusterIsUp(*cluster_state)) { diff --git a/storage/src/vespa/storage/storageserver/bouncer.h b/storage/src/vespa/storage/storageserver/bouncer.h index 78f07f10316..1038e94ee94 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.h +++ b/storage/src/vespa/storage/storageserver/bouncer.h @@ -41,6 +41,7 @@ class Bouncer : public StorageLink, const lib::State* _clusterState; std::unique_ptr<config::ConfigFetcher> _configFetcher; std::unique_ptr<BouncerMetrics> _metrics; + bool _closed; public: Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & configUri); @@ -60,11 +61,12 @@ private: void abortCommandDueToClusterDown(api::StorageMessage&, const lib::State&); void rejectDueToInsufficientPriority(api::StorageMessage&, api::StorageMessage::Priority); void reject_due_to_too_few_bucket_bits(api::StorageMessage&); + void reject_due_to_node_shutdown(api::StorageMessage&); static bool clusterIsUp(const lib::State& cluster_state); bool isDistributor() const; - bool isExternalLoad(const api::MessageType&) const noexcept; - bool isExternalWriteOperation(const api::MessageType&) const noexcept; - bool priorityRejectionIsEnabled(int configuredPriority) const noexcept { + static bool isExternalLoad(const api::MessageType&) noexcept; + static bool isExternalWriteOperation(const api::MessageType&) noexcept; + static bool priorityRejectionIsEnabled(int configuredPriority) noexcept { return (configuredPriority != -1); } @@ -72,7 +74,7 @@ private: * If msg is a command containing a mutating timestamp (put, remove or * update commands), return that timestamp. Otherwise, return 0. */ - uint64_t extractMutationTimestampIfAny(const api::StorageMessage& msg); + static uint64_t extractMutationTimestampIfAny(const api::StorageMessage& msg); bool onDown(const std::shared_ptr<api::StorageMessage>&) override; void handleNewState() noexcept override; const lib::NodeState &getDerivedNodeState(document::BucketSpace bucketSpace) const; diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 610d9c8d707..ed279f53cf0 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -278,25 +278,25 @@ CommunicationManager::onClose() // Avoid getting config during shutdown _configFetcher.reset(); - _closed = true; - - if (_mbus) { - if (_messageBusSession) { - _messageBusSession->close(); - } - } - - // TODO remove? this no longer has any particularly useful semantics + _closed.store(true, std::memory_order_seq_cst); if (_cc_rpc_service) { - _cc_rpc_service->close(); + _cc_rpc_service->close(); // Auto-abort all incoming CC RPC requests from now on } - // TODO do this after we drain queues? + // Sync all RPC threads to ensure that any subsequent RPCs must observe the closed-flags we just set if (_shared_rpc_resources) { - _shared_rpc_resources->shutdown(); + _shared_rpc_resources->sync_all_threads(); + } + + if (_mbus && _messageBusSession) { + // Closing the mbus session unregisters the destination session and syncs the worker + // thread(s), so once this call returns we should not observe further incoming requests + // through this pipeline. Previous messages may already be in flight internally; these + // will be handled by flushing-phases. + _messageBusSession->close(); } - // Stopping pumper thread should stop all incoming messages from being - // processed. + // Stopping internal message dispatch thread should stop all incoming _async_ messages + // from being processed. _Synchronously_ dispatched RPCs are still passing through. if (_thread) { _thread->interrupt(); _eventQueue.signal(); @@ -305,13 +305,12 @@ CommunicationManager::onClose() } // Emptying remaining queued messages - // FIXME but RPC/mbus is already shut down at this point...! Make sure we handle this std::shared_ptr<api::StorageMessage> msg; api::ReturnCode code(api::ReturnCode::ABORTED, "Node shutting down"); while (_eventQueue.size() > 0) { assert(_eventQueue.getNext(msg, 0ms)); if (!msg->getType().isReply()) { - std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(*msg).makeReply()); + std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(*msg).makeReply()); reply->setResult(code); sendReply(reply); } @@ -319,6 +318,29 @@ CommunicationManager::onClose() } void +CommunicationManager::onFlush(bool downwards) +{ + if (downwards) { + // Sync RPC threads once more (with feeling!) to ensure that any closing done by other components + // during the storage chain onClose() is visible to these. + if (_shared_rpc_resources) { + _shared_rpc_resources->sync_all_threads(); + } + // By this point, no inbound RPCs (requests and responses) should be allowed any further down + // than the Bouncer component, where they will be, well, bounced. + } else { + // All components further down the storage chain should now be completely closed + // and flushed, and all message-dispatching threads should have been shut down. + // It's possible that the RPC threads are still butting heads up against the Bouncer + // component, so we conclude the shutdown ceremony by taking down the RPC subsystem. + // This transitively waits for all RPC threads to complete. + if (_shared_rpc_resources) { + _shared_rpc_resources->shutdown(); + } + } +} + +void CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig& cfg) { const bool isDist(_component.getNodeType() == lib::NodeType::DISTRIBUTOR); @@ -438,11 +460,15 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) } } +// Called directly by RPC threads void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); + // If process is shutting down, msg will be synchronously aborted by the Bouncer component process(msg); } +// Called directly by RPC threads (for incoming CC requests) and by any other request-dispatching +// threads (i.e. calling sendUp) when address resolution fails and an internal error response is generated. void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Enqueued dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); _eventQueue.enqueue(std::move(msg)); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index da45124ed2d..3c986c59c5e 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -89,6 +89,7 @@ private: void onOpen() override; void onClose() override; + void onFlush(bool downwards) override; void process(const std::shared_ptr<api::StorageMessage>& msg); diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index 172084662e2..eb933f5eb2c 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -105,6 +105,10 @@ void SharedRpcResources::wait_until_slobrok_is_ready() { } } +void SharedRpcResources::sync_all_threads() { + _transport->sync(); +} + void SharedRpcResources::shutdown() { assert(!_shutdown); if (listen_port() > 0) { diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h index 1da89dd8869..d8f7eefad53 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h @@ -42,6 +42,8 @@ public: // To be called after all RPC handlers have been registered. void start_server_and_register_slobrok(vespalib::stringref my_handle); + void sync_all_threads(); + void shutdown(); [[nodiscard]] int listen_port() const noexcept; // Only valid if server has been started diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 3231deef268..452a94496af 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -358,7 +358,7 @@ StorageNode::shutdown() { // Try to shut down in opposite order of initialize. Bear in mind that // we might be shutting down after init exception causing only parts - // of the server to have initialize + // of the server to have been initialized LOG(debug, "Shutting down storage node of type %s", getNodeType().toString().c_str()); if (!attemptedStopped()) { LOG(debug, "Storage killed before requestShutdown() was called. No " |