diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-10-12 09:32:41 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-10-12 09:32:41 +0000 |
commit | 39d0a12ae3a1c371660b7a92b1098af9aba037d9 (patch) | |
tree | 9aab749db9c2e1398b6f52a362b7215ff2430cc6 /storage | |
parent | 64d2d1dff58fa1f66515e24369968ac79d1b97f6 (diff) |
Allow Bouncer to send messages up when in state `CLOSED`
This is required to allow messages to be bounced during the
final chain flushing step where the `CommunicationManager` is
shutting down the RPC subsystem and waiting for all RPC threads
to complete. At this point the Bouncer component below it has
completed transition into its final `CLOSED` state.
This is symmetrical to allowing the `CommunicationManager` to
send messages down while in a `FLUSHINGUP` state.
Diffstat (limited to 'storage')
4 files changed, 17 insertions, 6 deletions
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index e774e6967b9..020c2f10467 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -14,17 +14,20 @@ using namespace storage::api; namespace storage { -StorageLink::StorageLink(const std::string& name, bool allow_msg_down_during_flushing) +StorageLink::StorageLink(const std::string& name, + bool allow_msg_down_during_flushing, + bool allow_msg_up_during_closed) : _name(name), _up(nullptr), _down(), _state(CREATED), - _allow_msg_down_during_flushing(allow_msg_down_during_flushing) + _allow_msg_down_during_flushing(allow_msg_down_during_flushing), + _allow_msg_up_during_closed(allow_msg_up_during_closed) { } StorageLink::StorageLink(const std::string& name) - : StorageLink(name, false) + : StorageLink(name, false, false) { } @@ -191,6 +194,11 @@ void StorageLink::sendUp(const std::shared_ptr<StorageMessage> & msg) case FLUSHINGDOWN: case FLUSHINGUP: break; + case CLOSED: + if (_allow_msg_up_during_closed) { + break; + } + [[fallthrough]]; default: LOG(error, "Link %s trying to send %s up while in state %s. Stacktrace: %s", toString().c_str(), msg->toString(true).c_str(), stateToString(getState()), diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h index 1682804a746..0330728d69f 100644 --- a/storage/src/vespa/storage/common/storagelink.h +++ b/storage/src/vespa/storage/common/storagelink.h @@ -47,9 +47,12 @@ private: std::unique_ptr<StorageLink> _down; std::atomic<State> _state; const bool _allow_msg_down_during_flushing; + const bool _allow_msg_up_during_closed; public: - StorageLink(const std::string& name, bool allow_msg_down_during_flushing); + StorageLink(const std::string& name, + bool allow_msg_down_during_flushing, + bool allow_msg_up_during_closed); explicit StorageLink(const std::string& name); StorageLink(const StorageLink &) = delete; diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp index fa98afe9489..66dd35a9d81 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.cpp +++ b/storage/src/vespa/storage/storageserver/bouncer.cpp @@ -22,7 +22,7 @@ LOG_SETUP(".bouncer"); namespace storage { Bouncer::Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & configUri) - : StorageLink("Bouncer"), + : StorageLink("Bouncer", false, true), // Explicitly allow msg up when closed (bouncing to flushing comm. mgr) _config(new vespa::config::content::core::StorBouncerConfig()), _component(compReg, "bouncer"), _lock(), diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 37ee7cc2301..a3a2512d2ae 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -217,7 +217,7 @@ convert_to_rpc_compression_config(const vespa::config::content::core::StorCommun } CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) - : StorageLink("Communication manager", true), // Explicitly allow msg down during flushing (will be bounced) + : StorageLink("Communication manager", true, false), // Explicitly allow msg down during flushing (will be bounced) _component(compReg, "communicationmanager"), _metrics(), _shared_rpc_resources(), // Created upon initial configuration |