summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-10-11 12:07:44 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2023-10-11 12:18:50 +0000
commitb8890fd12881f96e9990924e6072b21d77e924a2 (patch)
tree8d412c6e4de45e24380e4a5fb23f3876f68bc7ee /storage
parentbc01cce7bd7f0137adc55a29596f6dc0ae202115 (diff)
Allow CommunicationManager to send down messages during flushing
Since we now shut down the RPC server as the last step during flushing, it's possible for incoming RPCs to arrive before we get to this point. These will be immediately bounced (or swallowed) by the Bouncer component that lies directly below the CommunicationManager, but to actually get there we need to allow messages down in the StorageLink `FLUSHINGUP` state. This commit allows this explicitly for the CommunicationManager and disallows it for everyone else. Also added stack trace dumping to the log in the case that a violation is detected.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/common/storagelink.cpp29
-rw-r--r--storage/src/vespa/storage/common/storagelink.h26
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp2
3 files changed, 42 insertions, 15 deletions
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index feed32f9b94..e774e6967b9 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -14,6 +14,20 @@ using namespace storage::api;
namespace storage {
+StorageLink::StorageLink(const std::string& name, bool allow_msg_down_during_flushing)
+ : _name(name),
+ _up(nullptr),
+ _down(),
+ _state(CREATED),
+ _allow_msg_down_during_flushing(allow_msg_down_during_flushing)
+{
+}
+
+StorageLink::StorageLink(const std::string& name)
+ : StorageLink(name, false)
+{
+}
+
StorageLink::~StorageLink() {
LOG(debug, "Destructing link %s.", toString().c_str());
}
@@ -129,9 +143,15 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
case CLOSING:
case FLUSHINGDOWN:
break;
+ case FLUSHINGUP:
+ if (_allow_msg_down_during_flushing) {
+ break;
+ }
+ [[fallthrough]];
default:
- LOG(error, "Link %s trying to send %s down while in state %s",
- toString().c_str(), msg->toString().c_str(), stateToString(getState()));
+ LOG(error, "Link %s trying to send %s down while in state %s. Stacktrace: %s",
+ toString().c_str(), msg->toString().c_str(), stateToString(getState()),
+ vespalib::getStackTrace(0).c_str());
assert(false);
}
assert(msg);
@@ -172,8 +192,9 @@ void StorageLink::sendUp(const std::shared_ptr<StorageMessage> & msg)
case FLUSHINGUP:
break;
default:
- LOG(error, "Link %s trying to send %s up while in state %s",
- toString().c_str(), msg->toString(true).c_str(), stateToString(getState()));
+ LOG(error, "Link %s trying to send %s up while in state %s. Stacktrace: %s",
+ toString().c_str(), msg->toString(true).c_str(), stateToString(getState()),
+ vespalib::getStackTrace(0).c_str());
assert(false);
}
assert(msg);
diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h
index 2b470d029d8..1682804a746 100644
--- a/storage/src/vespa/storage/common/storagelink.h
+++ b/storage/src/vespa/storage/common/storagelink.h
@@ -42,28 +42,34 @@ public:
enum State { CREATED, OPENED, CLOSING, FLUSHINGDOWN, FLUSHINGUP, CLOSED };
private:
- std::string _name;
- StorageLink* _up;
+ const std::string _name;
+ StorageLink* _up;
std::unique_ptr<StorageLink> _down;
- std::atomic<State> _state;
+ std::atomic<State> _state;
+ const bool _allow_msg_down_during_flushing;
public:
+ StorageLink(const std::string& name, bool allow_msg_down_during_flushing);
+ explicit StorageLink(const std::string& name);
+
StorageLink(const StorageLink &) = delete;
StorageLink & operator = (const StorageLink &) = delete;
- StorageLink(const std::string& name)
- : _name(name), _up(0), _down(), _state(CREATED) {}
~StorageLink() override;
- const std::string& getName() const { return _name; }
- bool isTop() const { return (_up == 0); }
- bool isBottom() const { return (_down.get() == 0); }
- unsigned int size() const { return (isBottom() ? 1 : _down->size() + 1); }
+ const std::string& getName() const noexcept { return _name; }
+ [[nodiscard]] bool isTop() const noexcept { return !_up; }
+ [[nodiscard]] bool isBottom() const noexcept { return !_down; }
+ [[nodiscard]] unsigned int size() const noexcept {
+ return (isBottom() ? 1 : _down->size() + 1);
+ }
/** Adds the link to the end of the chain. */
void push_back(StorageLink::UP);
/** Get the current state of the storage link. */
- State getState() const noexcept { return _state.load(std::memory_order_relaxed); }
+ [[nodiscard]] State getState() const noexcept {
+ return _state.load(std::memory_order_relaxed);
+ }
/**
* Called by storage server after the storage chain have been created.
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index ed279f53cf0..37ee7cc2301 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -217,7 +217,7 @@ convert_to_rpc_compression_config(const vespa::config::content::core::StorCommun
}
CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri)
- : StorageLink("Communication manager"),
+ : StorageLink("Communication manager", true), // Explicitly allow msg down during flushing (will be bounced)
_component(compReg, "communicationmanager"),
_metrics(),
_shared_rpc_resources(), // Created upon initial configuration