summary | refs | log | tree | commit | diff | stats
path: root/storage
diff options
context:
space:
mode:
author: Harald Musum <musum@yahooinc.com> 2023-10-11 16:27:16 +0200
committer: GitHub <noreply@github.com> 2023-10-11 16:27:16 +0200
commit: 4414b198b94675cac60883c3f206089973d5e600 (patch)
tree: 3bfc3566f1e0ada80b7383607c638b9db757a15c /storage
parent: 0ea942082fb2c7c3d0d67b61dbdd0c8f89f77a57 (diff)
Revert "Ensure internal messages are flushed before shutting down RPC subsystem, take 2"
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/storageserver/bouncertest.cpp 43
-rw-r--r-- storage/src/vespa/storage/common/storagelink.cpp 36
-rw-r--r-- storage/src/vespa/storage/common/storagelink.h 26
-rw-r--r-- storage/src/vespa/storage/storageserver/bouncer.cpp 124
-rw-r--r-- storage/src/vespa/storage/storageserver/bouncer.h 10
-rw-r--r-- storage/src/vespa/storage/storageserver/communicationmanager.cpp 60
-rw-r--r-- storage/src/vespa/storage/storageserver/communicationmanager.h 1
-rw-r--r-- storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp 4
-rw-r--r-- storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h 2
-rw-r--r-- storage/src/vespa/storage/storageserver/storagenode.cpp 2
10 files changed, 102 insertions, 206 deletions
diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp
index 5b7d279537e..c41696e1a02 100644
--- a/storage/src/tests/storageserver/bouncertest.cpp
+++ b/storage/src/tests/storageserver/bouncertest.cpp
@@ -51,10 +51,9 @@ struct BouncerTest : public Test {
api::Timestamp timestamp,
document::BucketSpace bucketSpace);
- void expectMessageBouncedWithRejection() const;
- void expect_message_bounced_with_node_down_abort() const;
- void expect_message_bounced_with_shutdown_abort() const;
- void expectMessageNotBounced() const;
+ void expectMessageBouncedWithRejection();
+ void expectMessageBouncedWithAbort();
+ void expectMessageNotBounced();
};
BouncerTest::BouncerTest()
@@ -182,7 +181,7 @@ TEST_F(BouncerTest, allow_notify_bucket_change_even_when_distributor_down) {
}
void
-BouncerTest::expectMessageBouncedWithRejection() const
+BouncerTest::expectMessageBouncedWithRejection()
{
ASSERT_EQ(1, _upper->getNumReplies());
EXPECT_EQ(0, _upper->getNumCommands());
@@ -192,7 +191,7 @@ BouncerTest::expectMessageBouncedWithRejection() const
}
void
-BouncerTest::expect_message_bounced_with_node_down_abort() const
+BouncerTest::expectMessageBouncedWithAbort()
{
ASSERT_EQ(1, _upper->getNumReplies());
EXPECT_EQ(0, _upper->getNumCommands());
@@ -205,17 +204,7 @@ BouncerTest::expect_message_bounced_with_node_down_abort() const
}
void
-BouncerTest::expect_message_bounced_with_shutdown_abort() const
-{
- ASSERT_EQ(1, _upper->getNumReplies());
- EXPECT_EQ(0, _upper->getNumCommands());
- auto& reply = dynamic_cast<api::StorageReply&>(*_upper->getReply(0));
- EXPECT_EQ(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down"), reply.getResult());
- EXPECT_EQ(0, _lower->getNumCommands());
-}
-
-void
-BouncerTest::expectMessageNotBounced() const
+BouncerTest::expectMessageNotBounced()
{
EXPECT_EQ(size_t(0), _upper->getNumReplies());
EXPECT_EQ(size_t(1), _lower->getNumCommands());
@@ -307,7 +296,7 @@ TEST_F(BouncerTest, abort_request_when_derived_bucket_space_node_state_is_marked
auto state = makeClusterStateBundle("distributor:3 storage:3", {{ document::FixedBucketSpaces::default_space(), "distributor:3 storage:3 .2.s:d" }});
_node->getNodeStateUpdater().setClusterStateBundle(state);
_upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space()));
- expect_message_bounced_with_node_down_abort();
+ expectMessageBouncedWithAbort();
EXPECT_EQ(1, _manager->metrics().unavailable_node_aborts.getValue());
_upper->reset();
@@ -373,23 +362,5 @@ TEST_F(BouncerTest, operation_with_sufficient_bucket_bits_is_not_rejected) {
expectMessageNotBounced();
}
-TEST_F(BouncerTest, requests_are_rejected_after_close) {
- _manager->close();
- _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space()));
- expect_message_bounced_with_shutdown_abort();
-}
-
-TEST_F(BouncerTest, replies_are_swallowed_after_close) {
- _manager->close();
- auto req = createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space());
- auto reply = req->makeReply();
- _upper->sendDown(std::move(reply));
-
- EXPECT_EQ(0, _upper->getNumCommands());
- EXPECT_EQ(0, _upper->getNumReplies());
- EXPECT_EQ(0, _lower->getNumCommands());
- EXPECT_EQ(0, _lower->getNumReplies());
-}
-
} // storage
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index e774e6967b9..beccd605650 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -14,20 +14,6 @@ using namespace storage::api;
namespace storage {
-StorageLink::StorageLink(const std::string& name, bool allow_msg_down_during_flushing)
- : _name(name),
- _up(nullptr),
- _down(),
- _state(CREATED),
- _allow_msg_down_during_flushing(allow_msg_down_during_flushing)
-{
-}
-
-StorageLink::StorageLink(const std::string& name)
- : StorageLink(name, false)
-{
-}
-
StorageLink::~StorageLink() {
LOG(debug, "Destructing link %s.", toString().c_str());
}
@@ -143,15 +129,9 @@ void StorageLink::sendDown(const StorageMessage::SP& msg)
case CLOSING:
case FLUSHINGDOWN:
break;
- case FLUSHINGUP:
- if (_allow_msg_down_during_flushing) {
- break;
- }
- [[fallthrough]];
default:
- LOG(error, "Link %s trying to send %s down while in state %s. Stacktrace: %s",
- toString().c_str(), msg->toString().c_str(), stateToString(getState()),
- vespalib::getStackTrace(0).c_str());
+ LOG(error, "Link %s trying to send %s down while in state %s",
+ toString().c_str(), msg->toString().c_str(), stateToString(getState()));
assert(false);
}
assert(msg);
@@ -192,9 +172,8 @@ void StorageLink::sendUp(const std::shared_ptr<StorageMessage> & msg)
case FLUSHINGUP:
break;
default:
- LOG(error, "Link %s trying to send %s up while in state %s. Stacktrace: %s",
- toString().c_str(), msg->toString(true).c_str(), stateToString(getState()),
- vespalib::getStackTrace(0).c_str());
+ LOG(error, "Link %s trying to send %s up while in state %s",
+ toString().c_str(), msg->toString(true).c_str(), stateToString(getState()));
assert(false);
}
assert(msg);
@@ -302,14 +281,15 @@ Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, vespalib::duration tim
void
Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) {
- std::lock_guard sync(_lock);
- _queue.emplace(std::move(msg));
+ {
+ std::lock_guard sync(_lock);
+ _queue.emplace(std::move(msg));
+ }
_cond.notify_one();
}
void
Queue::signal() {
- std::lock_guard sync(_lock);
_cond.notify_one();
}
diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h
index 1682804a746..2b470d029d8 100644
--- a/storage/src/vespa/storage/common/storagelink.h
+++ b/storage/src/vespa/storage/common/storagelink.h
@@ -42,34 +42,28 @@ public:
enum State { CREATED, OPENED, CLOSING, FLUSHINGDOWN, FLUSHINGUP, CLOSED };
private:
- const std::string _name;
- StorageLink* _up;
+ std::string _name;
+ StorageLink* _up;
std::unique_ptr<StorageLink> _down;
- std::atomic<State> _state;
- const bool _allow_msg_down_during_flushing;
+ std::atomic<State> _state;
public:
- StorageLink(const std::string& name, bool allow_msg_down_during_flushing);
- explicit StorageLink(const std::string& name);
-
StorageLink(const StorageLink &) = delete;
StorageLink & operator = (const StorageLink &) = delete;
+ StorageLink(const std::string& name)
+ : _name(name), _up(0), _down(), _state(CREATED) {}
~StorageLink() override;
- const std::string& getName() const noexcept { return _name; }
- [[nodiscard]] bool isTop() const noexcept { return !_up; }
- [[nodiscard]] bool isBottom() const noexcept { return !_down; }
- [[nodiscard]] unsigned int size() const noexcept {
- return (isBottom() ? 1 : _down->size() + 1);
- }
+ const std::string& getName() const { return _name; }
+ bool isTop() const { return (_up == 0); }
+ bool isBottom() const { return (_down.get() == 0); }
+ unsigned int size() const { return (isBottom() ? 1 : _down->size() + 1); }
/** Adds the link to the end of the chain. */
void push_back(StorageLink::UP);
/** Get the current state of the storage link. */
- [[nodiscard]] State getState() const noexcept {
- return _state.load(std::memory_order_relaxed);
- }
+ State getState() const noexcept { return _state.load(std::memory_order_relaxed); }
/**
* Called by storage server after the storage chain have been created.
diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp
index fa98afe9489..404058325b9 100644
--- a/storage/src/vespa/storage/storageserver/bouncer.cpp
+++ b/storage/src/vespa/storage/storageserver/bouncer.cpp
@@ -30,19 +30,19 @@ Bouncer::Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & co
_derivedNodeStates(),
_clusterState(&lib::State::UP),
_configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
- _metrics(std::make_unique<BouncerMetrics>()),
- _closed(false)
+ _metrics(std::make_unique<BouncerMetrics>())
{
_component.getStateUpdater().addStateListener(*this);
_component.registerMetric(*_metrics);
// Register for config. Normally not critical, so catching config
// exception allowing program to continue if missing/faulty config.
- try {
+ try{
if (!configUri.empty()) {
_configFetcher->subscribe<vespa::config::content::core::StorBouncerConfig>(configUri.getConfigId(), this);
_configFetcher->start();
} else {
- LOG(info, "No config id specified. Using defaults rather than config");
+ LOG(info, "No config id specified. Using defaults rather than "
+ "config");
}
} catch (config::InvalidConfigException& e) {
LOG(info, "Bouncer failed to load config '%s'. This "
@@ -70,8 +70,6 @@ Bouncer::onClose()
{
_configFetcher->close();
_component.getStateUpdater().removeStateListener(*this);
- std::lock_guard guard(_lock);
- _closed = true;
}
void
@@ -88,7 +86,8 @@ const BouncerMetrics& Bouncer::metrics() const noexcept {
}
void
-Bouncer::validateConfig(const vespa::config::content::core::StorBouncerConfig& newConfig) const
+Bouncer::validateConfig(
+ const vespa::config::content::core::StorBouncerConfig& newConfig) const
{
if (newConfig.feedRejectionPriorityThreshold != -1) {
if (newConfig.feedRejectionPriorityThreshold
@@ -113,11 +112,12 @@ void Bouncer::append_node_identity(std::ostream& target_stream) const {
}
void
-Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, const lib::State& state)
+Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg,
+ const lib::State& state)
{
// If we're not up or retired, fail due to this nodes state.
std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(msg).makeReply());
+ static_cast<api::StorageCommand&>(msg).makeReply().release());
std::ostringstream ost;
ost << "We don't allow command of type " << msg.getType()
<< " when node is in state " << state.toString(true);
@@ -128,7 +128,8 @@ Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, const lib::Sta
}
void
-Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, int maxClockSkewInSeconds)
+Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg,
+ int maxClockSkewInSeconds)
{
auto& as_cmd = dynamic_cast<api::StorageCommand&>(msg);
std::ostringstream ost;
@@ -139,7 +140,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, int maxCloc
as_cmd.getSourceIndex(), ost.str().c_str());
_metrics->clock_skew_aborts.inc();
- std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply());
+ std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply().release());
reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str()));
sendUp(reply);
}
@@ -147,7 +148,8 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, int maxCloc
void
Bouncer::abortCommandDueToClusterDown(api::StorageMessage& msg, const lib::State& cluster_state)
{
- std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply());
+ std::shared_ptr<api::StorageReply> reply(
+ static_cast<api::StorageCommand&>(msg).makeReply().release());
std::ostringstream ost;
ost << "We don't allow external load while cluster is in state "
<< cluster_state.toString(true);
@@ -170,35 +172,35 @@ uint64_t
Bouncer::extractMutationTimestampIfAny(const api::StorageMessage& msg)
{
switch (msg.getType().getId()) {
- case api::MessageType::PUT_ID:
- return static_cast<const api::PutCommand&>(msg).getTimestamp();
- case api::MessageType::REMOVE_ID:
- return static_cast<const api::RemoveCommand&>(msg).getTimestamp();
- case api::MessageType::UPDATE_ID:
- return static_cast<const api::UpdateCommand&>(msg).getTimestamp();
- default:
- return 0;
+ case api::MessageType::PUT_ID:
+ return static_cast<const api::PutCommand&>(msg).getTimestamp();
+ case api::MessageType::REMOVE_ID:
+ return static_cast<const api::RemoveCommand&>(msg).getTimestamp();
+ case api::MessageType::UPDATE_ID:
+ return static_cast<const api::UpdateCommand&>(msg).getTimestamp();
+ default:
+ return 0;
}
}
bool
-Bouncer::isExternalLoad(const api::MessageType& type) noexcept
+Bouncer::isExternalLoad(const api::MessageType& type) const noexcept
{
switch (type.getId()) {
- case api::MessageType::PUT_ID:
- case api::MessageType::REMOVE_ID:
- case api::MessageType::UPDATE_ID:
- case api::MessageType::GET_ID:
- case api::MessageType::VISITOR_CREATE_ID:
- case api::MessageType::STATBUCKET_ID:
- return true;
- default:
- return false;
+ case api::MessageType::PUT_ID:
+ case api::MessageType::REMOVE_ID:
+ case api::MessageType::UPDATE_ID:
+ case api::MessageType::GET_ID:
+ case api::MessageType::VISITOR_CREATE_ID:
+ case api::MessageType::STATBUCKET_ID:
+ return true;
+ default:
+ return false;
}
}
bool
-Bouncer::isExternalWriteOperation(const api::MessageType& type) noexcept {
+Bouncer::isExternalWriteOperation(const api::MessageType& type) const noexcept {
switch (type.getId()) {
case api::MessageType::PUT_ID:
case api::MessageType::REMOVE_ID:
@@ -214,7 +216,8 @@ Bouncer::rejectDueToInsufficientPriority(
api::StorageMessage& msg,
api::StorageMessage::Priority feedPriorityLowerBound)
{
- std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply());
+ std::shared_ptr<api::StorageReply> reply(
+ static_cast<api::StorageCommand&>(msg).makeReply().release());
std::ostringstream ost;
ost << "Operation priority (" << int(msg.getPriority())
<< ") is lower than currently configured threshold ("
@@ -228,7 +231,8 @@ Bouncer::rejectDueToInsufficientPriority(
void
Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) {
- std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply());
+ std::shared_ptr<api::StorageReply> reply(
+ dynamic_cast<api::StorageCommand&>(msg).makeReply());
reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED,
vespalib::make_string("Operation bucket %s has too few bits used (%u < minimum of %u)",
msg.getBucketId().toString().c_str(),
@@ -237,22 +241,31 @@ Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) {
sendUp(reply);
}
-void
-Bouncer::reject_due_to_node_shutdown(api::StorageMessage& msg) {
- std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply());
- reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down"));
- sendUp(reply);
-}
-
bool
Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
+ const api::MessageType& type(msg->getType());
+ // All replies can come in.
+ if (type.isReply()) {
+ return false;
+ }
+
+ switch (type.getId()) {
+ case api::MessageType::SETNODESTATE_ID:
+ case api::MessageType::GETNODESTATE_ID:
+ case api::MessageType::SETSYSTEMSTATE_ID:
+ case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID:
+ case api::MessageType::NOTIFYBUCKETCHANGE_ID:
+ // state commands are always ok
+ return false;
+ default:
+ break;
+ }
const lib::State* state;
int maxClockSkewInSeconds;
bool isInAvailableState;
bool abortLoadWhenClusterDown;
- bool closed;
- const lib::State* cluster_state;
+ const lib::State *cluster_state;
int feedPriorityLowerBound;
{
std::lock_guard lock(_lock);
@@ -262,34 +275,7 @@ Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg)
cluster_state = _clusterState;
isInAvailableState = state->oneOf(_config->stopAllLoadWhenNodestateNotIn.c_str());
feedPriorityLowerBound = _config->feedRejectionPriorityThreshold;
- closed = _closed;
- }
- const api::MessageType& type = msg->getType();
- // If the node is shutting down, we want to prevent _any_ messages from reaching
- // components further down the call chain. This means this case must be handled
- // _before_ any logic that explicitly allows through certain message types.
- if (closed) [[unlikely]] {
- if (!type.isReply()) {
- reject_due_to_node_shutdown(*msg);
- } // else: swallow all replies
- return true;
}
- // All replies can come in.
- if (type.isReply()) {
- return false;
- }
- switch (type.getId()) {
- case api::MessageType::SETNODESTATE_ID:
- case api::MessageType::GETNODESTATE_ID:
- case api::MessageType::SETSYSTEMSTATE_ID:
- case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID:
- case api::MessageType::NOTIFYBUCKETCHANGE_ID:
- // state commands are always ok
- return false;
- default:
- break;
- }
-
// Special case for point lookup Gets while node is in maintenance mode
// to allow reads to complete during two-phase cluster state transitions
if ((*state == lib::State::MAINTENANCE) && (type.getId() == api::MessageType::GET_ID) && clusterIsUp(*cluster_state)) {
diff --git a/storage/src/vespa/storage/storageserver/bouncer.h b/storage/src/vespa/storage/storageserver/bouncer.h
index 1038e94ee94..78f07f10316 100644
--- a/storage/src/vespa/storage/storageserver/bouncer.h
+++ b/storage/src/vespa/storage/storageserver/bouncer.h
@@ -41,7 +41,6 @@ class Bouncer : public StorageLink,
const lib::State* _clusterState;
std::unique_ptr<config::ConfigFetcher> _configFetcher;
std::unique_ptr<BouncerMetrics> _metrics;
- bool _closed;
public:
Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & configUri);
@@ -61,12 +60,11 @@ private:
void abortCommandDueToClusterDown(api::StorageMessage&, const lib::State&);
void rejectDueToInsufficientPriority(api::StorageMessage&, api::StorageMessage::Priority);
void reject_due_to_too_few_bucket_bits(api::StorageMessage&);
- void reject_due_to_node_shutdown(api::StorageMessage&);
static bool clusterIsUp(const lib::State& cluster_state);
bool isDistributor() const;
- static bool isExternalLoad(const api::MessageType&) noexcept;
- static bool isExternalWriteOperation(const api::MessageType&) noexcept;
- static bool priorityRejectionIsEnabled(int configuredPriority) noexcept {
+ bool isExternalLoad(const api::MessageType&) const noexcept;
+ bool isExternalWriteOperation(const api::MessageType&) const noexcept;
+ bool priorityRejectionIsEnabled(int configuredPriority) const noexcept {
return (configuredPriority != -1);
}
@@ -74,7 +72,7 @@ private:
* If msg is a command containing a mutating timestamp (put, remove or
* update commands), return that timestamp. Otherwise, return 0.
*/
- static uint64_t extractMutationTimestampIfAny(const api::StorageMessage& msg);
+ uint64_t extractMutationTimestampIfAny(const api::StorageMessage& msg);
bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
void handleNewState() noexcept override;
const lib::NodeState &getDerivedNodeState(document::BucketSpace bucketSpace) const;
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 37ee7cc2301..610d9c8d707 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -217,7 +217,7 @@ convert_to_rpc_compression_config(const vespa::config::content::core::StorCommun
}
CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri)
- : StorageLink("Communication manager", true), // Explicitly allow msg down during flushing (will be bounced)
+ : StorageLink("Communication manager"),
_component(compReg, "communicationmanager"),
_metrics(),
_shared_rpc_resources(), // Created upon initial configuration
@@ -278,25 +278,25 @@ CommunicationManager::onClose()
// Avoid getting config during shutdown
_configFetcher.reset();
- _closed.store(true, std::memory_order_seq_cst);
+ _closed = true;
+
+ if (_mbus) {
+ if (_messageBusSession) {
+ _messageBusSession->close();
+ }
+ }
+
+ // TODO remove? this no longer has any particularly useful semantics
if (_cc_rpc_service) {
- _cc_rpc_service->close(); // Auto-abort all incoming CC RPC requests from now on
+ _cc_rpc_service->close();
}
- // Sync all RPC threads to ensure that any subsequent RPCs must observe the closed-flags we just set
+ // TODO do this after we drain queues?
if (_shared_rpc_resources) {
- _shared_rpc_resources->sync_all_threads();
- }
-
- if (_mbus && _messageBusSession) {
- // Closing the mbus session unregisters the destination session and syncs the worker
- // thread(s), so once this call returns we should not observe further incoming requests
- // through this pipeline. Previous messages may already be in flight internally; these
- // will be handled by flushing-phases.
- _messageBusSession->close();
+ _shared_rpc_resources->shutdown();
}
- // Stopping internal message dispatch thread should stop all incoming _async_ messages
- // from being processed. _Synchronously_ dispatched RPCs are still passing through.
+ // Stopping pumper thread should stop all incoming messages from being
+ // processed.
if (_thread) {
_thread->interrupt();
_eventQueue.signal();
@@ -305,12 +305,13 @@ CommunicationManager::onClose()
}
// Emptying remaining queued messages
+ // FIXME but RPC/mbus is already shut down at this point...! Make sure we handle this
std::shared_ptr<api::StorageMessage> msg;
api::ReturnCode code(api::ReturnCode::ABORTED, "Node shutting down");
while (_eventQueue.size() > 0) {
assert(_eventQueue.getNext(msg, 0ms));
if (!msg->getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(*msg).makeReply());
+ std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(*msg).makeReply());
reply->setResult(code);
sendReply(reply);
}
@@ -318,29 +319,6 @@ CommunicationManager::onClose()
}
void
-CommunicationManager::onFlush(bool downwards)
-{
- if (downwards) {
- // Sync RPC threads once more (with feeling!) to ensure that any closing done by other components
- // during the storage chain onClose() is visible to these.
- if (_shared_rpc_resources) {
- _shared_rpc_resources->sync_all_threads();
- }
- // By this point, no inbound RPCs (requests and responses) should be allowed any further down
- // than the Bouncer component, where they will be, well, bounced.
- } else {
- // All components further down the storage chain should now be completely closed
- // and flushed, and all message-dispatching threads should have been shut down.
- // It's possible that the RPC threads are still butting heads up against the Bouncer
- // component, so we conclude the shutdown ceremony by taking down the RPC subsystem.
- // This transitively waits for all RPC threads to complete.
- if (_shared_rpc_resources) {
- _shared_rpc_resources->shutdown();
- }
- }
-}
-
-void
CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig& cfg)
{
const bool isDist(_component.getNodeType() == lib::NodeType::DISTRIBUTOR);
@@ -460,15 +438,11 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg)
}
}
-// Called directly by RPC threads
void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) {
LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
- // If process is shutting down, msg will be synchronously aborted by the Bouncer component
process(msg);
}
-// Called directly by RPC threads (for incoming CC requests) and by any other request-dispatching
-// threads (i.e. calling sendUp) when address resolution fails and an internal error response is generated.
void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) {
LOG(spam, "Enqueued dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
_eventQueue.enqueue(std::move(msg));
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 3c986c59c5e..da45124ed2d 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -89,7 +89,6 @@ private:
void onOpen() override;
void onClose() override;
- void onFlush(bool downwards) override;
void process(const std::shared_ptr<api::StorageMessage>& msg);
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index eb933f5eb2c..172084662e2 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -105,10 +105,6 @@ void SharedRpcResources::wait_until_slobrok_is_ready() {
}
}
-void SharedRpcResources::sync_all_threads() {
- _transport->sync();
-}
-
void SharedRpcResources::shutdown() {
assert(!_shutdown);
if (listen_port() > 0) {
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
index d8f7eefad53..1da89dd8869 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
@@ -42,8 +42,6 @@ public:
// To be called after all RPC handlers have been registered.
void start_server_and_register_slobrok(vespalib::stringref my_handle);
- void sync_all_threads();
-
void shutdown();
[[nodiscard]] int listen_port() const noexcept; // Only valid if server has been started
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 452a94496af..3231deef268 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -358,7 +358,7 @@ StorageNode::shutdown()
{
// Try to shut down in opposite order of initialize. Bear in mind that
// we might be shutting down after init exception causing only parts
- // of the server to have been initialized
+ // of the server to have initialize
LOG(debug, "Shutting down storage node of type %s", getNodeType().toString().c_str());
if (!attemptedStopped()) {
LOG(debug, "Storage killed before requestShutdown() was called. No "