diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-07 15:31:31 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-07 16:05:30 +0000 |
commit | 81853e7e81313a374dea7f2dec377f437393c47c (patch) | |
tree | 44c4362357059e5c3dce8cf37689ed61c73f338a | |
parent | f2ececfb183ef73d6a53d74ee84798e0dc7eaf36 (diff) |
Block ideal state operations towards buckets that are locked
Prevents ideal state ops from modifying buckets that are being used
in a read-for-write context.
Move `OperationSequencer` to main `Distributor` to more easily
facilitate sharing of it across components.
28 files changed, 199 insertions, 57 deletions
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp index 9a9e04f0f33..5203fec2462 100644 --- a/storage/src/tests/distributor/blockingoperationstartertest.cpp +++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp @@ -2,6 +2,7 @@ #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <vespa/storage/distributor/blockingoperationstarter.h> #include <vespa/storage/distributor/pendingmessagetracker.h> +#include <vespa/storage/distributor/operation_sequencer.h> #include <tests/distributor/maintenancemocks.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/vespalib/gtest/gtest.h> @@ -26,6 +27,7 @@ struct BlockingOperationStarterTest : Test { std::unique_ptr<MockOperationStarter> _starterImpl; std::unique_ptr<StorageComponentRegisterImpl> _compReg; std::unique_ptr<PendingMessageTracker> _messageTracker; + std::unique_ptr<OperationSequencer> _operation_sequencer; std::unique_ptr<BlockingOperationStarter> _operationStarter; void SetUp() override; @@ -39,7 +41,8 @@ BlockingOperationStarterTest::SetUp() _compReg->setClock(_clock); _clock.setAbsoluteTimeInSeconds(1); _messageTracker = std::make_unique<PendingMessageTracker>(*_compReg); - _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_starterImpl); + _operation_sequencer = std::make_unique<OperationSequencer>(); + _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_operation_sequencer, *_starterImpl); } TEST_F(BlockingOperationStarterTest, operation_not_blocked_when_no_messages_pending) { diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.cpp b/storage/src/tests/distributor/distributor_message_sender_stub.cpp index df894f1bb2c..893ebcd158a 100644 --- a/storage/src/tests/distributor/distributor_message_sender_stub.cpp +++ b/storage/src/tests/distributor/distributor_message_sender_stub.cpp @@ -12,7 +12,8 @@ namespace storage { DistributorMessageSenderStub::DistributorMessageSenderStub() : _stub_impl(), _cluster_name("storage"), - _pending_message_tracker(nullptr) + _pending_message_tracker(nullptr), + _operation_sequencer(nullptr) {} DistributorMessageSenderStub::~DistributorMessageSenderStub() = default; diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h index e69673a9366..0b5ab5ae9a7 100644 --- a/storage/src/tests/distributor/distributor_message_sender_stub.h +++ b/storage/src/tests/distributor/distributor_message_sender_stub.h @@ -13,6 +13,7 @@ class DistributorMessageSenderStub : public distributor::DistributorMessageSende MessageSenderStub _stub_impl; vespalib::string _cluster_name; distributor::PendingMessageTracker* _pending_message_tracker; + distributor::OperationSequencer* _operation_sequencer; public: DistributorMessageSenderStub(); @@ -94,6 +95,15 @@ public: void setPendingMessageTracker(distributor::PendingMessageTracker& tracker) { _pending_message_tracker = &tracker; } + + const distributor::OperationSequencer& operation_sequencer() const noexcept override { + assert(_operation_sequencer); + return *_operation_sequencer; + } + + void set_operation_sequencer(distributor::OperationSequencer& op_seq) { + _operation_sequencer = &op_seq; + } }; } diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index d66b1315fde..d5374907723 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/distributor/bucketdbupdater.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/operations/idealstate/mergeoperation.h> +#include <vespa/storage/distributor/operation_sequencer.h> #include <vespa/storageapi/message/stat.h> #include <vespa/storageapi/message/visitor.h> #include <vespa/storageapi/message/bucketsplitting.h> @@ -41,16 +42,18 @@ struct IdealStateManagerTest : Test, DistributorTestUtil { bool checkBlock(const IdealStateOperation& op, const document::Bucket& bucket, - const PendingMessageTracker& tracker) const + const PendingMessageTracker& tracker, + const OperationSequencer& op_seq) const { - return op.checkBlock(bucket, tracker); + return op.checkBlock(bucket, tracker, op_seq); } bool checkBlockForAllNodes(const IdealStateOperation& op, const document::Bucket& bucket, - const PendingMessageTracker& tracker) const + const PendingMessageTracker& tracker, + const OperationSequencer& op_seq) const { - return op.checkBlockForAllNodes(bucket, tracker); + return op.checkBlockForAllNodes(bucket, tracker, op_seq); } std::vector<document::BucketSpace> _bucketSpaces; @@ -178,6 +181,7 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) framework::defaultimplementation::FakeClock clock; PendingMessageTracker tracker(_node->getComponentRegister()); + OperationSequencer op_seq; document::BucketId bid(16, 1234); std::vector<document::BucketId> buckets; @@ -193,14 +197,14 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) { RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 4))); - EXPECT_TRUE(op.isBlocked(tracker)); + EXPECT_TRUE(op.isBlocked(tracker, op_seq)); } { // Don't trigger on requests to other nodes. RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 5))); - EXPECT_FALSE(op.isBlocked(tracker)); + EXPECT_FALSE(op.isBlocked(tracker, op_seq)); } // Don't block on null-bucket messages that aren't RequestBucketInfo. @@ -213,7 +217,7 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) { RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); - EXPECT_FALSE(op.isBlocked(tracker)); + EXPECT_FALSE(op.isBlocked(tracker, op_seq)); } } @@ -221,6 +225,7 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) setupDistributor(2, 10, "distributor:1 storage:2"); framework::defaultimplementation::FakeClock clock; PendingMessageTracker tracker(_node->getComponentRegister()); + OperationSequencer op_seq; document::BucketId bid(16, 1234); { @@ -232,9 +237,30 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); // Not blocked for exact node match. - EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker)); + EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker, op_seq)); // But blocked for bucket match! - EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker)); + EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker, op_seq)); + } +} + +TEST_F(IdealStateManagerTest, block_operations_with_locked_buckets) { + setupDistributor(2, 10, "distributor:1 storage:2"); + framework::defaultimplementation::FakeClock clock; + PendingMessageTracker tracker(_node->getComponentRegister()); + OperationSequencer op_seq; + const auto bucket = makeDocumentBucket(document::BucketId(16, 1234)); + + { + auto msg = std::make_shared<api::JoinBucketsCommand>(bucket); + msg->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + tracker.insert(msg); + } + auto token = op_seq.try_acquire(bucket); + EXPECT_TRUE(token.valid()); + { + RemoveBucketOperation op("storage", BucketAndNodes(bucket, toVector<uint16_t>(0))); + EXPECT_TRUE(checkBlock(op, bucket, tracker, op_seq)); + EXPECT_TRUE(checkBlockForAllNodes(op, bucket, tracker, op_seq)); } } diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h index c88e477e90e..2bfb4ebb40f 100644 --- a/storage/src/tests/distributor/maintenancemocks.h +++ b/storage/src/tests/distributor/maintenancemocks.h @@ -51,7 +51,7 @@ public: } void onStart(DistributorMessageSender&) override {} void onReceive(DistributorMessageSender&, const std::shared_ptr<api::StorageReply>&) override {} - bool isBlocked(const PendingMessageTracker&) const override { + bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override { return _shouldBlock; } void setShouldBlock(bool shouldBlock) { @@ -64,7 +64,7 @@ class MockMaintenanceOperationGenerator { public: MaintenanceOperation::SP generate(const document::Bucket&bucket) const override { - return MaintenanceOperation::SP(new MockOperation(bucket)); + return std::make_shared<MockOperation>(bucket); } std::vector<MaintenanceOperation::SP> generateAll( @@ -73,7 +73,7 @@ public: { (void) tracker; std::vector<MaintenanceOperation::SP> ret; - ret.push_back(MaintenanceOperation::SP(new MockOperation(bucket))); + ret.emplace_back(std::make_shared<MockOperation>(bucket)); return ret; } diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index ccd70309a88..852c2ef8754 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -6,6 +6,7 @@ #include <vespa/storage/distributor/operations/idealstate/mergeoperation.h> #include <vespa/storage/distributor/bucketdbupdater.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/operation_sequencer.h> #include <tests/distributor/distributortestutil.h> #include <vespa/vespalib/text/stringtokenizer.h> #include <vespa/vespalib/gtest/gtest.h> @@ -17,11 +18,13 @@ namespace storage::distributor { struct MergeOperationTest : Test, DistributorTestUtil { std::unique_ptr<PendingMessageTracker> _pendingTracker; + OperationSequencer _operation_sequencer; void SetUp() override { createLinks(); _pendingTracker = std::make_unique<PendingMessageTracker>(getComponentRegister()); _sender.setPendingMessageTracker(*_pendingTracker); + _sender.set_operation_sequencer(_operation_sequencer); } void TearDown() override { @@ -397,18 +400,31 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_any_busy_target_node) { // Should not block on nodes _not_ included in operation node set _pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10)); - EXPECT_FALSE(op.isBlocked(*_pendingTracker)); + EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // Node 1 is included in operation node set and should cause a block _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10)); - EXPECT_TRUE(op.isBlocked(*_pendingTracker)); + EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); getClock().addSecondsToTime(11); - EXPECT_FALSE(op.isBlocked(*_pendingTracker)); // No longer busy + EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // No longer busy // Should block on other operation nodes than the first listed as well _pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10)); - EXPECT_TRUE(op.isBlocked(*_pendingTracker)); + EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); +} + +TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) { + getClock().setAbsoluteTimeInSeconds(10); + addNodesToBucketDB(document::BucketId(16, 1), "0=10/1/1/t,1=20/1/1,2=10/1/1/t"); + enableDistributorClusterState("distributor:1 storage:3"); + MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + + EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1))); + EXPECT_TRUE(token.valid()); + EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); } TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) { diff --git a/storage/src/tests/distributor/operation_sequencer_test.cpp b/storage/src/tests/distributor/operation_sequencer_test.cpp index 37651be05a4..7c9a22b5226 100644 --- a/storage/src/tests/distributor/operation_sequencer_test.cpp +++ b/storage/src/tests/distributor/operation_sequencer_test.cpp @@ -61,8 +61,11 @@ TEST_F(OperationSequencerTest, releasing_handle_allows_for_getting_new_handles_f } TEST_F(OperationSequencerTest, cannot_get_handle_for_gid_contained_in_locked_bucket) { - auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1)); + EXPECT_FALSE(sequencer.is_blocked(bucket)); + auto bucket_handle = sequencer.try_acquire(bucket); EXPECT_TRUE(bucket_handle.valid()); + EXPECT_TRUE(sequencer.is_blocked(bucket)); auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd")); EXPECT_FALSE(doc_handle.valid()); ASSERT_TRUE(doc_handle.is_blocked()); @@ -78,10 +81,12 @@ TEST_F(OperationSequencerTest, can_get_handle_for_gid_not_contained_in_active_bu } TEST_F(OperationSequencerTest, releasing_bucket_lock_allows_gid_handles_to_be_acquired) { - auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1)); + auto bucket_handle = sequencer.try_acquire(bucket); bucket_handle.release(); auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd")); EXPECT_TRUE(doc_handle.valid()); + EXPECT_FALSE(sequencer.is_blocked(bucket)); } TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_separate_bucket_space) { @@ -91,4 +96,9 @@ TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_s EXPECT_TRUE(doc_handle.valid()); } +TEST_F(OperationSequencerTest, is_blocked_is_bucket_space_aware) { + auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + EXPECT_FALSE(sequencer.is_blocked(document::Bucket(global_space(), document::BucketId(16, 1)))); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp index c876f3e7ee4..9bb2e8b04a7 100644 --- a/storage/src/tests/distributor/splitbuckettest.cpp +++ b/storage/src/tests/distributor/splitbuckettest.cpp @@ -1,13 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <tests/common/dummystoragelink.h> -#include <vespa/storageapi/message/bucketsplitting.h> -#include <vespa/storage/distributor/operations/idealstate/splitoperation.h> #include <vespa/document/base/documentid.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storage/distributor/idealstatemanager.h> -#include <tests/distributor/distributortestutil.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/idealstatemanager.h> +#include <vespa/storage/distributor/operation_sequencer.h> +#include <vespa/storage/distributor/operations/idealstate/splitoperation.h> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storageapi/message/persistence.h> +#include <tests/common/dummystoragelink.h> +#include <tests/distributor/distributortestutil.h> #include <vespa/vespalib/gtest/gtest.h> using document::test::makeDocumentBucket; @@ -260,6 +261,7 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) { compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); PendingMessageTracker tracker(compReg); + OperationSequencer op_seq; enableDistributorClusterState("distributor:1 storage:2"); @@ -281,17 +283,39 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) { splitCount, splitByteSize); - EXPECT_TRUE(op.isBlocked(tracker)); + EXPECT_TRUE(op.isBlocked(tracker, op_seq)); // Now, pretend there's a join for another node in the same bucket. This // will happen when a join is partially completed. tracker.clearMessagesForNode(0); - EXPECT_FALSE(op.isBlocked(tracker)); + EXPECT_FALSE(op.isBlocked(tracker, op_seq)); joinCmd->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); tracker.insert(joinCmd); - EXPECT_TRUE(op.isBlocked(tracker)); + EXPECT_TRUE(op.isBlocked(tracker, op_seq)); +} + +TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) { + StorageComponentRegisterImpl compReg; + framework::defaultimplementation::FakeClock clock; + compReg.setClock(clock); + clock.setAbsoluteTimeInSeconds(1); + PendingMessageTracker tracker(compReg); + OperationSequencer op_seq; + + enableDistributorClusterState("distributor:1 storage:2"); + + document::BucketId source_bucket(16, 1); + insertBucketInfo(source_bucket, 0, 0xabc, 1000, tooLargeBucketSize, 250); + + SplitOperation op("storage", BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)), + maxSplitBits, splitCount, splitByteSize); + + EXPECT_FALSE(op.isBlocked(tracker, op_seq)); + auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket)); + EXPECT_TRUE(token.valid()); + EXPECT_TRUE(op.isBlocked(tracker, op_seq)); } } // storage::distributor diff --git a/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp b/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp index 0e4bc511b26..25c38888098 100644 --- a/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp +++ b/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp @@ -7,7 +7,7 @@ namespace storage::distributor { bool BlockingOperationStarter::start(const std::shared_ptr<Operation>& operation, Priority priority) { - if (operation->isBlocked(_messageTracker)) { + if (operation->isBlocked(_messageTracker, _operation_sequencer)) { return true; } return _starterImpl.start(operation, priority); diff --git a/storage/src/vespa/storage/distributor/blockingoperationstarter.h b/storage/src/vespa/storage/distributor/blockingoperationstarter.h index a64a8f5b6bb..e79ae6b4a79 100644 --- a/storage/src/vespa/storage/distributor/blockingoperationstarter.h +++ b/storage/src/vespa/storage/distributor/blockingoperationstarter.h @@ -7,13 +7,16 @@ namespace storage::distributor { class PendingMessageTracker; +class OperationSequencer; class BlockingOperationStarter : public OperationStarter { public: BlockingOperationStarter(PendingMessageTracker& messageTracker, + OperationSequencer& operation_sequencer, OperationStarter& starterImpl) : _messageTracker(messageTracker), + _operation_sequencer(operation_sequencer), _starterImpl(starterImpl) {} BlockingOperationStarter(const BlockingOperationStarter&) = delete; @@ -22,7 +25,8 @@ public: bool start(const std::shared_ptr<Operation>& operation, Priority priority) override; private: PendingMessageTracker& _messageTracker; - OperationStarter& _starterImpl; + OperationSequencer& _operation_sequencer; + OperationStarter& _starterImpl; }; } diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 235384c9473..40e108dcda7 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -5,6 +5,7 @@ #include "distributor_bucket_space.h" #include "distributormetricsset.h" #include "idealstatemetricsset.h" +#include "operation_sequencer.h" #include "ownership_transfer_safe_time_point_calculator.h" #include "throttlingoperationstarter.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> @@ -79,14 +80,15 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _metrics(std::make_shared<DistributorMetricSet>()), _operationOwner(*this, _component.getClock()), _maintenanceOperationOwner(*this, _component.getClock()), + _operation_sequencer(std::make_unique<OperationSequencer>()), _pendingMessageTracker(compReg), _bucketDBUpdater(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, *this, compReg), _distributorStatusDelegate(compReg, *this, *this), _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater), _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies), _messageSender(messageSender), - _externalOperationHandler(_component, _component, - getMetrics(), getMessageSender(), *this, _component, + _externalOperationHandler(_component, _component, getMetrics(), getMessageSender(), + *_operation_sequencer, *this, _component, _idealStateManager, _operationOwner), _threadPool(threadPool), _initializingIsUp(true), @@ -95,7 +97,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()), _scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)), _throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)), - _blockingStarter(std::make_unique<BlockingOperationStarter>(_pendingMessageTracker, *_throttlingStarter)), + _blockingStarter(std::make_unique<BlockingOperationStarter>(_pendingMessageTracker, *_operation_sequencer, + *_throttlingStarter)), _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)), _schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE), _recoveryTimeStarted(_component.getClock()), diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index e95df863210..70affc59ce3 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -34,6 +34,7 @@ namespace storage::distributor { class BlockingOperationStarter; class BucketPriorityDatabase; class DistributorBucketSpaceRepo; +class OperationSequencer; class OwnershipTransferSafeTimePointCalculator; class SimpleMaintenanceScanner; class ThrottlingOperationStarter; @@ -76,6 +77,10 @@ public: return _pendingMessageTracker; } + const OperationSequencer& operation_sequencer() const noexcept override { + return *_operation_sequencer; + } + const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const override; /** @@ -274,6 +279,7 @@ private: OperationOwner _operationOwner; OperationOwner _maintenanceOperationOwner; + std::unique_ptr<OperationSequencer> _operation_sequencer; PendingMessageTracker _pendingMessageTracker; BucketDBUpdater _bucketDBUpdater; StatusReporterDelegate _distributorStatusDelegate; @@ -310,7 +316,6 @@ private: DoneInitializeHandler& _doneInitializeHandler; bool _doneInitializing; - std::unique_ptr<BucketPriorityDatabase> _bucketPriorityDb; std::unique_ptr<SimpleMaintenanceScanner> _scanner; std::unique_ptr<ThrottlingOperationStarter> _throttlingStarter; diff --git a/storage/src/vespa/storage/distributor/distributormessagesender.h b/storage/src/vespa/storage/distributor/distributormessagesender.h index 151413e98ef..ddf17058950 100644 --- a/storage/src/vespa/storage/distributor/distributormessagesender.h +++ b/storage/src/vespa/storage/distributor/distributormessagesender.h @@ -8,6 +8,7 @@ namespace storage::lib { class NodeType; } namespace storage::distributor { class PendingMessageTracker; +class OperationSequencer; class DistributorMessageSender : public MessageSender { public: @@ -21,6 +22,7 @@ public: virtual int getDistributorIndex() const = 0; virtual const vespalib::string& getClusterName() const = 0; virtual const PendingMessageTracker& getPendingMessageTracker() const = 0; + virtual const OperationSequencer& operation_sequencer() const noexcept = 0; }; } diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 5fe5e51202a..cf00718b1fd 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -3,6 +3,7 @@ #include "bucket_space_distribution_context.h" #include "externaloperationhandler.h" #include "distributor.h" +#include "operation_sequencer.h" #include <vespa/document/base/documentid.h> #include <vespa/storage/distributor/operations/external/putoperation.h> #include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h> @@ -54,12 +55,16 @@ public: const PendingMessageTracker& getPendingMessageTracker() const override { abort(); // Never called by the messages using this component. } + const OperationSequencer& operation_sequencer() const noexcept override { + abort(); // Never called by the messages using this component. + } }; ExternalOperationHandler::ExternalOperationHandler(DistributorNodeContext& node_ctx, DistributorOperationContext& op_ctx, DistributorMetricSet& metrics, ChainedMessageSender& msg_sender, + OperationSequencer& operation_sequencer, NonTrackingMessageSender& non_tracking_sender, DocumentSelectionParser& parser, const MaintenanceOperationGenerator& gen, @@ -68,6 +73,7 @@ ExternalOperationHandler::ExternalOperationHandler(DistributorNodeContext& node_ _op_ctx(op_ctx), _metrics(metrics), _msg_sender(msg_sender), + _operation_sequencer(operation_sequencer), _parser(parser), _direct_dispatch_sender(std::make_unique<DirectDispatchSender>(node_ctx, non_tracking_sender)), _operationGenerator(gen), diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index 0b6e9d970aa..1a02ff92f58 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "operation_sequencer.h" #include <vespa/document/bucket/bucketid.h> #include <vespa/document/bucket/bucketidfactory.h> #include <vespa/vdslib/state/clusterstate.h> @@ -23,6 +22,8 @@ class DistributorMetricSet; class Distributor; class MaintenanceOperationGenerator; class DirectDispatchSender; +class SequencingHandle; +class OperationSequencer; class OperationOwner; class ExternalOperationHandler : public api::MessageHandler @@ -44,6 +45,7 @@ public: DistributorOperationContext& op_ctx, DistributorMetricSet& metrics, ChainedMessageSender& msg_sender, + OperationSequencer& operation_sequencer, NonTrackingMessageSender& non_tracking_sender, DocumentSelectionParser& parser, const MaintenanceOperationGenerator& gen, @@ -89,10 +91,10 @@ private: DistributorOperationContext& _op_ctx; DistributorMetricSet& _metrics; ChainedMessageSender& _msg_sender; + OperationSequencer& _operation_sequencer; DocumentSelectionParser& _parser; std::unique_ptr<DirectDispatchSender> _direct_dispatch_sender; const MaintenanceOperationGenerator& _operationGenerator; - OperationSequencer _operation_sequencer; Operation::SP _op; TimePoint _rejectFeedBeforeTimeReached; OperationOwner& _distributor_operation_owner; diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.cpp b/storage/src/vespa/storage/distributor/operation_sequencer.cpp index aa48cb6c950..2eeba79a053 100644 --- a/storage/src/vespa/storage/distributor/operation_sequencer.cpp +++ b/storage/src/vespa/storage/distributor/operation_sequencer.cpp @@ -47,6 +47,10 @@ SequencingHandle OperationSequencer::try_acquire(const document::Bucket& bucket) } } +bool OperationSequencer::is_blocked(const document::Bucket& bucket) const noexcept { + return (_active_buckets.find(bucket) != _active_buckets.end()); +} + void OperationSequencer::release(const SequencingHandle& handle) { assert(handle.valid()); if (handle.has_gid()) { diff --git a/storage/src/vespa/storage/distributor/operation_sequencer.h b/storage/src/vespa/storage/distributor/operation_sequencer.h index b813071a1c8..f9ff6b32e0b 100644 --- a/storage/src/vespa/storage/distributor/operation_sequencer.h +++ b/storage/src/vespa/storage/distributor/operation_sequencer.h @@ -130,6 +130,8 @@ public: SequencingHandle try_acquire(document::BucketSpace bucket_space, const document::DocumentId& id); SequencingHandle try_acquire(const document::Bucket& bucket); + + bool is_blocked(const document::Bucket&) const noexcept; private: void release(const SequencingHandle& handle); }; diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h index 37412b08d2e..5f346a0104f 100644 --- a/storage/src/vespa/storage/distributor/operationowner.h +++ b/storage/src/vespa/storage/distributor/operationowner.h @@ -48,6 +48,10 @@ public: return _sender.getPendingMessageTracker(); } + const OperationSequencer& operation_sequencer() const noexcept override { + return _sender.operation_sequencer(); + } + private: OperationOwner& _owner; DistributorMessageSender& _sender; diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 46eb104db56..2d6e3bd30e2 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -88,6 +88,10 @@ struct IntermediateMessageSender : DistributorMessageSender { const PendingMessageTracker& getPendingMessageTracker() const override { return forward.getPendingMessageTracker(); } + + const OperationSequencer& operation_sequencer() const noexcept override { + return forward.operation_sequencer(); + } }; IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index 1a2eb8c6215..42d9e3d4a3d 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/distributor/pendingmessagetracker.h> #include <vespa/storage/distributor/idealstatemetricsset.h> #include <vespa/storage/distributor/distributor_bucket_space_repo.h> +#include <vespa/storage/distributor/operation_sequencer.h> #include <vespa/log/log.h> LOG_SETUP(".distributor.operation"); @@ -200,8 +201,12 @@ checkNullBucketRequestBucketInfoMessage(uint16_t node, bool IdealStateOperation::checkBlock(const document::Bucket &bucket, - const PendingMessageTracker& tracker) const + const PendingMessageTracker& tracker, + const OperationSequencer& seq) const { + if (seq.is_blocked(bucket)) { + return true; + } IdealStateOpChecker ichk(*this); const std::vector<uint16_t>& nodes(getNodes()); for (auto node : nodes) { @@ -219,8 +224,12 @@ IdealStateOperation::checkBlock(const document::Bucket &bucket, bool IdealStateOperation::checkBlockForAllNodes( const document::Bucket &bucket, - const PendingMessageTracker& tracker) const + const PendingMessageTracker& tracker, + const OperationSequencer& seq) const { + if (seq.is_blocked(bucket)) { + return true; + } IdealStateOpChecker ichk(*this); // Check messages sent to _any node_ for _this_ particular bucket. tracker.checkPendingMessages(bucket, ichk); @@ -238,9 +247,9 @@ IdealStateOperation::checkBlockForAllNodes( bool -IdealStateOperation::isBlocked(const PendingMessageTracker& tracker) const +IdealStateOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const { - return checkBlock(getBucket(), tracker); + return checkBlock(getBucket(), tracker, op_seq); } std::string diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index 08b545057cf..dcdc2f32374 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -182,7 +182,7 @@ public: * Returns true if we are blocked to start this operation given * the pending messages. */ - bool isBlocked(const PendingMessageTracker& pendingMessages) const override; + bool isBlocked(const PendingMessageTracker& pendingMessages, const OperationSequencer&) const override; /** Returns the priority we should send messages with. @@ -235,8 +235,12 @@ protected: * operations to other nodes for this bucket, these will not be part of * the set of messages checked. */ - bool checkBlock(const document::Bucket &bucket, const PendingMessageTracker& tracker) const; - bool checkBlockForAllNodes(const document::Bucket &bucket, const PendingMessageTracker& tracker) const; + bool checkBlock(const document::Bucket& bucket, + const PendingMessageTracker& tracker, + const OperationSequencer&) const; + bool checkBlockForAllNodes(const document::Bucket& bucket, + const PendingMessageTracker& tracker, + const OperationSequencer&) const; }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp index 130e039a43e..292df7c5e07 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp @@ -150,10 +150,10 @@ JoinOperation::getJoinBucket(size_t idx) const } bool -JoinOperation::isBlocked(const PendingMessageTracker& tracker) const +JoinOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const { - return (checkBlock(getBucket(), tracker) || - checkBlock(getJoinBucket(0), tracker) || - (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), tracker))); + return (checkBlock(getBucket(), tracker, op_seq) || + checkBlock(getJoinBucket(0), tracker, op_seq) || + (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), tracker, op_seq))); } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h index 55a7c05264a..86372231058 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h @@ -41,7 +41,8 @@ public: return JOIN_BUCKET; } - bool isBlocked(const PendingMessageTracker& pendingMessages) const override; + bool isBlocked(const PendingMessageTracker& pendingMessages, + const OperationSequencer& op_seq) const override; protected: using NodeToBuckets = std::map<uint16_t, std::vector<document::BucketId>>; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 445d0972937..16b510c22be 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -233,7 +233,7 @@ MergeOperation::deleteSourceOnlyNodes( BucketAndNodes(getBucket(), sourceOnlyNodes))); // Must not send removes to source only copies if something has caused // pending load to the copy after the merge was sent! - if (_removeOperation->isBlocked(sender.getPendingMessageTracker())) { + if (_removeOperation->isBlocked(sender.getPendingMessageTracker(), sender.operation_sequencer())) { LOG(debug, "Source only removal for %s was blocked by a pending " "operation", @@ -324,14 +324,15 @@ bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) return IdealStateOperation::shouldBlockThisOperation(messageType, pri); } -bool MergeOperation::isBlocked(const PendingMessageTracker& pending_tracker) const { +bool MergeOperation::isBlocked(const PendingMessageTracker& pending_tracker, + const OperationSequencer& op_seq) const { const auto& node_info = pending_tracker.getNodeInfo(); for (auto node : getNodes()) { if (node_info.isBusy(node)) { return true; } } - return IdealStateOperation::isBlocked(pending_tracker); + return IdealStateOperation::isBlocked(pending_tracker, op_seq); } } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h index d09bc0ba5c4..a5f7d352eea 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h @@ -47,7 +47,7 @@ public: std::vector<MergeMetaData>&); bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override; - bool isBlocked(const PendingMessageTracker& pendingMessages) const override; + bool isBlocked(const PendingMessageTracker& pendingMessages, const OperationSequencer&) const override; private: static void addIdealNodes( const std::vector<uint16_t>& idealNodes, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 508dcf13916..a74dc1c0d65 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -164,9 +164,9 @@ SplitOperation::onReceive(DistributorMessageSender&, const api::StorageReply::SP } bool -SplitOperation::isBlocked(const PendingMessageTracker& tracker) const +SplitOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const { - return checkBlockForAllNodes(getBucket(), tracker); + return checkBlockForAllNodes(getBucket(), tracker, op_seq); } bool diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h index 6959929bf01..9e24852886d 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h @@ -20,7 +20,7 @@ public: const char* getName() const override { return "split"; }; Type getType() const override { return SPLIT_BUCKET; } uint32_t getMaxSplitBits() const { return _maxBits; } - bool isBlocked(const PendingMessageTracker&) const override; + bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override; bool shouldBlockThisOperation(uint32_t, uint8_t) const override; protected: MessageTracker _tracker; diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h index 538f15d2bf2..e9320817a8e 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.h +++ b/storage/src/vespa/storage/distributor/operations/operation.h @@ -18,6 +18,7 @@ class StorageComponent; namespace distributor { class PendingMessageTracker; +class OperationSequencer; class Operation { @@ -61,7 +62,7 @@ public: * Returns true if we are blocked to start this operation given * the pending messages. */ - virtual bool isBlocked(const PendingMessageTracker&) const { + virtual bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const { return false; } |