summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-10-10 13:05:33 +0200
committerGitHub <noreply@github.com>2023-10-10 13:05:33 +0200
commit2f635d64e4ba166db3b15854cc0339e0bcdb2f3d (patch)
tree1a461ea9fc22418035d4d6fcc77174ce7f453ee3 /storage
parentf606c6d91246e2f029da40c66a1f7ad827c7c14a (diff)
parent1689de5f1f2b3a1adb55d60b17156deb2ab72281 (diff)
Merge pull request #28825 from vespa-engine/vekterli/ensure-internal-messages-flushed-prior-to-rpc-shutdown
Ensure internal messages are flushed before shutting down RPC subsystem
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/bouncertest.cpp43
-rw-r--r--storage/src/vespa/storage/common/storagelink.cpp7
-rw-r--r--storage/src/vespa/storage/storageserver/bouncer.cpp124
-rw-r--r--storage/src/vespa/storage/storageserver/bouncer.h10
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp58
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h1
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp4
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h2
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp2
9 files changed, 164 insertions, 87 deletions
diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp
index c41696e1a02..5b7d279537e 100644
--- a/storage/src/tests/storageserver/bouncertest.cpp
+++ b/storage/src/tests/storageserver/bouncertest.cpp
@@ -51,9 +51,10 @@ struct BouncerTest : public Test {
api::Timestamp timestamp,
document::BucketSpace bucketSpace);
- void expectMessageBouncedWithRejection();
- void expectMessageBouncedWithAbort();
- void expectMessageNotBounced();
+ void expectMessageBouncedWithRejection() const;
+ void expect_message_bounced_with_node_down_abort() const;
+ void expect_message_bounced_with_shutdown_abort() const;
+ void expectMessageNotBounced() const;
};
BouncerTest::BouncerTest()
@@ -181,7 +182,7 @@ TEST_F(BouncerTest, allow_notify_bucket_change_even_when_distributor_down) {
}
void
-BouncerTest::expectMessageBouncedWithRejection()
+BouncerTest::expectMessageBouncedWithRejection() const
{
ASSERT_EQ(1, _upper->getNumReplies());
EXPECT_EQ(0, _upper->getNumCommands());
@@ -191,7 +192,7 @@ BouncerTest::expectMessageBouncedWithRejection()
}
void
-BouncerTest::expectMessageBouncedWithAbort()
+BouncerTest::expect_message_bounced_with_node_down_abort() const
{
ASSERT_EQ(1, _upper->getNumReplies());
EXPECT_EQ(0, _upper->getNumCommands());
@@ -204,7 +205,17 @@ BouncerTest::expectMessageBouncedWithAbort()
}
void
-BouncerTest::expectMessageNotBounced()
+BouncerTest::expect_message_bounced_with_shutdown_abort() const
+{
+ ASSERT_EQ(1, _upper->getNumReplies());
+ EXPECT_EQ(0, _upper->getNumCommands());
+ auto& reply = dynamic_cast<api::StorageReply&>(*_upper->getReply(0));
+ EXPECT_EQ(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down"), reply.getResult());
+ EXPECT_EQ(0, _lower->getNumCommands());
+}
+
+void
+BouncerTest::expectMessageNotBounced() const
{
EXPECT_EQ(size_t(0), _upper->getNumReplies());
EXPECT_EQ(size_t(1), _lower->getNumCommands());
@@ -296,7 +307,7 @@ TEST_F(BouncerTest, abort_request_when_derived_bucket_space_node_state_is_marked
auto state = makeClusterStateBundle("distributor:3 storage:3", {{ document::FixedBucketSpaces::default_space(), "distributor:3 storage:3 .2.s:d" }});
_node->getNodeStateUpdater().setClusterStateBundle(state);
_upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space()));
- expectMessageBouncedWithAbort();
+ expect_message_bounced_with_node_down_abort();
EXPECT_EQ(1, _manager->metrics().unavailable_node_aborts.getValue());
_upper->reset();
@@ -362,5 +373,23 @@ TEST_F(BouncerTest, operation_with_sufficient_bucket_bits_is_not_rejected) {
expectMessageNotBounced();
}
+TEST_F(BouncerTest, requests_are_rejected_after_close) {
+ _manager->close();
+ _upper->sendDown(createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space()));
+ expect_message_bounced_with_shutdown_abort();
+}
+
+TEST_F(BouncerTest, replies_are_swallowed_after_close) {
+ _manager->close();
+ auto req = createDummyFeedMessage(11 * 1000000, document::FixedBucketSpaces::default_space());
+ auto reply = req->makeReply();
+ _upper->sendDown(std::move(reply));
+
+ EXPECT_EQ(0, _upper->getNumCommands());
+ EXPECT_EQ(0, _upper->getNumReplies());
+ EXPECT_EQ(0, _lower->getNumCommands());
+ EXPECT_EQ(0, _lower->getNumReplies());
+}
+
} // storage
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index beccd605650..feed32f9b94 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -281,15 +281,14 @@ Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, vespalib::duration tim
void
Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) {
- {
- std::lock_guard sync(_lock);
- _queue.emplace(std::move(msg));
- }
+ std::lock_guard sync(_lock);
+ _queue.emplace(std::move(msg));
_cond.notify_one();
}
void
Queue::signal() {
+ std::lock_guard sync(_lock);
_cond.notify_one();
}
diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp
index 404058325b9..fa98afe9489 100644
--- a/storage/src/vespa/storage/storageserver/bouncer.cpp
+++ b/storage/src/vespa/storage/storageserver/bouncer.cpp
@@ -30,19 +30,19 @@ Bouncer::Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & co
_derivedNodeStates(),
_clusterState(&lib::State::UP),
_configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
- _metrics(std::make_unique<BouncerMetrics>())
+ _metrics(std::make_unique<BouncerMetrics>()),
+ _closed(false)
{
_component.getStateUpdater().addStateListener(*this);
_component.registerMetric(*_metrics);
// Register for config. Normally not critical, so catching config
// exception allowing program to continue if missing/faulty config.
- try{
+ try {
if (!configUri.empty()) {
_configFetcher->subscribe<vespa::config::content::core::StorBouncerConfig>(configUri.getConfigId(), this);
_configFetcher->start();
} else {
- LOG(info, "No config id specified. Using defaults rather than "
- "config");
+ LOG(info, "No config id specified. Using defaults rather than config");
}
} catch (config::InvalidConfigException& e) {
LOG(info, "Bouncer failed to load config '%s'. This "
@@ -70,6 +70,8 @@ Bouncer::onClose()
{
_configFetcher->close();
_component.getStateUpdater().removeStateListener(*this);
+ std::lock_guard guard(_lock);
+ _closed = true;
}
void
@@ -86,8 +88,7 @@ const BouncerMetrics& Bouncer::metrics() const noexcept {
}
void
-Bouncer::validateConfig(
- const vespa::config::content::core::StorBouncerConfig& newConfig) const
+Bouncer::validateConfig(const vespa::config::content::core::StorBouncerConfig& newConfig) const
{
if (newConfig.feedRejectionPriorityThreshold != -1) {
if (newConfig.feedRejectionPriorityThreshold
@@ -112,12 +113,11 @@ void Bouncer::append_node_identity(std::ostream& target_stream) const {
}
void
-Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg,
- const lib::State& state)
+Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, const lib::State& state)
{
// If we're not up or retired, fail due to this nodes state.
std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
+ static_cast<api::StorageCommand&>(msg).makeReply());
std::ostringstream ost;
ost << "We don't allow command of type " << msg.getType()
<< " when node is in state " << state.toString(true);
@@ -128,8 +128,7 @@ Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg,
}
void
-Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg,
- int maxClockSkewInSeconds)
+Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, int maxClockSkewInSeconds)
{
auto& as_cmd = dynamic_cast<api::StorageCommand&>(msg);
std::ostringstream ost;
@@ -140,7 +139,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg,
as_cmd.getSourceIndex(), ost.str().c_str());
_metrics->clock_skew_aborts.inc();
- std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply());
reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str()));
sendUp(reply);
}
@@ -148,8 +147,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg,
void
Bouncer::abortCommandDueToClusterDown(api::StorageMessage& msg, const lib::State& cluster_state)
{
- std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply());
std::ostringstream ost;
ost << "We don't allow external load while cluster is in state "
<< cluster_state.toString(true);
@@ -172,35 +170,35 @@ uint64_t
Bouncer::extractMutationTimestampIfAny(const api::StorageMessage& msg)
{
switch (msg.getType().getId()) {
- case api::MessageType::PUT_ID:
- return static_cast<const api::PutCommand&>(msg).getTimestamp();
- case api::MessageType::REMOVE_ID:
- return static_cast<const api::RemoveCommand&>(msg).getTimestamp();
- case api::MessageType::UPDATE_ID:
- return static_cast<const api::UpdateCommand&>(msg).getTimestamp();
- default:
- return 0;
+ case api::MessageType::PUT_ID:
+ return static_cast<const api::PutCommand&>(msg).getTimestamp();
+ case api::MessageType::REMOVE_ID:
+ return static_cast<const api::RemoveCommand&>(msg).getTimestamp();
+ case api::MessageType::UPDATE_ID:
+ return static_cast<const api::UpdateCommand&>(msg).getTimestamp();
+ default:
+ return 0;
}
}
bool
-Bouncer::isExternalLoad(const api::MessageType& type) const noexcept
+Bouncer::isExternalLoad(const api::MessageType& type) noexcept
{
switch (type.getId()) {
- case api::MessageType::PUT_ID:
- case api::MessageType::REMOVE_ID:
- case api::MessageType::UPDATE_ID:
- case api::MessageType::GET_ID:
- case api::MessageType::VISITOR_CREATE_ID:
- case api::MessageType::STATBUCKET_ID:
- return true;
- default:
- return false;
+ case api::MessageType::PUT_ID:
+ case api::MessageType::REMOVE_ID:
+ case api::MessageType::UPDATE_ID:
+ case api::MessageType::GET_ID:
+ case api::MessageType::VISITOR_CREATE_ID:
+ case api::MessageType::STATBUCKET_ID:
+ return true;
+ default:
+ return false;
}
}
bool
-Bouncer::isExternalWriteOperation(const api::MessageType& type) const noexcept {
+Bouncer::isExternalWriteOperation(const api::MessageType& type) noexcept {
switch (type.getId()) {
case api::MessageType::PUT_ID:
case api::MessageType::REMOVE_ID:
@@ -216,8 +214,7 @@ Bouncer::rejectDueToInsufficientPriority(
api::StorageMessage& msg,
api::StorageMessage::Priority feedPriorityLowerBound)
{
- std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply());
std::ostringstream ost;
ost << "Operation priority (" << int(msg.getPriority())
<< ") is lower than currently configured threshold ("
@@ -231,8 +228,7 @@ Bouncer::rejectDueToInsufficientPriority(
void
Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) {
- std::shared_ptr<api::StorageReply> reply(
- dynamic_cast<api::StorageCommand&>(msg).makeReply());
+ std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply());
reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED,
vespalib::make_string("Operation bucket %s has too few bits used (%u < minimum of %u)",
msg.getBucketId().toString().c_str(),
@@ -241,31 +237,22 @@ Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) {
sendUp(reply);
}
+void
+Bouncer::reject_due_to_node_shutdown(api::StorageMessage& msg) {
+ std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply());
+ reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down"));
+ sendUp(reply);
+}
+
bool
Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
- const api::MessageType& type(msg->getType());
- // All replies can come in.
- if (type.isReply()) {
- return false;
- }
-
- switch (type.getId()) {
- case api::MessageType::SETNODESTATE_ID:
- case api::MessageType::GETNODESTATE_ID:
- case api::MessageType::SETSYSTEMSTATE_ID:
- case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID:
- case api::MessageType::NOTIFYBUCKETCHANGE_ID:
- // state commands are always ok
- return false;
- default:
- break;
- }
const lib::State* state;
int maxClockSkewInSeconds;
bool isInAvailableState;
bool abortLoadWhenClusterDown;
- const lib::State *cluster_state;
+ bool closed;
+ const lib::State* cluster_state;
int feedPriorityLowerBound;
{
std::lock_guard lock(_lock);
@@ -275,7 +262,34 @@ Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg)
cluster_state = _clusterState;
isInAvailableState = state->oneOf(_config->stopAllLoadWhenNodestateNotIn.c_str());
feedPriorityLowerBound = _config->feedRejectionPriorityThreshold;
+ closed = _closed;
+ }
+ const api::MessageType& type = msg->getType();
+ // If the node is shutting down, we want to prevent _any_ messages from reaching
+ // components further down the call chain. This means this case must be handled
+ // _before_ any logic that explicitly allows through certain message types.
+ if (closed) [[unlikely]] {
+ if (!type.isReply()) {
+ reject_due_to_node_shutdown(*msg);
+ } // else: swallow all replies
+ return true;
}
+ // All replies can come in.
+ if (type.isReply()) {
+ return false;
+ }
+ switch (type.getId()) {
+ case api::MessageType::SETNODESTATE_ID:
+ case api::MessageType::GETNODESTATE_ID:
+ case api::MessageType::SETSYSTEMSTATE_ID:
+ case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID:
+ case api::MessageType::NOTIFYBUCKETCHANGE_ID:
+ // state commands are always ok
+ return false;
+ default:
+ break;
+ }
+
// Special case for point lookup Gets while node is in maintenance mode
// to allow reads to complete during two-phase cluster state transitions
if ((*state == lib::State::MAINTENANCE) && (type.getId() == api::MessageType::GET_ID) && clusterIsUp(*cluster_state)) {
diff --git a/storage/src/vespa/storage/storageserver/bouncer.h b/storage/src/vespa/storage/storageserver/bouncer.h
index 78f07f10316..1038e94ee94 100644
--- a/storage/src/vespa/storage/storageserver/bouncer.h
+++ b/storage/src/vespa/storage/storageserver/bouncer.h
@@ -41,6 +41,7 @@ class Bouncer : public StorageLink,
const lib::State* _clusterState;
std::unique_ptr<config::ConfigFetcher> _configFetcher;
std::unique_ptr<BouncerMetrics> _metrics;
+ bool _closed;
public:
Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & configUri);
@@ -60,11 +61,12 @@ private:
void abortCommandDueToClusterDown(api::StorageMessage&, const lib::State&);
void rejectDueToInsufficientPriority(api::StorageMessage&, api::StorageMessage::Priority);
void reject_due_to_too_few_bucket_bits(api::StorageMessage&);
+ void reject_due_to_node_shutdown(api::StorageMessage&);
static bool clusterIsUp(const lib::State& cluster_state);
bool isDistributor() const;
- bool isExternalLoad(const api::MessageType&) const noexcept;
- bool isExternalWriteOperation(const api::MessageType&) const noexcept;
- bool priorityRejectionIsEnabled(int configuredPriority) const noexcept {
+ static bool isExternalLoad(const api::MessageType&) noexcept;
+ static bool isExternalWriteOperation(const api::MessageType&) noexcept;
+ static bool priorityRejectionIsEnabled(int configuredPriority) noexcept {
return (configuredPriority != -1);
}
@@ -72,7 +74,7 @@ private:
* If msg is a command containing a mutating timestamp (put, remove or
* update commands), return that timestamp. Otherwise, return 0.
*/
- uint64_t extractMutationTimestampIfAny(const api::StorageMessage& msg);
+ static uint64_t extractMutationTimestampIfAny(const api::StorageMessage& msg);
bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
void handleNewState() noexcept override;
const lib::NodeState &getDerivedNodeState(document::BucketSpace bucketSpace) const;
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 610d9c8d707..ed279f53cf0 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -278,25 +278,25 @@ CommunicationManager::onClose()
// Avoid getting config during shutdown
_configFetcher.reset();
- _closed = true;
-
- if (_mbus) {
- if (_messageBusSession) {
- _messageBusSession->close();
- }
- }
-
- // TODO remove? this no longer has any particularly useful semantics
+ _closed.store(true, std::memory_order_seq_cst);
if (_cc_rpc_service) {
- _cc_rpc_service->close();
+ _cc_rpc_service->close(); // Auto-abort all incoming CC RPC requests from now on
}
- // TODO do this after we drain queues?
+ // Sync all RPC threads to ensure that any subsequent RPCs must observe the closed-flags we just set
if (_shared_rpc_resources) {
- _shared_rpc_resources->shutdown();
+ _shared_rpc_resources->sync_all_threads();
+ }
+
+ if (_mbus && _messageBusSession) {
+ // Closing the mbus session unregisters the destination session and syncs the worker
+ // thread(s), so once this call returns we should not observe further incoming requests
+ // through this pipeline. Previous messages may already be in flight internally; these
+ // will be handled by flushing-phases.
+ _messageBusSession->close();
}
- // Stopping pumper thread should stop all incoming messages from being
- // processed.
+ // Stopping internal message dispatch thread should stop all incoming _async_ messages
+ // from being processed. _Synchronously_ dispatched RPCs are still passing through.
if (_thread) {
_thread->interrupt();
_eventQueue.signal();
@@ -305,13 +305,12 @@ CommunicationManager::onClose()
}
// Emptying remaining queued messages
- // FIXME but RPC/mbus is already shut down at this point...! Make sure we handle this
std::shared_ptr<api::StorageMessage> msg;
api::ReturnCode code(api::ReturnCode::ABORTED, "Node shutting down");
while (_eventQueue.size() > 0) {
assert(_eventQueue.getNext(msg, 0ms));
if (!msg->getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(*msg).makeReply());
+ std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(*msg).makeReply());
reply->setResult(code);
sendReply(reply);
}
@@ -319,6 +318,29 @@ CommunicationManager::onClose()
}
void
+CommunicationManager::onFlush(bool downwards)
+{
+ if (downwards) {
+ // Sync RPC threads once more (with feeling!) to ensure that any closing done by other components
+ // during the storage chain onClose() is visible to these.
+ if (_shared_rpc_resources) {
+ _shared_rpc_resources->sync_all_threads();
+ }
+ // By this point, no inbound RPCs (requests and responses) should be allowed any further down
+ // than the Bouncer component, where they will be, well, bounced.
+ } else {
+ // All components further down the storage chain should now be completely closed
+ // and flushed, and all message-dispatching threads should have been shut down.
+ // It's possible that the RPC threads are still butting heads up against the Bouncer
+ // component, so we conclude the shutdown ceremony by taking down the RPC subsystem.
+ // This transitively waits for all RPC threads to complete.
+ if (_shared_rpc_resources) {
+ _shared_rpc_resources->shutdown();
+ }
+ }
+}
+
+void
CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig& cfg)
{
const bool isDist(_component.getNodeType() == lib::NodeType::DISTRIBUTOR);
@@ -438,11 +460,15 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg)
}
}
+// Called directly by RPC threads
void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) {
LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
+ // If process is shutting down, msg will be synchronously aborted by the Bouncer component
process(msg);
}
+// Called directly by RPC threads (for incoming CC requests) and by any other request-dispatching
+// threads (i.e. calling sendUp) when address resolution fails and an internal error response is generated.
void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) {
LOG(spam, "Enqueued dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority());
_eventQueue.enqueue(std::move(msg));
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index da45124ed2d..3c986c59c5e 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -89,6 +89,7 @@ private:
void onOpen() override;
void onClose() override;
+ void onFlush(bool downwards) override;
void process(const std::shared_ptr<api::StorageMessage>& msg);
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index 172084662e2..eb933f5eb2c 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -105,6 +105,10 @@ void SharedRpcResources::wait_until_slobrok_is_ready() {
}
}
+void SharedRpcResources::sync_all_threads() {
+ _transport->sync();
+}
+
void SharedRpcResources::shutdown() {
assert(!_shutdown);
if (listen_port() > 0) {
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
index 1da89dd8869..d8f7eefad53 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
@@ -42,6 +42,8 @@ public:
// To be called after all RPC handlers have been registered.
void start_server_and_register_slobrok(vespalib::stringref my_handle);
+ void sync_all_threads();
+
void shutdown();
[[nodiscard]] int listen_port() const noexcept; // Only valid if server has been started
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 3231deef268..452a94496af 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -358,7 +358,7 @@ StorageNode::shutdown()
{
// Try to shut down in opposite order of initialize. Bear in mind that
// we might be shutting down after init exception causing only parts
- // of the server to have initialize
+ // of the server to have been initialized
LOG(debug, "Shutting down storage node of type %s", getNodeType().toString().c_str());
if (!attemptedStopped()) {
LOG(debug, "Storage killed before requestShutdown() was called. No "