diff options
Diffstat (limited to 'storage/src')
29 files changed, 223 insertions, 139 deletions
diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp index 5203fec2462..861f8e72832 100644 --- a/storage/src/tests/distributor/blockingoperationstartertest.cpp +++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <vespa/storage/distributor/blockingoperationstarter.h> +#include <vespa/storage/distributor/distributor_stripe_operation_context.h> #include <vespa/storage/distributor/pendingmessagetracker.h> #include <vespa/storage/distributor/operation_sequencer.h> #include <tests/distributor/maintenancemocks.h> @@ -13,6 +14,86 @@ using namespace ::testing; namespace storage::distributor { +struct FakeDistributorStripeOperationContext : public DistributorStripeOperationContext { + + PendingMessageTracker& _message_tracker; + + explicit FakeDistributorStripeOperationContext(PendingMessageTracker& message_tracker) + : _message_tracker(message_tracker) + {} + + ~FakeDistributorStripeOperationContext() override = default; + + // From DistributorOperationContext: + api::Timestamp generate_unique_timestamp() override { + abort(); + } + const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept override { + abort(); + } + DistributorBucketSpaceRepo& bucket_space_repo() noexcept override { + abort(); + } + const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept override { + abort(); + } + DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept override { + abort(); + } + const DistributorConfiguration& distributor_config() const noexcept override { + abort(); + } + // From DistributorStripeOperationContext: + void update_bucket_database(const document::Bucket&, const BucketCopy&, uint32_t) override { + abort(); + } + void update_bucket_database(const document::Bucket&, const std::vector<BucketCopy>&, uint32_t) override { + abort(); + } + void remove_node_from_bucket_database(const document::Bucket&, uint16_t) override { + abort(); + } + void remove_nodes_from_bucket_database(const document::Bucket&, const std::vector<uint16_t>&) override { + abort(); + } + document::BucketId make_split_bit_constrained_bucket_id(const document::DocumentId&) const override { + abort(); + } + void recheck_bucket_info(uint16_t, const document::Bucket&) override { + abort(); + } + document::BucketId get_sibling(const document::BucketId&) const override { + abort(); + } + void send_inline_split_if_bucket_too_large(document::BucketSpace, const BucketDatabase::Entry&, uint8_t) override { + abort(); + } + OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket&) const override { + abort(); + } + PendingMessageTracker& pending_message_tracker() noexcept override { + return _message_tracker; + } + const PendingMessageTracker& pending_message_tracker() const noexcept override { + return _message_tracker; + } + bool has_pending_message(uint16_t, const document::Bucket&, uint32_t) const override { + abort(); + } + const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace&) const override { + abort(); + } + const lib::ClusterStateBundle& cluster_state_bundle() const override { + abort(); + } + bool storage_node_is_up(document::BucketSpace, uint32_t) const override { + abort(); + } + const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const override { + abort(); + } +}; + struct BlockingOperationStarterTest : Test { std::shared_ptr<Operation> createMockOperation() { return std::make_shared<MockOperation>(makeDocumentBucket(BucketId(16, 1))); @@ -27,6 +108,7 @@ struct BlockingOperationStarterTest : Test { std::unique_ptr<MockOperationStarter> _starterImpl; std::unique_ptr<StorageComponentRegisterImpl> _compReg; std::unique_ptr<PendingMessageTracker> _messageTracker; + std::unique_ptr<FakeDistributorStripeOperationContext> _fake_ctx; std::unique_ptr<OperationSequencer> _operation_sequencer; std::unique_ptr<BlockingOperationStarter> _operationStarter; @@ -41,8 +123,9 @@ BlockingOperationStarterTest::SetUp() _compReg->setClock(_clock); _clock.setAbsoluteTimeInSeconds(1); _messageTracker = std::make_unique<PendingMessageTracker>(*_compReg); + _fake_ctx = std::make_unique<FakeDistributorStripeOperationContext>(*_messageTracker); _operation_sequencer = std::make_unique<OperationSequencer>(); - _operationStarter = std::make_unique<BlockingOperationStarter>(*_messageTracker, *_operation_sequencer, *_starterImpl); + _operationStarter = std::make_unique<BlockingOperationStarter>(*_fake_ctx, *_operation_sequencer, *_starterImpl); } TEST_F(BlockingOperationStarterTest, operation_not_blocked_when_no_messages_pending) { diff --git a/storage/src/tests/distributor/distributor_message_sender_stub.h b/storage/src/tests/distributor/distributor_message_sender_stub.h index 59a5a82b7df..18662fbce8f 100644 --- a/storage/src/tests/distributor/distributor_message_sender_stub.h +++ b/storage/src/tests/distributor/distributor_message_sender_stub.h @@ -87,6 +87,11 @@ public: return dummy_cluster_context; } + distributor::PendingMessageTracker& getPendingMessageTracker() override { + assert(_pending_message_tracker); + return *_pending_message_tracker; + } + const distributor::PendingMessageTracker& getPendingMessageTracker() const override { assert(_pending_message_tracker); return *_pending_message_tracker; diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 3d1c6165946..9c3686d3614 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -222,7 +222,6 @@ struct DistributorTest : Test, DistributorTestUtil { } void configureMaxClusterClockSkew(int seconds); - void sendDownClusterStateCommand(); void replyToSingleRequestBucketInfoCommandWith1Bucket(); void sendDownDummyRemoveCommand(); void assertSingleBouncedRemoveReplyPresent(); @@ -795,12 +794,6 @@ auto make_dummy_get_command_for_bucket_1() { } -void DistributorTest::sendDownClusterStateCommand() { - lib::ClusterState newState("bits:1 storage:1 distributor:1"); - auto stateCmd = std::make_shared<api::SetSystemStateCommand>(newState); - _distributor->handleMessage(stateCmd); -} - void DistributorTest::replyToSingleRequestBucketInfoCommandWith1Bucket() { ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); for (uint32_t i = 0; i < _sender.commands().size(); ++i) { @@ -847,7 +840,7 @@ TEST_F(DistributorTest, configured_safe_time_point_rejection_works_end_to_end) { getClock().setAbsoluteTimeInSeconds(1000); configureMaxClusterClockSkew(10); - sendDownClusterStateCommand(); + receive_set_system_state_command("bits:1 storage:1 distributor:1"); ASSERT_NO_FATAL_FAILURE(replyToSingleRequestBucketInfoCommandWith1Bucket()); // SetSystemStateCommand sent down chain at this point. sendDownDummyRemoveCommand(); diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index a2f32d8faa2..3ec1c95b206 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -101,6 +101,19 @@ DistributorTestUtil::triggerDistributionChange(lib::Distribution::SP distr) } void +DistributorTestUtil::receive_set_system_state_command(const vespalib::string& state_str) +{ + auto state_cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str)); + _distributor->handleMessage(state_cmd); // TODO move semantics +} + +void +DistributorTestUtil::handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg) +{ + _distributor->handleMessage(msg); +} + +void DistributorTestUtil::setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo> &repo) { _node->getComponentRegister().setDocumentTypeRepo(repo); diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 6664b8d823d..533fd49811f 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -202,6 +202,12 @@ public: void setSystemState(const lib::ClusterState& systemState); + // Invokes full cluster state transition pipeline rather than directly applying + // the state and just pretending everything has been completed. + void receive_set_system_state_command(const vespalib::string& state_str); + + void handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg); + // Must be called prior to createLinks() to have any effect void set_num_distributor_stripes(uint32_t n_stripes) noexcept { _num_distributor_stripes = n_stripes; diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index 0a36e5cd0e5..e38e4b5b668 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -40,18 +40,18 @@ struct IdealStateManagerTest : Test, DistributorTestUtil { bool checkBlock(const IdealStateOperation& op, const document::Bucket& bucket, - const PendingMessageTracker& tracker, + const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const { - return op.checkBlock(bucket, tracker, op_seq); + return op.checkBlock(bucket, ctx, op_seq); } bool checkBlockForAllNodes(const IdealStateOperation& op, const document::Bucket& bucket, - const PendingMessageTracker& tracker, + const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const { - return op.checkBlockForAllNodes(bucket, tracker, op_seq); + return op.checkBlockForAllNodes(bucket, ctx, op_seq); } std::vector<document::BucketSpace> _bucketSpaces; @@ -170,92 +170,86 @@ TEST_F(IdealStateManagerTest, recheck_when_active) { active_ideal_state_operations()); } -TEST_F(IdealStateManagerTest, block_ideal_state_ops_on_full_request_bucket_info) { +/** + * Don't schedule ideal state operations when there's a pending cluster state. + * This subsumes the legacy behavior of blocking ideal state ops when there is a + * zero-bucket RequestBucketInfoCommand pending towards a node (i.e. full bucket + * info fetch). + * + * This is for two reasons: + * - Avoids race conditions where we change the bucket set concurrently with + * requesting bucket info. + * - Once we get updated bucket info it's likely that the set of ideal state ops + * to execute will change anyway, so it makes sense to wait until it's ready. + */ +TEST_F(IdealStateManagerTest, block_ideal_state_ops_when_pending_cluster_state_is_present) { + + setupDistributor(2, 10, "version:1 distributor:1 storage:1 .0.s:d"); + + // Trigger a pending cluster state with bucket info requests towards 1 node + receive_set_system_state_command("version:2 distributor:1 storage:1"); - setupDistributor(2, 10, "distributor:1 storage:2"); - - framework::defaultimplementation::FakeClock clock; - PendingMessageTracker tracker(_node->getComponentRegister()); OperationSequencer op_seq; - document::BucketId bid(16, 1234); - std::vector<document::BucketId> buckets; - - // RequestBucketInfoCommand does not have a specific bucketid since it's - // sent to the entire node. It will then use a null bucketid. - { - auto msg = std::make_shared<api::RequestBucketInfoCommand>(makeBucketSpace(), buckets); - msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 4)); - tracker.insert(msg); - } { RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 4))); - EXPECT_TRUE(op.isBlocked(tracker, op_seq)); - } - - { - // Don't trigger on requests to other nodes. - RemoveBucketOperation op(dummy_cluster_context, - BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(3, 5))); - EXPECT_FALSE(op.isBlocked(tracker, op_seq)); + EXPECT_TRUE(op.isBlocked(operation_context(), op_seq)); } - // Don't block on null-bucket messages that aren't RequestBucketInfo. - { - auto msg = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "foo", "bar", "baz"); - msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 7)); - tracker.insert(msg); + // Clear pending by replying with zero buckets for all bucket spaces + ASSERT_EQ(_bucketSpaces.size(), _sender.commands().size()); + for (uint32_t i = 0; i < _sender.commands().size(); ++i) { + auto& bucket_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.command(i)); + handle_top_level_message(bucket_req.makeReply()); } { RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); - EXPECT_FALSE(op.isBlocked(tracker, op_seq)); + EXPECT_FALSE(op.isBlocked(operation_context(), op_seq)); } } TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) { setupDistributor(2, 10, "distributor:1 storage:2"); framework::defaultimplementation::FakeClock clock; - PendingMessageTracker tracker(_node->getComponentRegister()); OperationSequencer op_seq; document::BucketId bid(16, 1234); { auto msg = std::make_shared<api::JoinBucketsCommand>(makeDocumentBucket(bid)); msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 4)); - tracker.insert(msg); + pending_message_tracker().insert(msg); } { RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); // Not blocked for exact node match. - EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), tracker, op_seq)); + EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq)); // But blocked for bucket match! - EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), tracker, op_seq)); + EXPECT_TRUE(checkBlockForAllNodes(op, makeDocumentBucket(bid), operation_context(), op_seq)); } } TEST_F(IdealStateManagerTest, block_operations_with_locked_buckets) { setupDistributor(2, 10, "distributor:1 storage:2"); framework::defaultimplementation::FakeClock clock; - PendingMessageTracker tracker(_node->getComponentRegister()); OperationSequencer op_seq; const auto bucket = makeDocumentBucket(document::BucketId(16, 1234)); { auto msg = std::make_shared<api::JoinBucketsCommand>(bucket); msg->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 1)); - tracker.insert(msg); + pending_message_tracker().insert(msg); } auto token = op_seq.try_acquire(bucket, "foo"); EXPECT_TRUE(token.valid()); { RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(bucket, toVector<uint16_t>(0))); - EXPECT_TRUE(checkBlock(op, bucket, tracker, op_seq)); - EXPECT_TRUE(checkBlockForAllNodes(op, bucket, tracker, op_seq)); + EXPECT_TRUE(checkBlock(op, bucket, operation_context(), op_seq)); + EXPECT_TRUE(checkBlockForAllNodes(op, bucket, operation_context(), op_seq)); } } diff --git a/storage/src/tests/distributor/maintenancemocks.h b/storage/src/tests/distributor/maintenancemocks.h index fff798d4413..1245c9bb15d 100644 --- a/storage/src/tests/distributor/maintenancemocks.h +++ b/storage/src/tests/distributor/maintenancemocks.h @@ -51,7 +51,7 @@ public: } void onStart(DistributorStripeMessageSender&) override {} void onReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply>&) override {} - bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override { + bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override { return _shouldBlock; } void setShouldBlock(bool shouldBlock) { diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 1026ea2855e..52a8bfc41b6 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -18,13 +18,11 @@ using namespace ::testing; namespace storage::distributor { struct MergeOperationTest : Test, DistributorTestUtil { - std::unique_ptr<PendingMessageTracker> _pendingTracker; OperationSequencer _operation_sequencer; void SetUp() override { createLinks(); - _pendingTracker = std::make_unique<PendingMessageTracker>(getComponentRegister()); - _sender.setPendingMessageTracker(*_pendingTracker); + _sender.setPendingMessageTracker(pending_message_tracker()); _sender.set_operation_sequencer(_operation_sequencer); } @@ -256,7 +254,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) { makeDocumentBucket(bucket), api::SetBucketStateCommand::ACTIVE); vespalib::string storage("storage"); msg->setAddress(api::StorageMessageAddress::create(&storage, lib::NodeType::STORAGE, 1)); - _pendingTracker->insert(msg); + pending_message_tracker().insert(msg); sendReply(op); // Should not be a remove here! @@ -400,19 +398,19 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_any_busy_target_node) { op.setIdealStateManager(&getIdealStateManager()); // Should not block on nodes _not_ included in operation node set - _pendingTracker->getNodeInfo().setBusy(3, std::chrono::seconds(10)); - EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + pending_message_tracker().getNodeInfo().setBusy(3, std::chrono::seconds(10)); + EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); // Node 1 is included in operation node set and should cause a block - _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10)); - EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + pending_message_tracker().getNodeInfo().setBusy(0, std::chrono::seconds(10)); + EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer)); getClock().addSecondsToTime(11); - EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); // No longer busy + EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); // No longer busy // Should block on other operation nodes than the first listed as well - _pendingTracker->getNodeInfo().setBusy(1, std::chrono::seconds(10)); - EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + pending_message_tracker().getNodeInfo().setBusy(1, std::chrono::seconds(10)); + EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer)); } @@ -426,8 +424,8 @@ TEST_F(MergeOperationTest, global_bucket_merges_are_not_blocked_by_busy_nodes) { op.setIdealStateManager(&getIdealStateManager()); // Node 1 is included in operation node set but should not cause a block of global bucket merge - _pendingTracker->getNodeInfo().setBusy(0, std::chrono::seconds(10)); - EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + pending_message_tracker().getNodeInfo().setBusy(0, std::chrono::seconds(10)); + EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); } TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) { @@ -437,10 +435,10 @@ TEST_F(MergeOperationTest, merge_operation_is_blocked_by_locked_bucket) { MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); op.setIdealStateManager(&getIdealStateManager()); - EXPECT_FALSE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); auto token = _operation_sequencer.try_acquire(makeDocumentBucket(document::BucketId(16, 1)), "foo"); EXPECT_TRUE(token.valid()); - EXPECT_TRUE(op.isBlocked(*_pendingTracker, _operation_sequencer)); + EXPECT_TRUE(op.isBlocked(operation_context(), _operation_sequencer)); } TEST_F(MergeOperationTest, missing_replica_is_included_in_limited_node_list) { diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp index 8c8da1bb197..ec58992ed3e 100644 --- a/storage/src/tests/distributor/splitbuckettest.cpp +++ b/storage/src/tests/distributor/splitbuckettest.cpp @@ -261,7 +261,6 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) { framework::defaultimplementation::FakeClock clock; compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); - PendingMessageTracker tracker(compReg); OperationSequencer op_seq; enableDistributorClusterState("distributor:1 storage:2"); @@ -274,7 +273,7 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) { joinCmd->getSourceBuckets() = joinSources; joinCmd->setAddress(_Storage0Address); - tracker.insert(joinCmd); + pending_message_tracker().insert(joinCmd); insertBucketInfo(joinTarget, 0, 0xabc, 1000, 1234, true); @@ -284,18 +283,18 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) { splitCount, splitByteSize); - EXPECT_TRUE(op.isBlocked(tracker, op_seq)); + EXPECT_TRUE(op.isBlocked(operation_context(), op_seq)); // Now, pretend there's a join for another node in the same bucket. This // will happen when a join is partially completed. - tracker.clearMessagesForNode(0); - EXPECT_FALSE(op.isBlocked(tracker, op_seq)); + pending_message_tracker().clearMessagesForNode(0); + EXPECT_FALSE(op.isBlocked(operation_context(), op_seq)); joinCmd->setAddress(api::StorageMessageAddress::create(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 1)); - tracker.insert(joinCmd); + pending_message_tracker().insert(joinCmd); - EXPECT_TRUE(op.isBlocked(tracker, op_seq)); + EXPECT_TRUE(op.isBlocked(operation_context(), op_seq)); } TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) { @@ -303,7 +302,6 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) { framework::defaultimplementation::FakeClock clock; compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); - PendingMessageTracker tracker(compReg); OperationSequencer op_seq; enableDistributorClusterState("distributor:1 storage:2"); @@ -314,10 +312,10 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) { SplitOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)), maxSplitBits, splitCount, splitByteSize); - EXPECT_FALSE(op.isBlocked(tracker, op_seq)); + EXPECT_FALSE(op.isBlocked(operation_context(), op_seq)); auto token = op_seq.try_acquire(makeDocumentBucket(source_bucket), "foo"); EXPECT_TRUE(token.valid()); - EXPECT_TRUE(op.isBlocked(tracker, op_seq)); + EXPECT_TRUE(op.isBlocked(operation_context(), op_seq)); } } // storage::distributor diff --git a/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp b/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp index 25c38888098..e9b53e35b61 100644 --- a/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp +++ b/storage/src/vespa/storage/distributor/blockingoperationstarter.cpp @@ -7,7 +7,7 @@ namespace storage::distributor { bool BlockingOperationStarter::start(const std::shared_ptr<Operation>& operation, Priority priority) { - if (operation->isBlocked(_messageTracker, _operation_sequencer)) { + if (operation->isBlocked(_operation_context, _operation_sequencer)) { return true; } return _starterImpl.start(operation, priority); diff --git a/storage/src/vespa/storage/distributor/blockingoperationstarter.h b/storage/src/vespa/storage/distributor/blockingoperationstarter.h index e79ae6b4a79..180e617d08d 100644 --- a/storage/src/vespa/storage/distributor/blockingoperationstarter.h +++ b/storage/src/vespa/storage/distributor/blockingoperationstarter.h @@ -6,16 +6,16 @@ namespace storage::distributor { -class PendingMessageTracker; +class DistributorStripeOperationContext; class OperationSequencer; class BlockingOperationStarter : public OperationStarter { public: - BlockingOperationStarter(PendingMessageTracker& messageTracker, + BlockingOperationStarter(DistributorStripeOperationContext& ctx, OperationSequencer& operation_sequencer, OperationStarter& starterImpl) - : _messageTracker(messageTracker), + : _operation_context(ctx), _operation_sequencer(operation_sequencer), _starterImpl(starterImpl) {} @@ -24,9 +24,9 @@ public: bool start(const std::shared_ptr<Operation>& operation, Priority priority) override; private: - PendingMessageTracker& _messageTracker; - OperationSequencer& _operation_sequencer; - OperationStarter& _starterImpl; + DistributorStripeOperationContext& _operation_context; + OperationSequencer& _operation_sequencer; + OperationStarter& _starterImpl; }; } diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h index aa598835cdb..e0d481a322a 100644 --- a/storage/src/vespa/storage/distributor/distributor_operation_context.h +++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h @@ -20,7 +20,7 @@ public: virtual ~DistributorOperationContext() {} virtual api::Timestamp generate_unique_timestamp() = 0; // TODO STRIPE: Access to bucket space repos is only temporary at this level. - virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept= 0; + virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept = 0; virtual DistributorBucketSpaceRepo& bucket_space_repo() noexcept = 0; virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept = 0; virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept = 0; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index bf78707cfd9..4f6e2d5016b 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -71,7 +71,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _bucketPriorityDb(std::make_unique<SimpleBucketPriorityDatabase>()), _scanner(std::make_unique<SimpleMaintenanceScanner>(*_bucketPriorityDb, _idealStateManager, *_bucketSpaceRepo)), _throttlingStarter(std::make_unique<ThrottlingOperationStarter>(_maintenanceOperationOwner)), - _blockingStarter(std::make_unique<BlockingOperationStarter>(_pendingMessageTracker, *_operation_sequencer, + _blockingStarter(std::make_unique<BlockingOperationStarter>(_component, *_operation_sequencer, *_throttlingStarter)), _scheduler(std::make_unique<MaintenanceScheduler>(_idealStateManager, *_bucketPriorityDb, *_blockingStarter)), _schedulingMode(MaintenanceScheduler::NORMAL_SCHEDULING_MODE), diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.h b/storage/src/vespa/storage/distributor/distributor_stripe_component.h index 31ee9ca88d2..e47d73cc4df 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_component.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.h @@ -140,6 +140,9 @@ public: PendingMessageTracker& pending_message_tracker() noexcept override { return getDistributor().getPendingMessageTracker(); } + const PendingMessageTracker& pending_message_tracker() const noexcept override { + return getDistributor().getPendingMessageTracker(); + } bool has_pending_message(uint16_t node_index, const document::Bucket& bucket, uint32_t message_type) const override; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h index bd9a4e1de57..24db212c120 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h @@ -24,7 +24,6 @@ class PendingMessageTracker; class DistributorStripeInterface : public DistributorStripeMessageSender { public: - virtual PendingMessageTracker& getPendingMessageTracker() = 0; virtual DistributorMetricSet& getMetrics() = 0; virtual void enableClusterStateBundle(const lib::ClusterStateBundle& state) = 0; virtual const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const = 0; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h index 8419abeadaa..518c83d7ffa 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h @@ -22,7 +22,7 @@ class PendingMessageTracker; */ class DistributorStripeOperationContext : public DistributorOperationContext { public: - virtual ~DistributorStripeOperationContext() {} + virtual ~DistributorStripeOperationContext() = default; virtual void update_bucket_database(const document::Bucket& bucket, const BucketCopy& changed_node, uint32_t update_flags = 0) = 0; @@ -41,6 +41,7 @@ public: uint8_t pri) = 0; virtual OperationRoutingSnapshot read_snapshot_for_bucket(const document::Bucket& bucket) const = 0; virtual PendingMessageTracker& pending_message_tracker() noexcept = 0; + virtual const PendingMessageTracker& pending_message_tracker() const noexcept = 0; virtual bool has_pending_message(uint16_t node_index, const document::Bucket& bucket, uint32_t message_type) const = 0; diff --git a/storage/src/vespa/storage/distributor/distributormessagesender.h b/storage/src/vespa/storage/distributor/distributormessagesender.h index c39e3e8fe8a..c5a164ed036 100644 --- a/storage/src/vespa/storage/distributor/distributormessagesender.h +++ b/storage/src/vespa/storage/distributor/distributormessagesender.h @@ -26,6 +26,7 @@ public: class DistributorStripeMessageSender : public DistributorMessageSender { public: + virtual PendingMessageTracker& getPendingMessageTracker() = 0; virtual const PendingMessageTracker& getPendingMessageTracker() const = 0; virtual const OperationSequencer& operation_sequencer() const noexcept = 0; }; diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index e703c5bfdb8..2bc779aa47e 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -54,6 +54,9 @@ public: const ClusterContext & cluster_context() const override { return _node_ctx; } + PendingMessageTracker& getPendingMessageTracker() override { + abort(); // Never called by the messages using this component. + } const PendingMessageTracker& getPendingMessageTracker() const override { abort(); // Never called by the messages using this component. } diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h index d3f46343ebc..c469b35a8dc 100644 --- a/storage/src/vespa/storage/distributor/operationowner.h +++ b/storage/src/vespa/storage/distributor/operationowner.h @@ -43,6 +43,10 @@ public: return _sender.cluster_context(); } + PendingMessageTracker& getPendingMessageTracker() override { + return _sender.getPendingMessageTracker(); + } + const PendingMessageTracker& getPendingMessageTracker() const override { return _sender.getPendingMessageTracker(); } diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 9077f3dc288..db30fcc7196 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -86,6 +86,10 @@ struct IntermediateMessageSender : DistributorStripeMessageSender { return forward.cluster_context(); } + PendingMessageTracker& getPendingMessageTracker() override { + return forward.getPendingMessageTracker(); + } + const PendingMessageTracker& getPendingMessageTracker() const override { return forward.getPendingMessageTracker(); } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index 1a48df0fd7c..f11d1c26da2 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -156,41 +156,26 @@ public: } }; -// TODO STRIPE replace with check for pending cluster state transition. -// Null-bucket messages are not intercepted nor observeable by stripes, -// only by the top-level distributor. -bool -checkNullBucketRequestBucketInfoMessage(uint16_t node, - document::BucketSpace bucketSpace, - const PendingMessageTracker& tracker) -{ - RequestBucketInfoChecker rchk; - // Check messages sent to null-bucket (i.e. any bucket) for the node. - document::Bucket nullBucket(bucketSpace, document::BucketId()); - tracker.checkPendingMessages(node, nullBucket, rchk); - return rchk.blocked; -} - } bool IdealStateOperation::checkBlock(const document::Bucket &bucket, - const PendingMessageTracker& tracker, + const DistributorStripeOperationContext& ctx, const OperationSequencer& seq) const { if (seq.is_blocked(bucket)) { return true; } + if (ctx.pending_cluster_state_or_null(bucket.getBucketSpace())) { + return true; + } IdealStateOpChecker ichk(*this); const std::vector<uint16_t>& nodes(getNodes()); for (auto node : nodes) { - tracker.checkPendingMessages(node, bucket, ichk); + ctx.pending_message_tracker().checkPendingMessages(node, bucket, ichk); if (ichk.blocked) { return true; } - if (checkNullBucketRequestBucketInfoMessage(node, bucket.getBucketSpace(), tracker)) { - return true; - } } return false; } @@ -198,32 +183,25 @@ IdealStateOperation::checkBlock(const document::Bucket &bucket, bool IdealStateOperation::checkBlockForAllNodes( const document::Bucket &bucket, - const PendingMessageTracker& tracker, + const DistributorStripeOperationContext& ctx, const OperationSequencer& seq) const { if (seq.is_blocked(bucket)) { return true; } - IdealStateOpChecker ichk(*this); - // Check messages sent to _any node_ for _this_ particular bucket. - tracker.checkPendingMessages(bucket, ichk); - if (ichk.blocked) { + if (ctx.pending_cluster_state_or_null(bucket.getBucketSpace())) { return true; } - const std::vector<uint16_t>& nodes(getNodes()); - for (auto node : nodes) { - if (checkNullBucketRequestBucketInfoMessage(node, bucket.getBucketSpace(), tracker)) { - return true; - } - } - return false; + IdealStateOpChecker ichk(*this); + // Check messages sent to _any node_ for _this_ particular bucket. + ctx.pending_message_tracker().checkPendingMessages(bucket, ichk); + return ichk.blocked; } - bool -IdealStateOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const +IdealStateOperation::isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const { - return checkBlock(getBucket(), tracker, op_seq); + return checkBlock(getBucket(), ctx, op_seq); } std::string diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index 0e45d7f3b3a..d41640b468e 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -182,7 +182,7 @@ public: * Returns true if we are blocked to start this operation given * the pending messages. */ - bool isBlocked(const PendingMessageTracker& pendingMessages, const OperationSequencer&) const override; + bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override; /** Returns the priority we should send messages with. @@ -234,10 +234,10 @@ protected: * the set of messages checked. */ bool checkBlock(const document::Bucket& bucket, - const PendingMessageTracker& tracker, + const DistributorStripeOperationContext& ctx, const OperationSequencer&) const; bool checkBlockForAllNodes(const document::Bucket& bucket, - const PendingMessageTracker& tracker, + const DistributorStripeOperationContext& ctx, const OperationSequencer&) const; }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp index d9e411bc44e..15d3129b309 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp @@ -161,10 +161,10 @@ JoinOperation::getJoinBucket(size_t idx) const } bool -JoinOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const +JoinOperation::isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const { - return (checkBlock(getBucket(), tracker, op_seq) || - checkBlock(getJoinBucket(0), tracker, op_seq) || - (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), tracker, op_seq))); + return (checkBlock(getBucket(), ctx, op_seq) || + checkBlock(getJoinBucket(0), ctx, op_seq) || + (_bucketsToJoin.size() > 1 && checkBlock(getJoinBucket(1), ctx, op_seq))); } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h index 5796b8d3fa1..4515092cfef 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.h @@ -35,7 +35,7 @@ public: return JOIN_BUCKET; } - bool isBlocked(const PendingMessageTracker& pendingMessages, + bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const override; protected: diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 27e203a9060..749787c51b9 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -235,7 +235,7 @@ MergeOperation::deleteSourceOnlyNodes( BucketAndNodes(getBucket(), sourceOnlyNodes)); // Must not send removes to source only copies if something has caused // pending load to the copy after the merge was sent! - if (_removeOperation->isBlocked(sender.getPendingMessageTracker(), sender.operation_sequencer())) { + if (_removeOperation->isBlocked(_manager->operation_context(), sender.operation_sequencer())) { LOG(debug, "Source only removal for %s was blocked by a pending operation", getBucketId().toString().c_str()); _ok = false; @@ -324,7 +324,7 @@ bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) return IdealStateOperation::shouldBlockThisOperation(messageType, pri); } -bool MergeOperation::isBlocked(const PendingMessageTracker& pending_tracker, +bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const { // To avoid starvation of high priority global bucket merges, we do not consider // these for blocking due to a node being "busy" (usually caused by a full merge @@ -338,14 +338,14 @@ bool MergeOperation::isBlocked(const PendingMessageTracker& pending_tracker, // 2. Global bucket merges have high priority and will most likely be allowed // to enter the merge throttler queues, displacing lower priority merges. if (!is_global_bucket_merge()) { - const auto& node_info = pending_tracker.getNodeInfo(); + const auto& node_info = ctx.pending_message_tracker().getNodeInfo(); for (auto node : getNodes()) { if (node_info.isBusy(node)) { return true; } } } - return IdealStateOperation::isBlocked(pending_tracker, op_seq); + return IdealStateOperation::isBlocked(ctx, op_seq); } bool MergeOperation::is_global_bucket_merge() const noexcept { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h index 11b5494fd9b..945b9318482 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h @@ -47,7 +47,7 @@ public: std::vector<MergeMetaData>&); bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override; - bool isBlocked(const PendingMessageTracker& pendingMessages, const OperationSequencer&) const override; + bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override; private: static void addIdealNodes( const std::vector<uint16_t>& idealNodes, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 437c4ed6033..d7f03740e4d 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -143,9 +143,9 @@ SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRep } bool -SplitOperation::isBlocked(const PendingMessageTracker& tracker, const OperationSequencer& op_seq) const +SplitOperation::isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const { - return checkBlockForAllNodes(getBucket(), tracker, op_seq); + return checkBlockForAllNodes(getBucket(), ctx, op_seq); } bool diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h index eccbdc69869..5581edf41bd 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h @@ -20,7 +20,7 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; const char* getName() const override { return "split"; }; Type getType() const override { return SPLIT_BUCKET; } - bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const override; + bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override; bool shouldBlockThisOperation(uint32_t, uint8_t) const override; protected: MessageTracker _tracker; diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h index 5099762fd6a..18f7214c498 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.h +++ b/storage/src/vespa/storage/distributor/operations/operation.h @@ -16,6 +16,7 @@ class StorageComponent; namespace distributor { +class DistributorStripeOperationContext; class PendingMessageTracker; class OperationSequencer; @@ -61,7 +62,7 @@ public: * Returns true if we are blocked to start this operation given * the pending messages. */ - virtual bool isBlocked(const PendingMessageTracker&, const OperationSequencer&) const { + virtual bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const { return false; } |