summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-06-05 14:45:38 +0200
committerGitHub <noreply@github.com>2023-06-05 14:45:38 +0200
commit58a133e9a98f053426e9a94cf39ec05be1695779 (patch)
tree4f22ca8bf29a6c6ff454733a2df7eb81973c4016 /storage
parent8226d4766007f0091826044c126655f388522a43 (diff)
parenta00278879397db33d6e9593fb887d041bbc44867 (diff)
Merge pull request #27275 from vespa-engine/vekterli/do-not-block-comm-mgr-thread-when-aborting-ops
Avoid blocking CommunicationManager thread during cluster state transitions
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp57
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp3
-rw-r--r--storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp147
-rw-r--r--storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h50
4 files changed, 143 insertions, 114 deletions
diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
index 4c83dde30da..ae2385a36d8 100644
--- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
+++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
@@ -40,15 +40,11 @@ struct ChangedBucketOwnershipHandlerTest : Test {
uint16_t wantedOwner,
const lib::ClusterState& state);
- std::shared_ptr<api::SetSystemStateCommand> createStateCmd(
- const lib::ClusterState& state) const
- {
+ std::shared_ptr<api::SetSystemStateCommand> createStateCmd(const lib::ClusterState& state) const {
return std::make_shared<api::SetSystemStateCommand>(state);
}
- std::shared_ptr<api::SetSystemStateCommand> createStateCmd(
- const std::string& stateStr) const
- {
+ std::shared_ptr<api::SetSystemStateCommand> createStateCmd(const std::string& stateStr) const {
return createStateCmd(lib::ClusterState(stateStr));
}
@@ -71,11 +67,17 @@ struct ChangedBucketOwnershipHandlerTest : Test {
template <typename MsgType, typename... MsgParams>
void expectDownAbortsMessage(bool expected, MsgParams&& ... params);
- lib::ClusterState getDefaultTestClusterState() const {
+ std::shared_ptr<AbortBucketOperationsCommand> fetch_dispatched_abort_operations_command() {
+ _bottom->waitForMessages(2, 60); // abort cmd + set cluster state cmd
+ EXPECT_EQ(2, _bottom->getNumCommands());
+ return std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
+ }
+
+ static lib::ClusterState getDefaultTestClusterState() {
return lib::ClusterState("distributor:4 storage:1");
}
- lib::ClusterState getStorageDownTestClusterState() const {
+ static lib::ClusterState getStorageDownTestClusterState() {
return lib::ClusterState("distributor:4 storage:1 .0.s:d");
}
@@ -173,29 +175,26 @@ hasAbortedNoneOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v)
bool
hasOnlySetSystemStateCmdQueued(DummyStorageLink& link) {
+ link.waitForMessages(1, 60);
if (link.getNumCommands() != 1) {
std::cerr << "expected 1 command, found"
<< link.getNumCommands() << "\n";
}
- api::SetSystemStateCommand::SP cmd(
- std::dynamic_pointer_cast<api::SetSystemStateCommand>(
- link.getCommand(0)));
- return (cmd.get() != 0);
+ auto cmd = std::dynamic_pointer_cast<api::SetSystemStateCommand>(link.getCommand(0));
+ return static_cast<bool>(cmd);
}
}
void
-ChangedBucketOwnershipHandlerTest::applyDistribution(
- Redundancy redundancy, NodeCount nodeCount)
+ChangedBucketOwnershipHandlerTest::applyDistribution(Redundancy redundancy, NodeCount nodeCount)
{
_app->setDistribution(redundancy, nodeCount);
_handler->storageDistributionChanged();
}
void
-ChangedBucketOwnershipHandlerTest::applyClusterState(
- const lib::ClusterState& state)
+ChangedBucketOwnershipHandlerTest::applyClusterState(const lib::ClusterState& state)
{
_app->setClusterState(state);
_handler->reloadClusterState();
@@ -212,10 +211,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, enumerate_buckets_belonging_on_changed
auto node2Buckets(insertBuckets(2, 2, stateBefore));
_top->sendDown(createStateCmd("distributor:4 .1.s:d .3.s:d storage:1"));
- // TODO: refactor into own function
- ASSERT_EQ(2, _bottom->getNumCommands());
- auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
- ASSERT_TRUE(cmd.get() != 0);
+ auto cmd = fetch_dispatched_abort_operations_command();
+ ASSERT_TRUE(cmd);
EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets));
EXPECT_TRUE(hasAbortedAllOf(cmd, node3Buckets));
@@ -280,10 +277,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, down_edge_to_no_available_distributors
lib::ClusterState downState("distributor:3 .0.s:d .1.s:s .2.s:s storage:1");
_top->sendDown(createStateCmd(downState));
- // TODO: refactor into own function
- ASSERT_EQ(2, _bottom->getNumCommands());
- auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
- ASSERT_TRUE(cmd.get() != 0);
+ auto cmd = fetch_dispatched_abort_operations_command();
+ ASSERT_TRUE(cmd);
EXPECT_TRUE(hasAbortedAllOf(cmd, node0Buckets));
EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets));
@@ -304,10 +299,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed
auto node2Buckets(insertBuckets(2, 2, stateAfter));
_top->sendDown(createStateCmd(stateAfter));
- // TODO: refactor into own function
- ASSERT_EQ(2, _bottom->getNumCommands());
- auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
- ASSERT_TRUE(cmd.get() != 0);
+ auto cmd = fetch_dispatched_abort_operations_command();
+ ASSERT_TRUE(cmd);
EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets));
EXPECT_TRUE(hasAbortedNoneOf(cmd, node0Buckets));
@@ -319,8 +312,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed
}
void
-ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket(
- uint16_t fromDistributorIndex)
+ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket(uint16_t fromDistributorIndex)
{
document::BucketId bucket(16, 6786);
auto msg = std::make_shared<api::CreateBucketCommand>(makeDocumentBucket(bucket));
@@ -350,7 +342,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, distribution_config_change_updates_own
/**
* Generate and dispatch a message of the given type with the provided
- * aruments as if that message was sent from distributor 1. Messages will
+ * arguments as if that message was sent from distributor 1. Messages will
* be checked as if the state contains 4 distributors in Up state. This
* means that it suffices to send in a message with a bucket that is not
* owned by distributor 1 in this state to trigger an abort.
@@ -382,7 +374,7 @@ ChangedBucketOwnershipHandlerTest::expectChangeAbortsMessage(bool expected, MsgP
/**
* Generate and dispatch a message of the given type with the provided
- * aruments as if that message was sent from distributor 1. Messages will
+ * arguments as if that message was sent from distributor 1. Messages will
* be checked as if the state contains 4 distributors in Up state and storage
* node is down. This means that any abortable message will trigger an abort.
*/
@@ -394,6 +386,7 @@ ChangedBucketOwnershipHandlerTest::expectDownAbortsMessage(bool expected, MsgPar
(void) _bottom->getCommandsOnce();
ASSERT_NO_FATAL_FAILURE((expectChangeAbortsMessage<MsgType, MsgParams...>(false, std::forward<MsgParams>(params)...)));
_top->sendDown(createStateCmd(getStorageDownTestClusterState()));
+ _bottom->waitForMessages(3, 60);
ASSERT_EQ(_bottom->getNumCommands(), 3);
auto setSystemStateCommand = std::dynamic_pointer_cast<api::SetSystemStateCommand>(_bottom->getCommand(2));
ASSERT_TRUE(setSystemStateCommand);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index c0f4041e284..f6b7c7e5f0b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -184,7 +184,8 @@ computeAllPossibleHandlerThreads(const vespa::config::content::StorFilestorConfi
return cfg.numThreads +
computeNumResponseThreads(cfg.numResponseThreads) +
cfg.numNetworkThreads +
- cfg.numVisitorThreads;
+ cfg.numVisitorThreads +
+ 1; // Async cluster state processing thread (might be a pessimization to include here...)
}
}
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
index 9d7dd95d922..3b97ff6c018 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
@@ -28,6 +28,7 @@ ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler(
_component(compReg, "changedbucketownershiphandler"),
_metrics(),
_configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
+ _state_sync_executor(1), // single thread for sequential task execution
_stateLock(),
_currentState(), // Not set yet, so ownership will not be valid
_currentOwnership(std::make_shared<OwnershipState>(
@@ -98,7 +99,7 @@ ChangedBucketOwnershipHandler::Metrics::Metrics(metrics::MetricSet* owner)
idealStateOpsAborted("ideal_state_ops_aborted", {}, "Number of outdated ideal state operations aborted", this),
externalLoadOpsAborted("external_load_ops_aborted", {}, "Number of outdated external load operations aborted", this)
{}
-ChangedBucketOwnershipHandler::Metrics::~Metrics() { }
+ChangedBucketOwnershipHandler::Metrics::~Metrics() = default;
ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucketSpaceRepo &contentBucketSpaceRepo,
std::shared_ptr<const lib::ClusterStateBundle> state)
@@ -114,7 +115,7 @@ ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucke
}
-ChangedBucketOwnershipHandler::OwnershipState::~OwnershipState() {}
+ChangedBucketOwnershipHandler::OwnershipState::~OwnershipState() = default;
const lib::ClusterState&
@@ -235,18 +236,79 @@ ChangedBucketOwnershipHandler::makeLazyAbortPredicate(
_component.getIndex()));
}
-/*
- * If we go from:
- * 1) Not all down -> all distributors down
- * - abort ops for _all_ buckets
- * 2) All distributors down -> not down
- * - no-op, since down edge must have been handled first
- * 3) All down -> all down
- * - no-op
- * 4) Some nodes down or up
- * - abort ops for buckets that have changed ownership between
- * current and new cluster state.
- */
+class ChangedBucketOwnershipHandler::ClusterStateSyncAndApplyTask
+ : public vespalib::Executor::Task
+{
+ ChangedBucketOwnershipHandler& _owner;
+ std::shared_ptr<api::SetSystemStateCommand> _command;
+public:
+ ClusterStateSyncAndApplyTask(ChangedBucketOwnershipHandler& owner,
+ std::shared_ptr<api::SetSystemStateCommand> command) noexcept
+ : _owner(owner),
+ _command(std::move(command))
+ {}
+
+ /*
+ * If we go from:
+ * 1) Not all down -> all distributors down
+ * - abort ops for _all_ buckets
+ * 2) All distributors down -> not down
+ * - no-op, since down edge must have been handled first
+ * 3) All down -> all down
+ * - no-op
+ * 4) Some nodes down or up
+ * - abort ops for buckets that have changed ownership between
+ * current and new cluster state.
+ */
+ void run() override {
+ OwnershipState::CSP old_ownership;
+ OwnershipState::CSP new_ownership;
+ // Update the ownership state inspected by all bucket-mutating operations passing through
+ // this component so that messages from outdated distributors will be rejected. Note that
+ // this is best-effort; with our current multitude of RPC threads directly dispatching
+ // operations into the persistence provider, it's possible for a thread carrying an outdated
+ // operation to have already passed the barrier, but be preempted so that it will apply the
+ // op _after_ the abort step has completed.
+ {
+ std::lock_guard guard(_owner._stateLock);
+ old_ownership = _owner._currentOwnership;
+ _owner.setCurrentOwnershipWithStateNoLock(_command->getClusterStateBundle());
+ new_ownership = _owner._currentOwnership;
+ }
+ assert(new_ownership->valid());
+ // If we're going from not having a state to having a state, we per
+ // definition cannot possibly have gotten any load that needs aborting,
+ // as no such load is allowed through this component when this is the
+ // case.
+ if (!old_ownership->valid()) {
+ return _owner.sendDown(_command);
+ }
+
+ if (allDistributorsDownInState(old_ownership->getBaselineState())) {
+ LOG(debug, "No need to send aborts on transition '%s' -> '%s'",
+ old_ownership->getBaselineState().toString().c_str(),
+ new_ownership->getBaselineState().toString().c_str());
+ return _owner.sendDown(_command);;
+ }
+ _owner.logTransition(old_ownership->getBaselineState(), new_ownership->getBaselineState());
+
+ metrics::MetricTimer duration_timer;
+ auto predicate = _owner.makeLazyAbortPredicate(old_ownership, new_ownership);
+ auto abort_cmd = std::make_shared<AbortBucketOperationsCommand>(std::move(predicate));
+
+ // Will not return until all operation aborts have been performed
+ // on the lower level links, at which point it is safe to send down
+ // the SetSystemStateCommand.
+ _owner.sendDown(abort_cmd);
+ duration_timer.stop(_owner._metrics.averageAbortProcessingTime);
+
+ // Conflicting operations have been aborted and incoming conflicting operations
+ // are aborted inline; send down the state command actually making the state change
+ // visible on the content node.
+ _owner.sendDown(_command);
+ }
+};
+
bool
ChangedBucketOwnershipHandler::onSetSystemState(
const std::shared_ptr<api::SetSystemStateCommand>& stateCmd)
@@ -255,47 +317,13 @@ ChangedBucketOwnershipHandler::onSetSystemState(
LOG(debug, "Operation aborting is config-disabled");
return false; // Early out.
}
- OwnershipState::CSP oldOwnership;
- OwnershipState::CSP newOwnership;
- // Get old state and update own current cluster state _before_ it is
- // applied to the rest of the system. This helps ensure that no message
- // can get through in the off-case that the lower level storage links
- // don't apply the state immediately for some reason.
- {
- std::lock_guard guard(_stateLock);
- oldOwnership = _currentOwnership;
- setCurrentOwnershipWithStateNoLock(stateCmd->getClusterStateBundle());
- newOwnership = _currentOwnership;
- }
- assert(newOwnership->valid());
- // If we're going from not having a state to having a state, we per
- // definition cannot possibly have gotten any load that needs aborting,
- // as no such load is allowed through this component when this is the
- // case.
- if (!oldOwnership->valid()) {
- return false;
- }
-
- if (allDistributorsDownInState(oldOwnership->getBaselineState())) {
- LOG(debug, "No need to send aborts on transition '%s' -> '%s'",
- oldOwnership->getBaselineState().toString().c_str(),
- newOwnership->getBaselineState().toString().c_str());
- return false;
- }
- logTransition(oldOwnership->getBaselineState(), newOwnership->getBaselineState());
-
- metrics::MetricTimer durationTimer;
- auto predicate(makeLazyAbortPredicate(oldOwnership, newOwnership));
- AbortBucketOperationsCommand::SP cmd(
- new AbortBucketOperationsCommand(std::move(predicate)));
-
- // Will not return until all operation aborts have been performed
- // on the lower level links, at which point it is safe to send down
- // the SetSystemStateCommand.
- sendDown(cmd);
-
- durationTimer.stop(_metrics.averageAbortProcessingTime);
- return false;
+ // Dispatch to background worker. This indirection is because operations such as lid-space compaction
+ // may cause the implicit operation abort waiting step to block the caller for a relatively long time.
+ // It is very important that the executor only has 1 thread, which means this has FIFO behavior.
+ [[maybe_unused]] auto rejected_task = _state_sync_executor.execute(std::make_unique<ClusterStateSyncAndApplyTask>(*this, stateCmd));
+ // If this fails, we have processed a message _after_ onClose has been called, which should not happen.
+ assert(!rejected_task);
+ return true;
}
/**
@@ -411,8 +439,7 @@ ChangedBucketOwnershipHandler::onDown(
const std::shared_ptr<api::StorageMessage>& msg)
{
if (msg->getType() == api::MessageType::SETSYSTEMSTATE) {
- return onSetSystemState(
- std::static_pointer_cast<api::SetSystemStateCommand>(msg));
+ return onSetSystemState(std::static_pointer_cast<api::SetSystemStateCommand>(msg));
}
if (!isMutatingCommandAndNeedsChecking(*msg)) {
return false;
@@ -451,4 +478,10 @@ ChangedBucketOwnershipHandler::onInternalReply(
return (reply->getType() == AbortBucketOperationsReply::ID);
}
+void
+ChangedBucketOwnershipHandler::onClose()
+{
+ _state_sync_executor.shutdown().sync();
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
index e753d96871e..8798d109955 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
@@ -10,6 +10,7 @@
#include <vespa/metrics/valuemetric.h>
#include <vespa/metrics/countmetric.h>
#include <vespa/metrics/metricset.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <atomic>
#include <vector>
#include <unordered_map>
@@ -60,26 +61,24 @@ class ChangedBucketOwnershipHandler
private config::IFetcherCallback<vespa::config::content::PersistenceConfig>
{
public:
- class Metrics : public metrics::MetricSet
- {
+ class Metrics : public metrics::MetricSet {
public:
metrics::LongAverageMetric averageAbortProcessingTime;
metrics::LongCountMetric idealStateOpsAborted;
metrics::LongCountMetric externalLoadOpsAborted;
- Metrics(metrics::MetricSet* owner = 0);
- ~Metrics();
+ explicit Metrics(metrics::MetricSet* owner = nullptr);
+ ~Metrics() override;
};
/**
* Wrapper around the distribution & state pairs that decides how to
* compute the owner distributor for a bucket. It's possible to have
* an ownership state with a nullptr cluster state when the node
- * initially starts up, which is why no owership state must be used unless
+ * initially starts up, which is why no ownership state must be used unless
* invoking valid() on it returns true.
*/
- class OwnershipState
- {
+ class OwnershipState {
using BucketSpace = document::BucketSpace;
std::unordered_map<BucketSpace, std::shared_ptr<const lib::Distribution>, BucketSpace::hash> _distributions;
std::shared_ptr<const lib::ClusterStateBundle> _state;
@@ -93,7 +92,7 @@ public:
static const uint16_t FAILED_TO_RESOLVE = 0xffff;
- bool valid() const {
+ [[nodiscard]] bool valid() const noexcept {
return (!_distributions.empty() && _state);
}
@@ -114,16 +113,21 @@ public:
void reloadClusterState();
private:
- ServiceLayerComponent _component;
- Metrics _metrics;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
- mutable std::mutex _stateLock;
- std::shared_ptr<const lib::ClusterStateBundle> _currentState;
- OwnershipState::CSP _currentOwnership;
-
- std::atomic<bool> _abortQueuedAndPendingOnStateChange;
- std::atomic<bool> _abortMutatingIdealStateOps;
- std::atomic<bool> _abortMutatingExternalLoadOps;
+ class ClusterStateSyncAndApplyTask;
+
+ using ConfigFetcherUP = std::unique_ptr<config::ConfigFetcher>;
+ using ClusterStateBundleCSP = std::shared_ptr<const lib::ClusterStateBundle>;
+
+ ServiceLayerComponent _component;
+ Metrics _metrics;
+ ConfigFetcherUP _configFetcher;
+ vespalib::ThreadStackExecutor _state_sync_executor;
+ mutable std::mutex _stateLock;
+ ClusterStateBundleCSP _currentState;
+ OwnershipState::CSP _currentOwnership;
+ std::atomic<bool> _abortQueuedAndPendingOnStateChange;
+ std::atomic<bool> _abortMutatingIdealStateOps;
+ std::atomic<bool> _abortMutatingExternalLoadOps;
std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate>
makeLazyAbortPredicate(
@@ -183,14 +187,12 @@ private:
public:
ChangedBucketOwnershipHandler(const config::ConfigUri& configUri,
ServiceLayerComponentRegister& compReg);
- ~ChangedBucketOwnershipHandler();
+ ~ChangedBucketOwnershipHandler() override;
- bool onSetSystemState(
- const std::shared_ptr<api::SetSystemStateCommand>&) override;
+ bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override;
bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
-
- bool onInternalReply(
- const std::shared_ptr<api::InternalReply>& reply) override;
+ bool onInternalReply(const std::shared_ptr<api::InternalReply>& reply) override;
+ void onClose() override;
void configure(std::unique_ptr<vespa::config::content::PersistenceConfig>) override;