aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-06-01 14:41:25 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2023-06-02 11:37:26 +0000
commitb9585a91ac153ba1111579430f66e5acf0a37281 (patch)
treeb13253b8eedcee48f5db7db013d636bd13604541 /storage
parentef6f0db67a5da3d66c1a23b01364ca08cc3e2f71 (diff)
Avoid blocking CommunicationManager thread during cluster state transitions
Incoming cluster state versions are not applied locally on a content node until all potentially conflicting operations running in the persistence threads have completed and all potentially conflicting operations in the persistence queues have been aborted. This can take a relatively long time when running LID space compactions etc via the persistence threads, and we'd risk blocking the main CommunicationManager thread (which handles all cluster controller communication) for prolonged periods of time. Move state blocking and internal state propagation to a dedicated task executor. The executor only has 1 thread, effectively turning it into an asynchronous FIFO executor.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp57
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp3
-rw-r--r--storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp143
-rw-r--r--storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h50
4 files changed, 139 insertions, 114 deletions
diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
index 4c83dde30da..0d60f50a779 100644
--- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
+++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
@@ -40,15 +40,11 @@ struct ChangedBucketOwnershipHandlerTest : Test {
uint16_t wantedOwner,
const lib::ClusterState& state);
- std::shared_ptr<api::SetSystemStateCommand> createStateCmd(
- const lib::ClusterState& state) const
- {
+ std::shared_ptr<api::SetSystemStateCommand> createStateCmd(const lib::ClusterState& state) const {
return std::make_shared<api::SetSystemStateCommand>(state);
}
- std::shared_ptr<api::SetSystemStateCommand> createStateCmd(
- const std::string& stateStr) const
- {
+ std::shared_ptr<api::SetSystemStateCommand> createStateCmd(const std::string& stateStr) const {
return createStateCmd(lib::ClusterState(stateStr));
}
@@ -71,11 +67,17 @@ struct ChangedBucketOwnershipHandlerTest : Test {
template <typename MsgType, typename... MsgParams>
void expectDownAbortsMessage(bool expected, MsgParams&& ... params);
- lib::ClusterState getDefaultTestClusterState() const {
+ std::shared_ptr<AbortBucketOperationsCommand> fetch_dispatched_abort_operations_command() {
+ _bottom->waitForMessages(2, 60); // abort cmd + set cluster state cmd
+ EXPECT_EQ(2, _bottom->getNumCommands()); // always _at least_ 2
+ return std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
+ }
+
+ static lib::ClusterState getDefaultTestClusterState() {
return lib::ClusterState("distributor:4 storage:1");
}
- lib::ClusterState getStorageDownTestClusterState() const {
+ static lib::ClusterState getStorageDownTestClusterState() {
return lib::ClusterState("distributor:4 storage:1 .0.s:d");
}
@@ -173,29 +175,26 @@ hasAbortedNoneOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v)
bool
hasOnlySetSystemStateCmdQueued(DummyStorageLink& link) {
+ link.waitForMessages(1, 60);
if (link.getNumCommands() != 1) {
std::cerr << "expected 1 command, found"
<< link.getNumCommands() << "\n";
}
- api::SetSystemStateCommand::SP cmd(
- std::dynamic_pointer_cast<api::SetSystemStateCommand>(
- link.getCommand(0)));
- return (cmd.get() != 0);
+ auto cmd = std::dynamic_pointer_cast<api::SetSystemStateCommand>(link.getCommand(0));
+ return static_cast<bool>(cmd);
}
}
void
-ChangedBucketOwnershipHandlerTest::applyDistribution(
- Redundancy redundancy, NodeCount nodeCount)
+ChangedBucketOwnershipHandlerTest::applyDistribution(Redundancy redundancy, NodeCount nodeCount)
{
_app->setDistribution(redundancy, nodeCount);
_handler->storageDistributionChanged();
}
void
-ChangedBucketOwnershipHandlerTest::applyClusterState(
- const lib::ClusterState& state)
+ChangedBucketOwnershipHandlerTest::applyClusterState(const lib::ClusterState& state)
{
_app->setClusterState(state);
_handler->reloadClusterState();
@@ -212,10 +211,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, enumerate_buckets_belonging_on_changed
auto node2Buckets(insertBuckets(2, 2, stateBefore));
_top->sendDown(createStateCmd("distributor:4 .1.s:d .3.s:d storage:1"));
- // TODO: refactor into own function
- ASSERT_EQ(2, _bottom->getNumCommands());
- auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
- ASSERT_TRUE(cmd.get() != 0);
+ auto cmd = fetch_dispatched_abort_operations_command();
+ ASSERT_TRUE(cmd);
EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets));
EXPECT_TRUE(hasAbortedAllOf(cmd, node3Buckets));
@@ -280,10 +277,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, down_edge_to_no_available_distributors
lib::ClusterState downState("distributor:3 .0.s:d .1.s:s .2.s:s storage:1");
_top->sendDown(createStateCmd(downState));
- // TODO: refactor into own function
- ASSERT_EQ(2, _bottom->getNumCommands());
- auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
- ASSERT_TRUE(cmd.get() != 0);
+ auto cmd = fetch_dispatched_abort_operations_command();
+ ASSERT_TRUE(cmd);
EXPECT_TRUE(hasAbortedAllOf(cmd, node0Buckets));
EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets));
@@ -304,10 +299,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed
auto node2Buckets(insertBuckets(2, 2, stateAfter));
_top->sendDown(createStateCmd(stateAfter));
- // TODO: refactor into own function
- ASSERT_EQ(2, _bottom->getNumCommands());
- auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0));
- ASSERT_TRUE(cmd.get() != 0);
+ auto cmd = fetch_dispatched_abort_operations_command();
+ ASSERT_TRUE(cmd);
EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets));
EXPECT_TRUE(hasAbortedNoneOf(cmd, node0Buckets));
@@ -319,8 +312,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed
}
void
-ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket(
- uint16_t fromDistributorIndex)
+ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket(uint16_t fromDistributorIndex)
{
document::BucketId bucket(16, 6786);
auto msg = std::make_shared<api::CreateBucketCommand>(makeDocumentBucket(bucket));
@@ -350,7 +342,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, distribution_config_change_updates_own
/**
* Generate and dispatch a message of the given type with the provided
- * aruments as if that message was sent from distributor 1. Messages will
+ * arguments as if that message was sent from distributor 1. Messages will
* be checked as if the state contains 4 distributors in Up state. This
* means that it suffices to send in a message with a bucket that is not
* owned by distributor 1 in this state to trigger an abort.
@@ -382,7 +374,7 @@ ChangedBucketOwnershipHandlerTest::expectChangeAbortsMessage(bool expected, MsgP
/**
* Generate and dispatch a message of the given type with the provided
- * aruments as if that message was sent from distributor 1. Messages will
+ * arguments as if that message was sent from distributor 1. Messages will
* be checked as if the state contains 4 distributors in Up state and storage
* node is down. This means that any abortable message will trigger an abort.
*/
@@ -394,6 +386,7 @@ ChangedBucketOwnershipHandlerTest::expectDownAbortsMessage(bool expected, MsgPar
(void) _bottom->getCommandsOnce();
ASSERT_NO_FATAL_FAILURE((expectChangeAbortsMessage<MsgType, MsgParams...>(false, std::forward<MsgParams>(params)...)));
_top->sendDown(createStateCmd(getStorageDownTestClusterState()));
+ _bottom->waitForMessages(3, 60);
ASSERT_EQ(_bottom->getNumCommands(), 3);
auto setSystemStateCommand = std::dynamic_pointer_cast<api::SetSystemStateCommand>(_bottom->getCommand(2));
ASSERT_TRUE(setSystemStateCommand);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index c0f4041e284..f6b7c7e5f0b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -184,7 +184,8 @@ computeAllPossibleHandlerThreads(const vespa::config::content::StorFilestorConfi
return cfg.numThreads +
computeNumResponseThreads(cfg.numResponseThreads) +
cfg.numNetworkThreads +
- cfg.numVisitorThreads;
+ cfg.numVisitorThreads +
+ 1; // Async cluster state processing thread (might be a pessimization to include here...)
}
}
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
index 9d7dd95d922..84d12c4e85f 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
@@ -28,6 +28,7 @@ ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler(
_component(compReg, "changedbucketownershiphandler"),
_metrics(),
_configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
+ _state_sync_executor(1), // single thread for sequential task execution
_stateLock(),
_currentState(), // Not set yet, so ownership will not be valid
_currentOwnership(std::make_shared<OwnershipState>(
@@ -98,7 +99,7 @@ ChangedBucketOwnershipHandler::Metrics::Metrics(metrics::MetricSet* owner)
idealStateOpsAborted("ideal_state_ops_aborted", {}, "Number of outdated ideal state operations aborted", this),
externalLoadOpsAborted("external_load_ops_aborted", {}, "Number of outdated external load operations aborted", this)
{}
-ChangedBucketOwnershipHandler::Metrics::~Metrics() { }
+ChangedBucketOwnershipHandler::Metrics::~Metrics() = default;
ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucketSpaceRepo &contentBucketSpaceRepo,
std::shared_ptr<const lib::ClusterStateBundle> state)
@@ -114,7 +115,7 @@ ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucke
}
-ChangedBucketOwnershipHandler::OwnershipState::~OwnershipState() {}
+ChangedBucketOwnershipHandler::OwnershipState::~OwnershipState() = default;
const lib::ClusterState&
@@ -235,18 +236,77 @@ ChangedBucketOwnershipHandler::makeLazyAbortPredicate(
_component.getIndex()));
}
-/*
- * If we go from:
- * 1) Not all down -> all distributors down
- * - abort ops for _all_ buckets
- * 2) All distributors down -> not down
- * - no-op, since down edge must have been handled first
- * 3) All down -> all down
- * - no-op
- * 4) Some nodes down or up
- * - abort ops for buckets that have changed ownership between
- * current and new cluster state.
- */
+class ChangedBucketOwnershipHandler::ClusterStateSyncAndApplyTask
+ : public vespalib::Executor::Task
+{
+ ChangedBucketOwnershipHandler& _owner;
+ std::shared_ptr<api::SetSystemStateCommand> _command;
+public:
+ ClusterStateSyncAndApplyTask(ChangedBucketOwnershipHandler& owner,
+ std::shared_ptr<api::SetSystemStateCommand> command) noexcept
+ : _owner(owner),
+ _command(std::move(command))
+ {}
+
+ /*
+ * If we go from:
+ * 1) Not all down -> all distributors down
+ * - abort ops for _all_ buckets
+ * 2) All distributors down -> not down
+ * - no-op, since down edge must have been handled first
+ * 3) All down -> all down
+ * - no-op
+ * 4) Some nodes down or up
+ * - abort ops for buckets that have changed ownership between
+ * current and new cluster state.
+ */
+ void run() override {
+ OwnershipState::CSP old_ownership;
+ OwnershipState::CSP new_ownership;
+ // Get old state and update own current cluster state _before_ it is
+ // applied to the rest of the system. This helps ensure that no message
+ // can get through in the off-case that the lower level storage links
+ // don't apply the state immediately for some reason.
+ {
+ std::lock_guard guard(_owner._stateLock);
+ old_ownership = _owner._currentOwnership;
+ _owner.setCurrentOwnershipWithStateNoLock(_command->getClusterStateBundle());
+ new_ownership = _owner._currentOwnership;
+ }
+ assert(new_ownership->valid());
+ // If we're going from not having a state to having a state, we per
+ // definition cannot possibly have gotten any load that needs aborting,
+ // as no such load is allowed through this component when this is the
+ // case.
+ if (!old_ownership->valid()) {
+ return _owner.sendDown(_command);
+ }
+
+ if (allDistributorsDownInState(old_ownership->getBaselineState())) {
+ LOG(debug, "No need to send aborts on transition '%s' -> '%s'",
+ old_ownership->getBaselineState().toString().c_str(),
+ new_ownership->getBaselineState().toString().c_str());
+ return _owner.sendDown(_command);;
+ }
+ _owner.logTransition(old_ownership->getBaselineState(), new_ownership->getBaselineState());
+
+ metrics::MetricTimer duration_timer;
+ auto predicate = _owner.makeLazyAbortPredicate(old_ownership, new_ownership);
+ auto abort_cmd = std::make_shared<AbortBucketOperationsCommand>(std::move(predicate));
+
+ // Will not return until all operation aborts have been performed
+ // on the lower level links, at which point it is safe to send down
+ // the SetSystemStateCommand.
+ _owner.sendDown(abort_cmd);
+ duration_timer.stop(_owner._metrics.averageAbortProcessingTime);
+
+ // Conflicting operations have been aborted and incoming conflicting operations
+ // are aborted inline; send down the state command actually making the state change
+ // visible on the content node.
+ _owner.sendDown(_command);
+ }
+};
+
bool
ChangedBucketOwnershipHandler::onSetSystemState(
const std::shared_ptr<api::SetSystemStateCommand>& stateCmd)
@@ -255,47 +315,11 @@ ChangedBucketOwnershipHandler::onSetSystemState(
LOG(debug, "Operation aborting is config-disabled");
return false; // Early out.
}
- OwnershipState::CSP oldOwnership;
- OwnershipState::CSP newOwnership;
- // Get old state and update own current cluster state _before_ it is
- // applied to the rest of the system. This helps ensure that no message
- // can get through in the off-case that the lower level storage links
- // don't apply the state immediately for some reason.
- {
- std::lock_guard guard(_stateLock);
- oldOwnership = _currentOwnership;
- setCurrentOwnershipWithStateNoLock(stateCmd->getClusterStateBundle());
- newOwnership = _currentOwnership;
- }
- assert(newOwnership->valid());
- // If we're going from not having a state to having a state, we per
- // definition cannot possibly have gotten any load that needs aborting,
- // as no such load is allowed through this component when this is the
- // case.
- if (!oldOwnership->valid()) {
- return false;
- }
-
- if (allDistributorsDownInState(oldOwnership->getBaselineState())) {
- LOG(debug, "No need to send aborts on transition '%s' -> '%s'",
- oldOwnership->getBaselineState().toString().c_str(),
- newOwnership->getBaselineState().toString().c_str());
- return false;
- }
- logTransition(oldOwnership->getBaselineState(), newOwnership->getBaselineState());
-
- metrics::MetricTimer durationTimer;
- auto predicate(makeLazyAbortPredicate(oldOwnership, newOwnership));
- AbortBucketOperationsCommand::SP cmd(
- new AbortBucketOperationsCommand(std::move(predicate)));
-
- // Will not return until all operation aborts have been performed
- // on the lower level links, at which point it is safe to send down
- // the SetSystemStateCommand.
- sendDown(cmd);
-
- durationTimer.stop(_metrics.averageAbortProcessingTime);
- return false;
+ // Dispatch to background worker. This indirection is because operations such as lid-space compaction
+ // may cause the implicit operation abort waiting step to block the caller for a relatively long time.
+ // It is very important that the executor only has 1 thread, which means this has FIFO behavior.
+ _state_sync_executor.execute(std::make_unique<ClusterStateSyncAndApplyTask>(*this, stateCmd));
+ return true;
}
/**
@@ -411,8 +435,7 @@ ChangedBucketOwnershipHandler::onDown(
const std::shared_ptr<api::StorageMessage>& msg)
{
if (msg->getType() == api::MessageType::SETSYSTEMSTATE) {
- return onSetSystemState(
- std::static_pointer_cast<api::SetSystemStateCommand>(msg));
+ return onSetSystemState(std::static_pointer_cast<api::SetSystemStateCommand>(msg));
}
if (!isMutatingCommandAndNeedsChecking(*msg)) {
return false;
@@ -451,4 +474,10 @@ ChangedBucketOwnershipHandler::onInternalReply(
return (reply->getType() == AbortBucketOperationsReply::ID);
}
+void
+ChangedBucketOwnershipHandler::onClose()
+{
+ _state_sync_executor.shutdown();
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
index e753d96871e..8798d109955 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
@@ -10,6 +10,7 @@
#include <vespa/metrics/valuemetric.h>
#include <vespa/metrics/countmetric.h>
#include <vespa/metrics/metricset.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <atomic>
#include <vector>
#include <unordered_map>
@@ -60,26 +61,24 @@ class ChangedBucketOwnershipHandler
private config::IFetcherCallback<vespa::config::content::PersistenceConfig>
{
public:
- class Metrics : public metrics::MetricSet
- {
+ class Metrics : public metrics::MetricSet {
public:
metrics::LongAverageMetric averageAbortProcessingTime;
metrics::LongCountMetric idealStateOpsAborted;
metrics::LongCountMetric externalLoadOpsAborted;
- Metrics(metrics::MetricSet* owner = 0);
- ~Metrics();
+ explicit Metrics(metrics::MetricSet* owner = nullptr);
+ ~Metrics() override;
};
/**
* Wrapper around the distribution & state pairs that decides how to
* compute the owner distributor for a bucket. It's possible to have
* an ownership state with a nullptr cluster state when the node
- * initially starts up, which is why no owership state must be used unless
+ * initially starts up, which is why no ownership state must be used unless
* invoking valid() on it returns true.
*/
- class OwnershipState
- {
+ class OwnershipState {
using BucketSpace = document::BucketSpace;
std::unordered_map<BucketSpace, std::shared_ptr<const lib::Distribution>, BucketSpace::hash> _distributions;
std::shared_ptr<const lib::ClusterStateBundle> _state;
@@ -93,7 +92,7 @@ public:
static const uint16_t FAILED_TO_RESOLVE = 0xffff;
- bool valid() const {
+ [[nodiscard]] bool valid() const noexcept {
return (!_distributions.empty() && _state);
}
@@ -114,16 +113,21 @@ public:
void reloadClusterState();
private:
- ServiceLayerComponent _component;
- Metrics _metrics;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
- mutable std::mutex _stateLock;
- std::shared_ptr<const lib::ClusterStateBundle> _currentState;
- OwnershipState::CSP _currentOwnership;
-
- std::atomic<bool> _abortQueuedAndPendingOnStateChange;
- std::atomic<bool> _abortMutatingIdealStateOps;
- std::atomic<bool> _abortMutatingExternalLoadOps;
+ class ClusterStateSyncAndApplyTask;
+
+ using ConfigFetcherUP = std::unique_ptr<config::ConfigFetcher>;
+ using ClusterStateBundleCSP = std::shared_ptr<const lib::ClusterStateBundle>;
+
+ ServiceLayerComponent _component;
+ Metrics _metrics;
+ ConfigFetcherUP _configFetcher;
+ vespalib::ThreadStackExecutor _state_sync_executor;
+ mutable std::mutex _stateLock;
+ ClusterStateBundleCSP _currentState;
+ OwnershipState::CSP _currentOwnership;
+ std::atomic<bool> _abortQueuedAndPendingOnStateChange;
+ std::atomic<bool> _abortMutatingIdealStateOps;
+ std::atomic<bool> _abortMutatingExternalLoadOps;
std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate>
makeLazyAbortPredicate(
@@ -183,14 +187,12 @@ private:
public:
ChangedBucketOwnershipHandler(const config::ConfigUri& configUri,
ServiceLayerComponentRegister& compReg);
- ~ChangedBucketOwnershipHandler();
+ ~ChangedBucketOwnershipHandler() override;
- bool onSetSystemState(
- const std::shared_ptr<api::SetSystemStateCommand>&) override;
+ bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override;
bool onDown(const std::shared_ptr<api::StorageMessage>&) override;
-
- bool onInternalReply(
- const std::shared_ptr<api::InternalReply>& reply) override;
+ bool onInternalReply(const std::shared_ptr<api::InternalReply>& reply) override;
+ void onClose() override;
void configure(std::unique_ptr<vespa::config::content::PersistenceConfig>) override;