diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-06-05 14:45:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-05 14:45:38 +0200 |
commit | 58a133e9a98f053426e9a94cf39ec05be1695779 (patch) | |
tree | 4f22ca8bf29a6c6ff454733a2df7eb81973c4016 /storage | |
parent | 8226d4766007f0091826044c126655f388522a43 (diff) | |
parent | a00278879397db33d6e9593fb887d041bbc44867 (diff) |
Merge pull request #27275 from vespa-engine/vekterli/do-not-block-comm-mgr-thread-when-aborting-ops
Avoid blocking CommunicationManager thread during cluster state transitions
Diffstat (limited to 'storage')
4 files changed, 143 insertions, 114 deletions
diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp index 4c83dde30da..ae2385a36d8 100644 --- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp +++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp @@ -40,15 +40,11 @@ struct ChangedBucketOwnershipHandlerTest : Test { uint16_t wantedOwner, const lib::ClusterState& state); - std::shared_ptr<api::SetSystemStateCommand> createStateCmd( - const lib::ClusterState& state) const - { + std::shared_ptr<api::SetSystemStateCommand> createStateCmd(const lib::ClusterState& state) const { return std::make_shared<api::SetSystemStateCommand>(state); } - std::shared_ptr<api::SetSystemStateCommand> createStateCmd( - const std::string& stateStr) const - { + std::shared_ptr<api::SetSystemStateCommand> createStateCmd(const std::string& stateStr) const { return createStateCmd(lib::ClusterState(stateStr)); } @@ -71,11 +67,17 @@ struct ChangedBucketOwnershipHandlerTest : Test { template <typename MsgType, typename... MsgParams> void expectDownAbortsMessage(bool expected, MsgParams&& ... params); - lib::ClusterState getDefaultTestClusterState() const { + std::shared_ptr<AbortBucketOperationsCommand> fetch_dispatched_abort_operations_command() { + _bottom->waitForMessages(2, 60); // abort cmd + set cluster state cmd + EXPECT_EQ(2, _bottom->getNumCommands()); + return std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0)); + } + + static lib::ClusterState getDefaultTestClusterState() { return lib::ClusterState("distributor:4 storage:1"); } - lib::ClusterState getStorageDownTestClusterState() const { + static lib::ClusterState getStorageDownTestClusterState() { return lib::ClusterState("distributor:4 storage:1 .0.s:d"); } @@ -173,29 +175,26 @@ hasAbortedNoneOf(const AbortBucketOperationsCommand::SP& cmd, const Vec& v) bool hasOnlySetSystemStateCmdQueued(DummyStorageLink& link) { + link.waitForMessages(1, 60); if (link.getNumCommands() != 1) { std::cerr << "expected 1 command, found" << link.getNumCommands() << "\n"; } - api::SetSystemStateCommand::SP cmd( - std::dynamic_pointer_cast<api::SetSystemStateCommand>( - link.getCommand(0))); - return (cmd.get() != 0); + auto cmd = std::dynamic_pointer_cast<api::SetSystemStateCommand>(link.getCommand(0)); + return static_cast<bool>(cmd); } } void -ChangedBucketOwnershipHandlerTest::applyDistribution( - Redundancy redundancy, NodeCount nodeCount) +ChangedBucketOwnershipHandlerTest::applyDistribution(Redundancy redundancy, NodeCount nodeCount) { _app->setDistribution(redundancy, nodeCount); _handler->storageDistributionChanged(); } void -ChangedBucketOwnershipHandlerTest::applyClusterState( - const lib::ClusterState& state) +ChangedBucketOwnershipHandlerTest::applyClusterState(const lib::ClusterState& state) { _app->setClusterState(state); _handler->reloadClusterState(); @@ -212,10 +211,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, enumerate_buckets_belonging_on_changed auto node2Buckets(insertBuckets(2, 2, stateBefore)); _top->sendDown(createStateCmd("distributor:4 .1.s:d .3.s:d storage:1")); - // TODO: refactor into own function - ASSERT_EQ(2, _bottom->getNumCommands()); - auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0)); - ASSERT_TRUE(cmd.get() != 0); + auto cmd = fetch_dispatched_abort_operations_command(); + ASSERT_TRUE(cmd); EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets)); EXPECT_TRUE(hasAbortedAllOf(cmd, node3Buckets)); @@ -280,10 +277,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, down_edge_to_no_available_distributors lib::ClusterState downState("distributor:3 .0.s:d .1.s:s .2.s:s storage:1"); _top->sendDown(createStateCmd(downState)); - // TODO: refactor into own function - ASSERT_EQ(2, _bottom->getNumCommands()); - auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0)); - ASSERT_TRUE(cmd.get() != 0); + auto cmd = fetch_dispatched_abort_operations_command(); + ASSERT_TRUE(cmd); EXPECT_TRUE(hasAbortedAllOf(cmd, node0Buckets)); EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets)); @@ -304,10 +299,8 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed auto node2Buckets(insertBuckets(2, 2, stateAfter)); _top->sendDown(createStateCmd(stateAfter)); - // TODO: refactor into own function - ASSERT_EQ(2, _bottom->getNumCommands()); - auto cmd = std::dynamic_pointer_cast<AbortBucketOperationsCommand>(_bottom->getCommand(0)); - ASSERT_TRUE(cmd.get() != 0); + auto cmd = fetch_dispatched_abort_operations_command(); + ASSERT_TRUE(cmd); EXPECT_TRUE(hasAbortedAllOf(cmd, node1Buckets)); EXPECT_TRUE(hasAbortedNoneOf(cmd, node0Buckets)); @@ -319,8 +312,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed } void -ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket( - uint16_t fromDistributorIndex) +ChangedBucketOwnershipHandlerTest::sendAndExpectAbortedCreateBucket(uint16_t fromDistributorIndex) { document::BucketId bucket(16, 6786); auto msg = std::make_shared<api::CreateBucketCommand>(makeDocumentBucket(bucket)); @@ -350,7 +342,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, distribution_config_change_updates_own /** * Generate and dispatch a message of the given type with the provided - * aruments as if that message was sent from distributor 1. Messages will + * arguments as if that message was sent from distributor 1. Messages will * be checked as if the state contains 4 distributors in Up state. This * means that it suffices to send in a message with a bucket that is not * owned by distributor 1 in this state to trigger an abort. @@ -382,7 +374,7 @@ ChangedBucketOwnershipHandlerTest::expectChangeAbortsMessage(bool expected, MsgP /** * Generate and dispatch a message of the given type with the provided - * aruments as if that message was sent from distributor 1. Messages will + * arguments as if that message was sent from distributor 1. Messages will * be checked as if the state contains 4 distributors in Up state and storage * node is down. This means that any abortable message will trigger an abort. */ @@ -394,6 +386,7 @@ ChangedBucketOwnershipHandlerTest::expectDownAbortsMessage(bool expected, MsgPar (void) _bottom->getCommandsOnce(); ASSERT_NO_FATAL_FAILURE((expectChangeAbortsMessage<MsgType, MsgParams...>(false, std::forward<MsgParams>(params)...))); _top->sendDown(createStateCmd(getStorageDownTestClusterState())); + _bottom->waitForMessages(3, 60); ASSERT_EQ(_bottom->getNumCommands(), 3); auto setSystemStateCommand = std::dynamic_pointer_cast<api::SetSystemStateCommand>(_bottom->getCommand(2)); ASSERT_TRUE(setSystemStateCommand); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index c0f4041e284..f6b7c7e5f0b 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -184,7 +184,8 @@ computeAllPossibleHandlerThreads(const vespa::config::content::StorFilestorConfi return cfg.numThreads + computeNumResponseThreads(cfg.numResponseThreads) + cfg.numNetworkThreads + - cfg.numVisitorThreads; + cfg.numVisitorThreads + + 1; // Async cluster state processing thread (might be a pessimization to include here...) } } diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp index 9d7dd95d922..3b97ff6c018 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp @@ -28,6 +28,7 @@ ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler( _component(compReg, "changedbucketownershiphandler"), _metrics(), _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), + _state_sync_executor(1), // single thread for sequential task execution _stateLock(), _currentState(), // Not set yet, so ownership will not be valid _currentOwnership(std::make_shared<OwnershipState>( @@ -98,7 +99,7 @@ ChangedBucketOwnershipHandler::Metrics::Metrics(metrics::MetricSet* owner) idealStateOpsAborted("ideal_state_ops_aborted", {}, "Number of outdated ideal state operations aborted", this), externalLoadOpsAborted("external_load_ops_aborted", {}, "Number of outdated external load operations aborted", this) {} -ChangedBucketOwnershipHandler::Metrics::~Metrics() { } +ChangedBucketOwnershipHandler::Metrics::~Metrics() = default; ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucketSpaceRepo &contentBucketSpaceRepo, std::shared_ptr<const lib::ClusterStateBundle> state) @@ -114,7 +115,7 @@ ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucke } -ChangedBucketOwnershipHandler::OwnershipState::~OwnershipState() {} +ChangedBucketOwnershipHandler::OwnershipState::~OwnershipState() = default; const lib::ClusterState& @@ -235,18 +236,79 @@ ChangedBucketOwnershipHandler::makeLazyAbortPredicate( _component.getIndex())); } -/* - * If we go from: - * 1) Not all down -> all distributors down - * - abort ops for _all_ buckets - * 2) All distributors down -> not down - * - no-op, since down edge must have been handled first - * 3) All down -> all down - * - no-op - * 4) Some nodes down or up - * - abort ops for buckets that have changed ownership between - * current and new cluster state. - */ +class ChangedBucketOwnershipHandler::ClusterStateSyncAndApplyTask + : public vespalib::Executor::Task +{ + ChangedBucketOwnershipHandler& _owner; + std::shared_ptr<api::SetSystemStateCommand> _command; +public: + ClusterStateSyncAndApplyTask(ChangedBucketOwnershipHandler& owner, + std::shared_ptr<api::SetSystemStateCommand> command) noexcept + : _owner(owner), + _command(std::move(command)) + {} + + /* + * If we go from: + * 1) Not all down -> all distributors down + * - abort ops for _all_ buckets + * 2) All distributors down -> not down + * - no-op, since down edge must have been handled first + * 3) All down -> all down + * - no-op + * 4) Some nodes down or up + * - abort ops for buckets that have changed ownership between + * current and new cluster state. + */ + void run() override { + OwnershipState::CSP old_ownership; + OwnershipState::CSP new_ownership; + // Update the ownership state inspected by all bucket-mutating operations passing through + // this component so that messages from outdated distributors will be rejected. Note that + // this is best-effort; with our current multitude of RPC threads directly dispatching + // operations into the persistence provider, it's possible for a thread carrying an outdated + // operation to have already passed the barrier, but be preempted so that it will apply the + // op _after_ the abort step has completed. + { + std::lock_guard guard(_owner._stateLock); + old_ownership = _owner._currentOwnership; + _owner.setCurrentOwnershipWithStateNoLock(_command->getClusterStateBundle()); + new_ownership = _owner._currentOwnership; + } + assert(new_ownership->valid()); + // If we're going from not having a state to having a state, we per + // definition cannot possibly have gotten any load that needs aborting, + // as no such load is allowed through this component when this is the + // case. + if (!old_ownership->valid()) { + return _owner.sendDown(_command); + } + + if (allDistributorsDownInState(old_ownership->getBaselineState())) { + LOG(debug, "No need to send aborts on transition '%s' -> '%s'", + old_ownership->getBaselineState().toString().c_str(), + new_ownership->getBaselineState().toString().c_str()); + return _owner.sendDown(_command);; + } + _owner.logTransition(old_ownership->getBaselineState(), new_ownership->getBaselineState()); + + metrics::MetricTimer duration_timer; + auto predicate = _owner.makeLazyAbortPredicate(old_ownership, new_ownership); + auto abort_cmd = std::make_shared<AbortBucketOperationsCommand>(std::move(predicate)); + + // Will not return until all operation aborts have been performed + // on the lower level links, at which point it is safe to send down + // the SetSystemStateCommand. + _owner.sendDown(abort_cmd); + duration_timer.stop(_owner._metrics.averageAbortProcessingTime); + + // Conflicting operations have been aborted and incoming conflicting operations + // are aborted inline; send down the state command actually making the state change + // visible on the content node. + _owner.sendDown(_command); + } +}; + bool ChangedBucketOwnershipHandler::onSetSystemState( const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) @@ -255,47 +317,13 @@ ChangedBucketOwnershipHandler::onSetSystemState( LOG(debug, "Operation aborting is config-disabled"); return false; // Early out. } - OwnershipState::CSP oldOwnership; - OwnershipState::CSP newOwnership; - // Get old state and update own current cluster state _before_ it is - // applied to the rest of the system. This helps ensure that no message - // can get through in the off-case that the lower level storage links - // don't apply the state immediately for some reason. - { - std::lock_guard guard(_stateLock); - oldOwnership = _currentOwnership; - setCurrentOwnershipWithStateNoLock(stateCmd->getClusterStateBundle()); - newOwnership = _currentOwnership; - } - assert(newOwnership->valid()); - // If we're going from not having a state to having a state, we per - // definition cannot possibly have gotten any load that needs aborting, - // as no such load is allowed through this component when this is the - // case. - if (!oldOwnership->valid()) { - return false; - } - - if (allDistributorsDownInState(oldOwnership->getBaselineState())) { - LOG(debug, "No need to send aborts on transition '%s' -> '%s'", - oldOwnership->getBaselineState().toString().c_str(), - newOwnership->getBaselineState().toString().c_str()); - return false; - } - logTransition(oldOwnership->getBaselineState(), newOwnership->getBaselineState()); - - metrics::MetricTimer durationTimer; - auto predicate(makeLazyAbortPredicate(oldOwnership, newOwnership)); - AbortBucketOperationsCommand::SP cmd( - new AbortBucketOperationsCommand(std::move(predicate))); - - // Will not return until all operation aborts have been performed - // on the lower level links, at which point it is safe to send down - // the SetSystemStateCommand. - sendDown(cmd); - - durationTimer.stop(_metrics.averageAbortProcessingTime); - return false; + // Dispatch to background worker. This indirection is because operations such as lid-space compaction + // may cause the implicit operation abort waiting step to block the caller for a relatively long time. + // It is very important that the executor only has 1 thread, which means this has FIFO behavior. + [[maybe_unused]] auto rejected_task = _state_sync_executor.execute(std::make_unique<ClusterStateSyncAndApplyTask>(*this, stateCmd)); + // If this fails, we have processed a message _after_ onClose has been called, which should not happen. + assert(!rejected_task); + return true; } /** @@ -411,8 +439,7 @@ ChangedBucketOwnershipHandler::onDown( const std::shared_ptr<api::StorageMessage>& msg) { if (msg->getType() == api::MessageType::SETSYSTEMSTATE) { - return onSetSystemState( - std::static_pointer_cast<api::SetSystemStateCommand>(msg)); + return onSetSystemState(std::static_pointer_cast<api::SetSystemStateCommand>(msg)); } if (!isMutatingCommandAndNeedsChecking(*msg)) { return false; @@ -451,4 +478,10 @@ ChangedBucketOwnershipHandler::onInternalReply( return (reply->getType() == AbortBucketOperationsReply::ID); } +void +ChangedBucketOwnershipHandler::onClose() +{ + _state_sync_executor.shutdown().sync(); +} + } diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h index e753d96871e..8798d109955 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h @@ -10,6 +10,7 @@ #include <vespa/metrics/valuemetric.h> #include <vespa/metrics/countmetric.h> #include <vespa/metrics/metricset.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <atomic> #include <vector> #include <unordered_map> @@ -60,26 +61,24 @@ class ChangedBucketOwnershipHandler private config::IFetcherCallback<vespa::config::content::PersistenceConfig> { public: - class Metrics : public metrics::MetricSet - { + class Metrics : public metrics::MetricSet { public: metrics::LongAverageMetric averageAbortProcessingTime; metrics::LongCountMetric idealStateOpsAborted; metrics::LongCountMetric externalLoadOpsAborted; - Metrics(metrics::MetricSet* owner = 0); - ~Metrics(); + explicit Metrics(metrics::MetricSet* owner = nullptr); + ~Metrics() override; }; /** * Wrapper around the distribution & state pairs that decides how to * compute the owner distributor for a bucket. It's possible to have * an ownership state with a nullptr cluster state when the node - * initially starts up, which is why no owership state must be used unless + * initially starts up, which is why no ownership state must be used unless * invoking valid() on it returns true. */ - class OwnershipState - { + class OwnershipState { using BucketSpace = document::BucketSpace; std::unordered_map<BucketSpace, std::shared_ptr<const lib::Distribution>, BucketSpace::hash> _distributions; std::shared_ptr<const lib::ClusterStateBundle> _state; @@ -93,7 +92,7 @@ public: static const uint16_t FAILED_TO_RESOLVE = 0xffff; - bool valid() const { + [[nodiscard]] bool valid() const noexcept { return (!_distributions.empty() && _state); } @@ -114,16 +113,21 @@ public: void reloadClusterState(); private: - ServiceLayerComponent _component; - Metrics _metrics; - std::unique_ptr<config::ConfigFetcher> _configFetcher; - mutable std::mutex _stateLock; - std::shared_ptr<const lib::ClusterStateBundle> _currentState; - OwnershipState::CSP _currentOwnership; - - std::atomic<bool> _abortQueuedAndPendingOnStateChange; - std::atomic<bool> _abortMutatingIdealStateOps; - std::atomic<bool> _abortMutatingExternalLoadOps; + class ClusterStateSyncAndApplyTask; + + using ConfigFetcherUP = std::unique_ptr<config::ConfigFetcher>; + using ClusterStateBundleCSP = std::shared_ptr<const lib::ClusterStateBundle>; + + ServiceLayerComponent _component; + Metrics _metrics; + ConfigFetcherUP _configFetcher; + vespalib::ThreadStackExecutor _state_sync_executor; + mutable std::mutex _stateLock; + ClusterStateBundleCSP _currentState; + OwnershipState::CSP _currentOwnership; + std::atomic<bool> _abortQueuedAndPendingOnStateChange; + std::atomic<bool> _abortMutatingIdealStateOps; + std::atomic<bool> _abortMutatingExternalLoadOps; std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate> makeLazyAbortPredicate( @@ -183,14 +187,12 @@ private: public: ChangedBucketOwnershipHandler(const config::ConfigUri& configUri, ServiceLayerComponentRegister& compReg); - ~ChangedBucketOwnershipHandler(); + ~ChangedBucketOwnershipHandler() override; - bool onSetSystemState( - const std::shared_ptr<api::SetSystemStateCommand>&) override; + bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override; bool onDown(const std::shared_ptr<api::StorageMessage>&) override; - - bool onInternalReply( - const std::shared_ptr<api::InternalReply>& reply) override; + bool onInternalReply(const std::shared_ptr<api::InternalReply>& reply) override; + void onClose() override; void configure(std::unique_ptr<vespa::config::content::PersistenceConfig>) override; |