summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/vespa/storage/common/storagelink.cpp29
-rw-r--r--storage/src/vespa/storage/common/storagelink.h26
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp2
3 files changed, 42 insertions, 15 deletions
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index feed32f9b94..e774e6967b9 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -14,6 +14,20 @@ using namespace storage::api;
namespace storage {
+StorageLink::StorageLink(const std::string& name, bool allow_msg_down_during_flushing)
+ : _name(name),
+ _up(nullptr),
+ _down(),
+ _state(CREATED),
+ _allow_msg_down_during_flushing(allow_msg_down_during_flushing)
+{
+}
+
+StorageLink::StorageLink(const std::string& name)
+ : StorageLink(name, false)
+{
+}
+
StorageLink::~StorageLink() {
LOG(debug, "Destructing link %s.", toString().c_str());
}
@@ -129,9 +143,15 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
case CLOSING:
case FLUSHINGDOWN:
break;
+ case FLUSHINGUP:
+ if (_allow_msg_down_during_flushing) {
+ break;
+ }
+ [[fallthrough]];
default:
- LOG(error, "Link %s trying to send %s down while in state %s",
- toString().c_str(), msg->toString().c_str(), stateToString(getState()));
+ LOG(error, "Link %s trying to send %s down while in state %s. Stacktrace: %s",
+ toString().c_str(), msg->toString().c_str(), stateToString(getState()),
+ vespalib::getStackTrace(0).c_str());
assert(false);
}
assert(msg);
@@ -172,8 +192,9 @@ void StorageLink::sendUp(const std::shared_ptr<StorageMessage> & msg)
case FLUSHINGUP:
break;
default:
- LOG(error, "Link %s trying to send %s up while in state %s",
- toString().c_str(), msg->toString(true).c_str(), stateToString(getState()));
+ LOG(error, "Link %s trying to send %s up while in state %s. Stacktrace: %s",
+ toString().c_str(), msg->toString(true).c_str(), stateToString(getState()),
+ vespalib::getStackTrace(0).c_str());
assert(false);
}
assert(msg);
diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h
index 2b470d029d8..1682804a746 100644
--- a/storage/src/vespa/storage/common/storagelink.h
+++ b/storage/src/vespa/storage/common/storagelink.h
@@ -42,28 +42,34 @@ public:
enum State { CREATED, OPENED, CLOSING, FLUSHINGDOWN, FLUSHINGUP, CLOSED };
private:
- std::string _name;
- StorageLink* _up;
+ const std::string _name;
+ StorageLink* _up;
std::unique_ptr<StorageLink> _down;
- std::atomic<State> _state;
+ std::atomic<State> _state;
+ const bool _allow_msg_down_during_flushing;
public:
+ StorageLink(const std::string& name, bool allow_msg_down_during_flushing);
+ explicit StorageLink(const std::string& name);
+
StorageLink(const StorageLink &) = delete;
StorageLink & operator = (const StorageLink &) = delete;
- StorageLink(const std::string& name)
- : _name(name), _up(0), _down(), _state(CREATED) {}
~StorageLink() override;
- const std::string& getName() const { return _name; }
- bool isTop() const { return (_up == 0); }
- bool isBottom() const { return (_down.get() == 0); }
- unsigned int size() const { return (isBottom() ? 1 : _down->size() + 1); }
+ const std::string& getName() const noexcept { return _name; }
+ [[nodiscard]] bool isTop() const noexcept { return !_up; }
+ [[nodiscard]] bool isBottom() const noexcept { return !_down; }
+ [[nodiscard]] unsigned int size() const noexcept {
+ return (isBottom() ? 1 : _down->size() + 1);
+ }
/** Adds the link to the end of the chain. */
void push_back(StorageLink::UP);
/** Get the current state of the storage link. */
- State getState() const noexcept { return _state.load(std::memory_order_relaxed); }
+ [[nodiscard]] State getState() const noexcept {
+ return _state.load(std::memory_order_relaxed);
+ }
/**
* Called by storage server after the storage chain have been created.
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index ed279f53cf0..37ee7cc2301 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -217,7 +217,7 @@ convert_to_rpc_compression_config(const vespa::config::content::core::StorCommun
}
CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri)
- : StorageLink("Communication manager"),
+ : StorageLink("Communication manager", true), // Explicitly allow msg down during flushing (will be bounced)
_component(compReg, "communicationmanager"),
_metrics(),
_shared_rpc_resources(), // Created upon initial configuration