diff options
-rw-r--r-- | storage/src/vespa/storage/common/storagelink.cpp | 29 | ||||
-rw-r--r-- | storage/src/vespa/storage/common/storagelink.h | 26 | ||||
-rw-r--r-- | storage/src/vespa/storage/storageserver/communicationmanager.cpp | 2 |
3 files changed, 42 insertions, 15 deletions
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index feed32f9b94..e774e6967b9 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -14,6 +14,20 @@ using namespace storage::api; namespace storage { +StorageLink::StorageLink(const std::string& name, bool allow_msg_down_during_flushing) + : _name(name), + _up(nullptr), + _down(), + _state(CREATED), + _allow_msg_down_during_flushing(allow_msg_down_during_flushing) +{ +} + +StorageLink::StorageLink(const std::string& name) + : StorageLink(name, false) +{ +} + StorageLink::~StorageLink() { LOG(debug, "Destructing link %s.", toString().c_str()); } @@ -129,9 +143,15 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) case CLOSING: case FLUSHINGDOWN: break; + case FLUSHINGUP: + if (_allow_msg_down_during_flushing) { + break; + } + [[fallthrough]]; default: - LOG(error, "Link %s trying to send %s down while in state %s", - toString().c_str(), msg->toString().c_str(), stateToString(getState())); + LOG(error, "Link %s trying to send %s down while in state %s. Stacktrace: %s", + toString().c_str(), msg->toString().c_str(), stateToString(getState()), + vespalib::getStackTrace(0).c_str()); assert(false); } assert(msg); @@ -172,8 +192,9 @@ void StorageLink::sendUp(const std::shared_ptr<StorageMessage> & msg) case FLUSHINGUP: break; default: - LOG(error, "Link %s trying to send %s up while in state %s", - toString().c_str(), msg->toString(true).c_str(), stateToString(getState())); + LOG(error, "Link %s trying to send %s up while in state %s. Stacktrace: %s", + toString().c_str(), msg->toString(true).c_str(), stateToString(getState()), + vespalib::getStackTrace(0).c_str()); assert(false); } assert(msg); diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h index 2b470d029d8..1682804a746 100644 --- a/storage/src/vespa/storage/common/storagelink.h +++ b/storage/src/vespa/storage/common/storagelink.h @@ -42,28 +42,34 @@ public: enum State { CREATED, OPENED, CLOSING, FLUSHINGDOWN, FLUSHINGUP, CLOSED }; private: - std::string _name; - StorageLink* _up; + const std::string _name; + StorageLink* _up; std::unique_ptr<StorageLink> _down; - std::atomic<State> _state; + std::atomic<State> _state; + const bool _allow_msg_down_during_flushing; public: + StorageLink(const std::string& name, bool allow_msg_down_during_flushing); + explicit StorageLink(const std::string& name); + StorageLink(const StorageLink &) = delete; StorageLink & operator = (const StorageLink &) = delete; - StorageLink(const std::string& name) - : _name(name), _up(0), _down(), _state(CREATED) {} ~StorageLink() override; - const std::string& getName() const { return _name; } - bool isTop() const { return (_up == 0); } - bool isBottom() const { return (_down.get() == 0); } - unsigned int size() const { return (isBottom() ? 1 : _down->size() + 1); } + const std::string& getName() const noexcept { return _name; } + [[nodiscard]] bool isTop() const noexcept { return !_up; } + [[nodiscard]] bool isBottom() const noexcept { return !_down; } + [[nodiscard]] unsigned int size() const noexcept { + return (isBottom() ? 1 : _down->size() + 1); + } /** Adds the link to the end of the chain. */ void push_back(StorageLink::UP); /** Get the current state of the storage link. */ - State getState() const noexcept { return _state.load(std::memory_order_relaxed); } + [[nodiscard]] State getState() const noexcept { + return _state.load(std::memory_order_relaxed); + } /** * Called by storage server after the storage chain have been created. diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index ed279f53cf0..37ee7cc2301 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -217,7 +217,7 @@ convert_to_rpc_compression_config(const vespa::config::content::core::StorCommun } CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) - : StorageLink("Communication manager"), + : StorageLink("Communication manager", true), // Explicitly allow msg down during flushing (will be bounced) _component(compReg, "communicationmanager"), _metrics(), _shared_rpc_resources(), // Created upon initial configuration |