diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-07 15:31:31 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-07 16:05:30 +0000 |
commit | 81853e7e81313a374dea7f2dec377f437393c47c (patch) | |
tree | 44c4362357059e5c3dce8cf37689ed61c73f338a /storage/src/tests/distributor | |
parent | f2ececfb183ef73d6a53d74ee84798e0dc7eaf36 (diff) |
Block ideal state operations towards buckets that are locked
Prevents ideal state ops from modifying buckets that are being used
in a read-for-write context.
Move `OperationSequencer` to main `Distributor` to more easily
facilitate sharing of it across components.
Diffstat (limited to 'storage/src/tests/distributor')
8 files changed, 119 insertions, 29 deletions
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp index 9a9e04f0f33..5203fec2462 100644 --- a/storage/src/tests/distributor/blockingoperationstartertest.cpp +++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp @@ -2,6 +2,7 @@ #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <vespa/storage/distributor/blockingoperationstarter.h> #include <vespa/storage/distributor/pendingmessagetracker.h> +#include <vespa/storage/distributor/operation_sequencer.h> #include <tests/distributor/maintenancemocks.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/vespalib/gtest/gtest.h> @@ -26,6 +27,7 @@ struct BlockingOperationStarterTest : Test { std::unique_ptr<MockOperationStarter> _starterImpl; std::unique_ptr<StorageComponentRegisterImpl> _compReg; std::unique_ptr<PendingMessageTracker> _messageTracker; + std::unique_ptr<OperationSequencer> _operation_sequencer; std::unique_ptr<BlockingOperationStarter> _operationStarter; void SetUp() override; @@ -39,7 +41,8 @@ BlockingOperationStarterTest::SetUp() _compReg->setClock(_clock); _clock.setAbsoluteTimeInSeconds(1); _messageTracker = std::make_unique<PendingMessageTracker>(*_compReg); - _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_starterImpl); + _operation_sequencer = std::make_unique<OperationSequencer>(); + _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_operation_sequencer, *_starterImpl); } TEST_F(BlockingOperationStarterTest, operation_not_blocked_when_no_messages_pending) { diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.cpp b/storage/src/tests/distributor/distributor_message_sender_stub.cpp index df894f1bb2c..893ebcd158a 100644 --- a/storage/src/tests/distributor/distributor_message_sender_stub.cpp +++ b/storage/src/tests/distributor/distributor_message_sender_stub.cpp @@ -12,7 +12,8 @@ namespace storage { DistributorMessageSenderStub::DistributorMessageSenderStub() : _stub_impl(), _cluster_name("storage"), - _pending_message_tracker(nullptr) + _pending_message_tracker(nullptr), + _operation_sequencer(nullptr) {} DistributorMessageSenderStub::~DistributorMessageSenderStub() = default; diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h index e69673a9366..0b5ab5ae9a7 100644 --- a/storage/src/tests/distributor/distributor_message_sender_stub.h +++ b/storage/src/tests/distributor/distributor_message_sender_stub.h @@ -13,6 +13,7 @@ class DistributorMessageSenderStub : public distributor::DistributorMessageSende MessageSenderStub _stub_impl; vespalib::string _cluster_name; distributor::PendingMessageTracker* _pending_message_tracker; + distributor::OperationSequencer* _operation_sequencer; public: DistributorMessageSenderStub(); @@ -94,6 +95,15 @@ public: void setPendingMessageTracker(distributor::PendingMessageTracker& tracker) { _pending_message_tracker = &tracker; } + + const distributor::OperationSequencer& operation_sequencer() const noexcept override { + assert(_operation_sequencer); + return *_operation_sequencer; + } + + void set_operation_sequencer(distributor::OperationSequencer& op_seq) { + _operation_sequencer = &op_seq; + } }; } diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index d66b1315fde..d5374907723 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/distributor/bucketdbupdater.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/operations/idealstate/mergeoperation.h> +#include <vespa/storage/distributor/operation_sequencer.h> #include <vespa/storageapi/message/stat.h> #include <vespa/storageapi/message/visitor.h> #include <vespa/storageapi/message/bucketsplitting.h> @@ -41,16 +42,18 @@ struct IdealStateManagerTest : Test, DistributorTestUtil { bool checkBlock(const IdealStateOperation& op, const document::Bucket& bucket, - const PendingMessageTracker& tracker) const + const PendingMessageTracker& tracker, + const OperationSequencer& op_seq) const { - return op.checkBlock(bucket, tracker); + return op.checkBlock(bucket, tracker, op_seq); } bool checkBlockForAllNodes(const IdealStateOperation& op, const document::Bucket& bucket, - const PendingMessageTracker& tracker) const + const PendingMessageTracker& tracker, + const OperationSequencer& op_seq) const { - return op.checkBlockForAllNodes(bucket, tracker); + return op.checkBlockForAllNodes(bucket, tracker, op_seq); } std::vector<document::BucketSpace> _bucketSpaces; @@ -178,6 +181,7 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) framework::defaultimplementation::FakeClock clock; PendingMessageTracker tracker(_node->getComponentRegister()); + OperationSequencer op_seq; document::BucketId bid(16, 1234); std::vector<document::BucketId> buckets; @@ -193,14 +197,14 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) { RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 4))); - EXPECT_TRUE(op.isBlocked(tracker)); + EXPECT_TRUE(op.isBlocked(tracker, op_seq)); } { // Don't trigger on requests to other nodes. RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 5))); - EXPECT_FALSE(op.isBlocked(tracker)); + EXPECT_FALSE(op.isBlocked(tracker, op_seq)); } // Don't block on null-bucket messages that aren't RequestBucketInfo. @@ -213,7 +217,7 @@ TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) { RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); - EXPECT_FALSE(op.isBlocked(tracker)); + EXPECT_FALSE(op.isBlocked(tracker, op_seq)); } } @@ -221,6 +225,7 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) setupDistributor(2, 10, "distributor:1 storage:2"); framework::defaultimplementation::FakeClock clock; PendingMessageTracker tracker(_node->getComponentRegister()); + OperationSequencer op_seq; document::BucketId bid(16, 1234); { @@ -232,9 +237,30 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) RemoveBucketOperation op("storage", BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); // Not blocked for exact node match. - EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker)); + EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker, op_seq)); // But blocked for bucket match! - EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker)); + EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker, op_seq)); + } +} + +TEST_F(IdealStateManagerTest, block_operations_with_locked_buckets) { + setupDistributor(2, 10, "distributor:1 storage:2"); + framework::defaultimplementation::FakeClock clock; + PendingMessageTracker tracker(_node->getComponentRegister()); + OperationSequencer op_seq; + const auto bucket = makeDocumentBucket(document::BucketId(16, 1234)); + + { + auto msg = std::make_shared<api::JoinBucketsCommand>(bucket); + msg->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + tracker.insert(msg); + } + auto token = op_seq.try_acquire(bucket); + EXPECT_TRUE(token.valid()); + { + RemoveBucketOperation op("storage", BucketAndNodes(bucket, toVector<uint16_t>(0))); + EXPECT_TRUE(checkBlock(op, bucket, tracker, op_seq)); + EXPECT_TRUE(checkBlockForAllNodes(op, bucket, tracker, op_seq)); } } diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h index c88e477e90e..2bfb4ebb40f 100644 --- a/storage/src/tests/distributor/maintenancemocks.h +++ b/storage/src/tests/distributor/maintenancemocks.h @@ -51,7 +51,7 @@ public: } void onStart(DistributorMessageSender&) override {} void onReceive(DistributorMessageSender&, const std::shared_ptr<api::StorageReply>&) override {} - bool isBlocked(const PendingMessageTracker&) const override { + bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override { return _shouldBlock; } void setShouldBlock(bool shouldBlock) { @@ -64,7 +64,7 @@ class MockMaintenanceOperationGenerator { public: MaintenanceOperation::SP generate(const document::Bucket&bucket) const override { - return MaintenanceOperation::SP(new MockOperation(bucket)); + return std::make_shared<MockOperation>(bucket); } std::vector<MaintenanceOperation::SP> generateAll( @@ -73,7 +73,7 @@ public: { (void) tracker; std::vector<MaintenanceOperation::SP> ret; - ret.push_back(MaintenanceOperation::SP(new MockOperation(bucket))); + ret.emplace_back(std::make_shared<MockOperation>(bucket)); return ret; } diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index ccd70309a88..852c2ef8754 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -6,6 +6,7 @@ #include <vespa/storage/distributor/operations/idealstate/mergeoperation.h> #include <vespa/storage/distributor/bucketdbupdater.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/operation_sequencer.h> #include <tests/distributor/distributortestutil.h> #include <vespa/vespalib/text/stringtokenizer.h> #include <vespa/vespalib/gtest/gtest.h> @@ -17,11 +18,13 @@ namespace storage::distributor { struct MergeOperationTest : Test, DistributorTestUtil { std::unique_ptr<PendingMessageTracker> _pendingTracker; + OperationSequencer _operation_sequencer; void SetUp() override { createLinks(); _pendingTracker = std::make_unique<PendingMessageTracker>(getComponentRegister()); _sender.setPendingMessageTracker(*_pendingTracker); + _sender.set_operation_sequencer(_operation_sequencer); } void TearDown() override { @@ -397,18 +400,31 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_any_busy_target_node) { // Should not block on nodes _not_ included in operation node set _pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10)); - EXPECT_FALSE(op.isBlocked(*_pendingTracker)); + EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // Node 1 is included in operation node set and should cause a block _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10)); - EXPECT_TRUE(op.isBlocked(*_pendingTracker)); + EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); getClock().addSecondsToTime(11); - EXPECT_FALSE(op.isBlocked(*_pendingTracker)); // No longer busy + EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // No longer busy // Should block on other operation nodes than the first listed as well _pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10)); - EXPECT_TRUE(op.isBlocked(*_pendingTracker)); + EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); +} + +TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) { + getClock().setAbsoluteTimeInSeconds(10); + addNodesToBucketDB(document::BucketId(16, 1), "0=10/1/1/t,1=20/1/1,2=10/1/1/t"); + enableDistributorClusterState("distributor:1 storage:3"); + MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + + EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1))); + EXPECT_TRUE(token.valid()); + EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); } TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) { diff --git a/storage/src/tests/distributor/operation_sequencer_test.cpp b/storage/src/tests/distributor/operation_sequencer_test.cpp index 37651be05a4..7c9a22b5226 100644 --- a/storage/src/tests/distributor/operation_sequencer_test.cpp +++ b/storage/src/tests/distributor/operation_sequencer_test.cpp @@ -61,8 +61,11 @@ TEST_F(OperationSequencerTest, releasing_handle_allows_for_getting_new_handles_f } TEST_F(OperationSequencerTest, cannot_get_handle_for_gid_contained_in_locked_bucket) { - auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1)); + EXPECT_FALSE(sequencer.is_blocked(bucket)); + auto bucket_handle = sequencer.try_acquire(bucket); EXPECT_TRUE(bucket_handle.valid()); + EXPECT_TRUE(sequencer.is_blocked(bucket)); auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd")); EXPECT_FALSE(doc_handle.valid()); ASSERT_TRUE(doc_handle.is_blocked()); @@ -78,10 +81,12 @@ TEST_F(OperationSequencerTest, can_get_handle_for_gid_not_contained_in_active_bu } TEST_F(OperationSequencerTest, releasing_bucket_lock_allows_gid_handles_to_be_acquired) { - auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + const auto bucket = document::Bucket(default_space(), document::BucketId(16, 1)); + auto bucket_handle = sequencer.try_acquire(bucket); bucket_handle.release(); auto doc_handle = sequencer.try_acquire(default_space(), DocumentId("id:foo:test:n=1:abcd")); EXPECT_TRUE(doc_handle.valid()); + EXPECT_FALSE(sequencer.is_blocked(bucket)); } TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_separate_bucket_space) { @@ -91,4 +96,9 @@ TEST_F(OperationSequencerTest, can_get_handle_for_gid_when_locked_bucket_is_in_s EXPECT_TRUE(doc_handle.valid()); } +TEST_F(OperationSequencerTest, is_blocked_is_bucket_space_aware) { + auto bucket_handle = sequencer.try_acquire(document::Bucket(default_space(), document::BucketId(16, 1))); + EXPECT_FALSE(sequencer.is_blocked(document::Bucket(global_space(), document::BucketId(16, 1)))); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp index c876f3e7ee4..9bb2e8b04a7 100644 --- a/storage/src/tests/distributor/splitbuckettest.cpp +++ b/storage/src/tests/distributor/splitbuckettest.cpp @@ -1,13 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <tests/common/dummystoragelink.h> -#include <vespa/storageapi/message/bucketsplitting.h> -#include <vespa/storage/distributor/operations/idealstate/splitoperation.h> #include <vespa/document/base/documentid.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storage/distributor/idealstatemanager.h> -#include <tests/distributor/distributortestutil.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/distributor.h> +#include <vespa/storage/distributor/idealstatemanager.h> +#include <vespa/storage/distributor/operation_sequencer.h> +#include <vespa/storage/distributor/operations/idealstate/splitoperation.h> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/storageapi/message/persistence.h> +#include <tests/common/dummystoragelink.h> +#include <tests/distributor/distributortestutil.h> #include <vespa/vespalib/gtest/gtest.h> using document::test::makeDocumentBucket; @@ -260,6 +261,7 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) { compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); PendingMessageTracker tracker(compReg); + OperationSequencer op_seq; enableDistributorClusterState("distributor:1 storage:2"); @@ -281,17 +283,39 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) { splitCount, splitByteSize); - EXPECT_TRUE(op.isBlocked(tracker)); + EXPECT_TRUE(op.isBlocked(tracker, op_seq)); // Now, pretend there's a join for another node in the same bucket. This // will happen when a join is partially completed. tracker.clearMessagesForNode(0); - EXPECT_FALSE(op.isBlocked(tracker)); + EXPECT_FALSE(op.isBlocked(tracker, op_seq)); joinCmd->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); tracker.insert(joinCmd); - EXPECT_TRUE(op.isBlocked(tracker)); + EXPECT_TRUE(op.isBlocked(tracker, op_seq)); +} + +TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) { + StorageComponentRegisterImpl compReg; + framework::defaultimplementation::FakeClock clock; + compReg.setClock(clock); + clock.setAbsoluteTimeInSeconds(1); + PendingMessageTracker tracker(compReg); + OperationSequencer op_seq; + + enableDistributorClusterState("distributor:1 storage:2"); + + document::BucketId source_bucket(16, 1); + insertBucketInfo(source_bucket, 0, 0xabc, 1000, tooLargeBucketSize, 250); + + SplitOperation op("storage", BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)), + maxSplitBits, splitCount, splitByteSize); + + EXPECT_FALSE(op.isBlocked(tracker, op_seq)); + auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket)); + EXPECT_TRUE(token.valid()); + EXPECT_TRUE(op.isBlocked(tracker, op_seq)); } } // storage::distributor |