diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-09-08 14:25:56 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-08 14:25:56 +0200 |
commit | 97bc82e63d2dd0b008bcccea9acd7b78ffe2a6ce (patch) | |
tree | e5dd49807a6d33b3355a5da5302fce20fbb17990 | |
parent | e6919f1aaad79b2ee19448735e86214ff91bca6d (diff) | |
parent | 2aa0216ae69d33847a2e6db79722d19b49ff5744 (diff) |
Merge pull request #28441 from vespa-engine/vekterli/expand-operation-cancellation-support
Wire distributor operation cancelling to cluster state/config change edges
48 files changed, 976 insertions, 327 deletions
diff --git a/storage/src/tests/distributor/bucketstateoperationtest.cpp b/storage/src/tests/distributor/bucketstateoperationtest.cpp index c9fab0b37e5..44da88b4587 100644 --- a/storage/src/tests/distributor/bucketstateoperationtest.cpp +++ b/storage/src/tests/distributor/bucketstateoperationtest.cpp @@ -201,4 +201,27 @@ TEST_F(BucketStateOperationTest, bucket_db_not_updated_on_failure) { EXPECT_FALSE(op.ok()); } +TEST_F(BucketStateOperationTest, cancelled_node_does_not_update_bucket_db) { + document::BucketId bid(16, 1); + insertBucketInfo(bid, 0, 0xabc, 10, 1100, true, false); + + BucketAndNodes bucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(0)); + std::vector<uint16_t> active = {0}; + SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active); + + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender); + op.cancel(_sender, CancelScope::of_node_subset({0})); + + ASSERT_EQ(_sender.commands().size(), 1); + std::shared_ptr<api::StorageCommand> msg = _sender.command(0); + std::shared_ptr<api::StorageReply> reply(msg->makeReply()); + op.receive(_sender, reply); + + BucketDatabase::Entry entry = getBucket(bid); + ASSERT_TRUE(entry.valid()); + EXPECT_FALSE(entry->getNodeRef(0).active()); // Should not be updated + EXPECT_FALSE(op.ok()); +} + } // namespace storage::distributor diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp index 566fb704105..c87f5133997 100644 --- a/storage/src/tests/distributor/distributor_stripe_test.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test.cpp @@ -14,6 +14,7 @@ #include <vespa/storageapi/message/visitor.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/text/stringtokenizer.h> +#include <vespa/vespalib/util/stringfmt.h> #include <gmock/gmock.h> using document::Bucket; @@ -47,8 +48,8 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil { // Simple type aliases to make interfacing with certain utility functions // easier. Note that this is only for readability and does not provide any // added type safety. - using NodeCount = int; - using Redundancy = int; + using NodeCount = uint16_t; + using Redundancy = uint16_t; using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder; @@ -137,82 +138,114 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil { return _stripe->handleMessage(msg); } - void configure_stale_reads_enabled(bool enabled) { + template <typename Func> + void configure_stripe_with(Func f) { ConfigBuilder builder; - builder.allowStaleReadsDuringClusterStateTransitions = enabled; + f(builder); configure_stripe(builder); } + void configure_stale_reads_enabled(bool enabled) { + configure_stripe_with([&](auto& builder) { + builder.allowStaleReadsDuringClusterStateTransitions = enabled; + }); + } + void configure_update_fast_path_restart_enabled(bool enabled) { - ConfigBuilder builder; - builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled; + }); } void configure_merge_operations_disabled(bool disabled) { - ConfigBuilder builder; - builder.mergeOperationsDisabled = disabled; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.mergeOperationsDisabled = disabled; + }); } void configure_use_weak_internal_read_consistency(bool use_weak) { - ConfigBuilder builder; - builder.useWeakInternalReadConsistencyForClientGets = use_weak; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.useWeakInternalReadConsistencyForClientGets = use_weak; + }); } void configure_metadata_update_phase_enabled(bool enabled) { - ConfigBuilder builder; - builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled; + }); } void configure_prioritize_global_bucket_merges(bool enabled) { - ConfigBuilder builder; - builder.prioritizeGlobalBucketMerges = enabled; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.prioritizeGlobalBucketMerges = enabled; + }); } void configure_max_activation_inhibited_out_of_sync_groups(uint32_t n_groups) { - ConfigBuilder builder; - builder.maxActivationInhibitedOutOfSyncGroups = n_groups; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.maxActivationInhibitedOutOfSyncGroups = n_groups; + }); } void configure_implicitly_clear_priority_on_schedule(bool implicitly_clear) { - ConfigBuilder builder; - builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear; + }); } void configure_use_unordered_merge_chaining(bool use_unordered) { - ConfigBuilder builder; - builder.useUnorderedMergeChaining = use_unordered; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.useUnorderedMergeChaining = use_unordered; + }); } void configure_enable_two_phase_garbage_collection(bool use_two_phase) { - ConfigBuilder builder; - builder.enableTwoPhaseGarbageCollection = use_two_phase; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.enableTwoPhaseGarbageCollection = use_two_phase; + }); } void configure_enable_condition_probing(bool enable_probing) { - ConfigBuilder builder; - builder.enableConditionProbing = enable_probing; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.enableConditionProbing = enable_probing; + }); + } + + void configure_enable_operation_cancellation(bool enable_cancellation) { + configure_stripe_with([&](auto& builder) { + builder.enableOperationCancellation = enable_cancellation; + }); } - bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept { + [[nodiscard]] bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept { return _stripe->_scheduler->implicitly_clear_priority_on_schedule(); } + [[nodiscard]] bool distributor_owns_bucket_in_current_and_pending_states(document::BucketId bucket_id) const { + return (getDistributorBucketSpace().get_bucket_ownership_flags(bucket_id).owned_in_pending_state() && + getDistributorBucketSpace().check_ownership_in_pending_and_current_state(bucket_id).isOwned()); + } + void configureMaxClusterClockSkew(int seconds); void configure_mutation_sequencing(bool enabled); void configure_merge_busy_inhibit_duration(int seconds); void set_up_and_start_get_op_with_stale_reads_enabled(bool enabled); + void simulate_cluster_state_transition(const vespalib::string& state_str, bool clear_pending); + static std::shared_ptr<api::RemoveReply> make_remove_reply_with_bucket_remap(api::StorageCommand& originator_cmd); + + // TODO dedupe + auto sent_get_command(size_t idx) { return sent_command<api::GetCommand>(idx); } + + auto make_get_reply(size_t idx, api::Timestamp ts, bool is_tombstone, bool condition_matched) { + return std::make_shared<api::GetReply>(*sent_get_command(idx), std::shared_ptr<document::Document>(), ts, + false, is_tombstone, condition_matched); + } + + void set_up_for_bucket_ownership_cancellation(uint32_t superbucket_idx); + void do_test_cancelled_pending_op_with_bucket_ownership_change(bool clear_pending_state); + void do_test_not_cancelled_pending_op_without_bucket_ownership_change(bool clear_pending_state); }; DistributorStripeTest::DistributorStripeTest() @@ -224,6 +257,34 @@ DistributorStripeTest::DistributorStripeTest() DistributorStripeTest::~DistributorStripeTest() = default; +void +DistributorStripeTest::simulate_cluster_state_transition(const vespalib::string& state_str, bool clear_pending) +{ + simulate_set_pending_cluster_state(state_str); + if (clear_pending) { + enable_cluster_state(state_str); + clear_pending_cluster_state_bundle(); + } +} + +std::shared_ptr<api::RemoveReply> +DistributorStripeTest::make_remove_reply_with_bucket_remap(api::StorageCommand& originator_cmd) +{ + auto& cmd_as_remove = dynamic_cast<api::RemoveCommand&>(originator_cmd); + auto reply = std::dynamic_pointer_cast<api::RemoveReply>(std::shared_ptr<api::StorageReply>(cmd_as_remove.makeReply())); + reply->setOldTimestamp(100); + // Including a bucket remapping as part of the response is a pragmatic way to avoid false + // negatives when testing whether cancelled operations may mutate the DB. This is because + // non-remapped buckets are not created in the DB if they are already removed (which will + // be the case after bucket pruning on a cluster state change), but remapped buckets _are_ + // implicitly created upon insert. + // We expect the original bucket is 16 bits and fake a remap to a split bucket one level + // below the original bucket. + reply->remapBucketId(BucketId(17, (cmd_as_remove.getBucketId().getId() & 0xFFFF) | 0x10000)); + reply->setBucketInfo(api::BucketInfo(0x1234, 2, 300)); + return reply; +} + TEST_F(DistributorStripeTest, operation_generation) { setup_stripe(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); @@ -455,8 +516,7 @@ TEST_F(DistributorStripeTest, no_db_resurrection_for_bucket_not_owned_in_pending simulate_set_pending_cluster_state("storage:10 distributor:10"); document::BucketId nonOwnedBucket(16, 3); - EXPECT_FALSE(getDistributorBucketSpace().get_bucket_ownership_flags(nonOwnedBucket).owned_in_pending_state()); - EXPECT_FALSE(getDistributorBucketSpace().check_ownership_in_pending_and_current_state(nonOwnedBucket).isOwned()); + ASSERT_FALSE(distributor_owns_bucket_in_current_and_pending_states(nonOwnedBucket)); std::vector<BucketCopy> copies; copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2)); @@ -1052,4 +1112,168 @@ TEST_F(DistributorStripeTest, enable_condition_probing_config_is_propagated_to_i EXPECT_TRUE(getConfig().enable_condition_probing()); } +TEST_F(DistributorStripeTest, enable_operation_cancellation_config_is_propagated_to_internal_config) { + setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + EXPECT_FALSE(getConfig().enable_operation_cancellation()); // TODO switch default once ready + + configure_enable_operation_cancellation(false); + EXPECT_FALSE(getConfig().enable_operation_cancellation()); + + configure_enable_operation_cancellation(true); + EXPECT_TRUE(getConfig().enable_operation_cancellation()); +} + +TEST_F(DistributorStripeTest, cluster_state_node_down_edge_cancels_pending_operations_on_unavailable_nodes) { + setup_stripe(Redundancy(1), NodeCount(1), "version:1 distributor:1 storage:1"); + configure_enable_operation_cancellation(true); // Test will fail without cancellation enabled + addNodesToBucketDB(BucketId(16, 1), "0=3/4/5/t"); + + stripe_handle_message(makeDummyRemoveCommand()); // Remove is for bucket {16, 1} + ASSERT_EQ(_sender.getCommands(true), "Remove => 0"); + + // Oh no, node 0 goes down while we have a pending operation! + simulate_cluster_state_transition("version:2 distributor:1 storage:1 .0.s:d", true); + EXPECT_EQ("NONEXISTING", dumpBucket(BucketId(16, 1))); // Implicitly cleared + + auto reply = make_remove_reply_with_bucket_remap(*_sender.command(0)); + // Before we receive the reply, node 0 is back online. Even though the node is available in the + // cluster state, we should not apply its bucket info to our DB, as it may represent stale + // information (it's possible that our node froze up and that another distributor took over and + // mutated the bucket in the meantime; a classic ABA scenario). + simulate_cluster_state_transition("version:5 distributor:1 storage:1", true); + + _stripe->handleReply(std::move(reply)); + EXPECT_EQ("NONEXISTING", dumpBucket(BucketId(17, 0x10001))); +} + +TEST_F(DistributorStripeTest, distribution_config_change_edge_cancels_pending_operations_on_unavailable_nodes) { + setup_stripe(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); + configure_enable_operation_cancellation(true); // Test will fail without cancellation enabled + addNodesToBucketDB(BucketId(16, 1), "0=3/4/5/t,1=3/4/5/t"); + + stripe_handle_message(makeDummyRemoveCommand()); // Remove is for bucket {16, 1} + ASSERT_EQ(_sender.getCommands(true), "Remove => 0,Remove => 1"); + + // Node 1 is configured away; only node 0 remains. This is expected to be closely followed by + // (or--depending on the timing of operations in the cluster--preceded by) a cluster state with + // the node marked as down, but the ordering is not guaranteed. + auto new_config = make_default_distribution_config(1, 1); + simulate_distribution_config_change(std::move(new_config)); + + auto node_0_reply = make_remove_reply_with_bucket_remap(*_sender.command(0)); + auto node_1_reply = make_remove_reply_with_bucket_remap(*_sender.command(1)); + + _stripe->handleReply(std::move(node_0_reply)); + _stripe->handleReply(std::move(node_1_reply)); + + // Only node 0 should be present in the DB + EXPECT_EQ("BucketId(0x4000000000000001) : " // Original bucket + "node(idx=0,crc=0x3,docs=4/4,bytes=5/5,trusted=true,active=false,ready=false)", + dumpBucket(BucketId(16, 1))); + EXPECT_EQ("BucketId(0x4400000000010001) : " // Remapped bucket + "node(idx=0,crc=0x1234,docs=2/2,bytes=300/300,trusted=true,active=false,ready=false)", + dumpBucket(BucketId(17, 0x10001))); +} + +void DistributorStripeTest::set_up_for_bucket_ownership_cancellation(uint32_t superbucket_idx) { + setup_stripe(Redundancy(1), NodeCount(10), "version:1 distributor:2 storage:2"); + configure_stripe_with([](auto& builder) { + builder.enableConditionProbing = true; + builder.enableOperationCancellation = true; + }); + + NodeSupportedFeatures features; + features.document_condition_probe = true; + set_node_supported_features(0, features); + set_node_supported_features(1, features); + + // Note: replicas are intentionally out of sync to trigger a write-repair. + addNodesToBucketDB(BucketId(16, superbucket_idx), "0=3/4/5,1=4/5/6"); +} + +namespace { + +std::shared_ptr<api::RemoveCommand> make_conditional_remove_request(uint32_t superbucket_idx) { + auto client_remove = std::make_shared<api::RemoveCommand>( + makeDocumentBucket(document::BucketId(0)), + document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%u:foo", superbucket_idx)), + api::Timestamp(0)); + client_remove->setCondition(documentapi::TestAndSetCondition("foo.bar==baz")); + return client_remove; +} + +} + +void DistributorStripeTest::do_test_cancelled_pending_op_with_bucket_ownership_change(bool clear_pending_state) { + constexpr uint32_t superbucket_idx = 3; + const BucketId bucket_id(16, superbucket_idx); + set_up_for_bucket_ownership_cancellation(superbucket_idx); + // To actually check if cancellation is happening, we need to trigger a code path that + // is only covered by cancellation and not the legacy "check buckets at DB insert time" + // logic. The latter would give a false negative. + stripe_handle_message(make_conditional_remove_request(superbucket_idx)); + ASSERT_EQ(_sender.getCommands(true), "Get => 0,Get => 1"); // Condition probes, thunder cats go! + + simulate_cluster_state_transition("version:2 distributor:10 storage:10", clear_pending_state); + ASSERT_FALSE(distributor_owns_bucket_in_current_and_pending_states(bucket_id)); + EXPECT_EQ("NONEXISTING", dumpBucket(bucket_id)); // Should have been pruned + + _stripe->handleReply(make_get_reply(0, 100, false, true)); + _stripe->handleReply(make_get_reply(1, 100, false, true)); + + // Condition probe was successful, but operation is cancelled and shall not continue. + ASSERT_EQ(_sender.getCommands(true, false, 2), ""); + EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:foo:testdoctype1:n=3:foo, timestamp 1, not found) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: " + "Operation has been cancelled (likely due to a cluster state change))", + _sender.getLastReply()); + EXPECT_EQ("NONEXISTING", dumpBucket(bucket_id)); // And definitely no resurrection +} + +TEST_F(DistributorStripeTest, bucket_ownership_change_cancels_pending_operations_for_non_owned_buckets_pending_case) { + do_test_cancelled_pending_op_with_bucket_ownership_change(false); +} + +TEST_F(DistributorStripeTest, bucket_ownership_change_cancels_pending_operations_for_non_owned_buckets_not_pending_case) { + do_test_cancelled_pending_op_with_bucket_ownership_change(true); +} + +void DistributorStripeTest::do_test_not_cancelled_pending_op_without_bucket_ownership_change(bool clear_pending_state) { + constexpr uint32_t superbucket_idx = 14; + const BucketId bucket_id(16, superbucket_idx); + set_up_for_bucket_ownership_cancellation(superbucket_idx); + + stripe_handle_message(make_conditional_remove_request(superbucket_idx)); + ASSERT_EQ(_sender.getCommands(true), "Get => 0,Get => 1"); + + simulate_cluster_state_transition("version:2 distributor:10 storage:10", clear_pending_state); + ASSERT_TRUE(distributor_owns_bucket_in_current_and_pending_states(bucket_id)); + EXPECT_EQ("BucketId(0x400000000000000e) : " + "node(idx=0,crc=0x3,docs=4/4,bytes=5/5,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x4,docs=5/5,bytes=6/6,trusted=false,active=false,ready=false)", + dumpBucket(bucket_id)); // Should _not_ have been pruned + + _stripe->handleReply(make_get_reply(0, 100, false, true)); + _stripe->handleReply(make_get_reply(1, 100, false, true)); + + // Operation can proceed as planned as it has not been cancelled + ASSERT_EQ(_sender.getCommands(true, false, 2), "Remove => 0,Remove => 1"); +} + +TEST_F(DistributorStripeTest, bucket_ownership_change_does_not_cancel_pending_operations_for_owned_buckets_pending_case) { + do_test_not_cancelled_pending_op_without_bucket_ownership_change(false); +} + +TEST_F(DistributorStripeTest, bucket_ownership_change_does_not_cancel_pending_operations_for_owned_buckets_not_pending_case) { + do_test_not_cancelled_pending_op_without_bucket_ownership_change(true); +} + +// TODO we do not have good handling of bucket ownership changes combined with +// distribution config changes... Hard to remove all such edge cases unless we +// make state+config change an atomic operation initiated by the cluster controller +// (hint: we should do this). + + } diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp index 5babde49380..ba6c4cb4ac4 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp @@ -72,7 +72,7 @@ DistributorStripeTestUtil::setup_stripe(int redundancy, int node_count, const li // explicitly (which is what happens in "real life"), that is what would // take place. // The inverse case of this can be explicitly accomplished by calling - // triggerDistributionChange(). + // trigger_distribution_change(). // This isn't pretty, folks, but it avoids breaking the world for now, // as many tests have implicit assumptions about this being the behavior. auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); @@ -93,10 +93,31 @@ void DistributorStripeTestUtil::trigger_distribution_change(lib::Distribution::SP distr) { _node->getComponentRegister().setDistribution(distr); - auto new_config = BucketSpaceDistributionConfigs::from_default_distribution(distr); + auto new_config = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distr)); _stripe->update_distribution_config(new_config); } +void +DistributorStripeTestUtil::simulate_distribution_config_change(std::shared_ptr<lib::Distribution> new_config) +{ + trigger_distribution_change(std::move(new_config)); + _stripe->notifyDistributionChangeEnabled(); + for (auto& space : _stripe->getBucketSpaceRepo()) { + auto cur_state = current_cluster_state_bundle().getDerivedClusterState(space.first); // no change in state itself + _stripe->remove_superfluous_buckets(space.first, *cur_state, true); + } +} + +std::shared_ptr<lib::Distribution> +DistributorStripeTestUtil::make_default_distribution_config(uint16_t redundancy, uint16_t node_count) +{ + lib::Distribution::DistributionConfigBuilder config(lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get()); + config.redundancy = redundancy; + config.initialRedundancy = redundancy; + config.ensurePrimaryPersisted = true; + return std::make_shared<lib::Distribution>(config); +} + std::shared_ptr<DistributorConfiguration> DistributorStripeTestUtil::make_config() const { @@ -416,6 +437,11 @@ DistributorStripeTestUtil::getDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); } +const DistributorBucketSpace& +DistributorStripeTestUtil::getDistributorBucketSpace() const { + return getBucketSpaceRepo().get(makeBucketSpace()); +} + BucketDatabase& DistributorStripeTestUtil::getBucketDatabase() { return getDistributorBucketSpace().getBucketDatabase(); diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h index 272301bf4a6..2892cec6fcf 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.h +++ b/storage/src/tests/distributor/distributor_stripe_test_util.h @@ -138,6 +138,7 @@ public: // TODO explicit notion of bucket spaces for tests DistributorBucketSpace& getDistributorBucketSpace(); + const DistributorBucketSpace& getDistributorBucketSpace() const; BucketDatabase& getBucketDatabase(); // Implicit default space only BucketDatabase& getBucketDatabase(document::BucketSpace space); const BucketDatabase& getBucketDatabase() const; // Implicit default space only @@ -175,6 +176,8 @@ public: void set_redundancy(uint32_t redundancy); void trigger_distribution_change(std::shared_ptr<lib::Distribution> distr); + void simulate_distribution_config_change(std::shared_ptr<lib::Distribution> new_config); + static std::shared_ptr<lib::Distribution> make_default_distribution_config(uint16_t redundancy, uint16_t node_count); using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder; diff --git a/storage/src/tests/distributor/joinbuckettest.cpp b/storage/src/tests/distributor/joinbuckettest.cpp index 570fe24679e..ae389ecd6c2 100644 --- a/storage/src/tests/distributor/joinbuckettest.cpp +++ b/storage/src/tests/distributor/joinbuckettest.cpp @@ -109,4 +109,32 @@ TEST_F(JoinOperationTest, send_sparse_joins_to_nodes_without_both_source_buckets ASSERT_NO_FATAL_FAILURE(checkSourceBucketsAndSendReply(op, 1, {{33, 1}, {33, 1}})); } +TEST_F(JoinOperationTest, cancelled_node_does_not_update_bucket_db) { + auto cfg = make_config(); + cfg->setJoinCount(100); + cfg->setJoinSize(1000); + configure_stripe(cfg); + + addNodesToBucketDB(document::BucketId(33, 1), "0=250/50/300"); + addNodesToBucketDB(document::BucketId(33, 0x100000001), "0=300/40/200"); + enable_cluster_state("distributor:1 storage:1"); + + JoinOperation op(dummy_cluster_context, + BucketAndNodes(makeDocumentBucket(document::BucketId(32, 0)), toVector<uint16_t>(0)), + {document::BucketId(33, 1), document::BucketId(33, 0x100000001)}); + + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender); + + op.cancel(_sender, CancelScope::of_node_subset({0})); + + checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}}); + + // DB is not touched, so source buckets remain unchanged and target buckets is not created + EXPECT_TRUE(getBucket(document::BucketId(33, 0x100000001)).valid()); + EXPECT_TRUE(getBucket(document::BucketId(33, 1)).valid()); + EXPECT_FALSE(getBucket(document::BucketId(32, 0)).valid()); + EXPECT_FALSE(op.ok()); +} + } diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 512c092d8ae..12280980998 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -548,8 +548,7 @@ TEST_F(MergeOperationTest, merge_operation_is_not_blocked_by_request_bucket_info EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer)); } -TEST_F(MergeOperationTest, on_blocked_updates_metrics) -{ +TEST_F(MergeOperationTest, on_blocked_updates_metrics) { auto op = setup_minimal_merge_op(); auto metrics = getIdealStateManager().getMetrics().operations[IdealStateOperation::MERGE_BUCKET]; EXPECT_EQ(0, metrics->blocked.getValue()); @@ -557,8 +556,7 @@ TEST_F(MergeOperationTest, on_blocked_updates_metrics) EXPECT_EQ(1, metrics->blocked.getValue()); } -TEST_F(MergeOperationTest, on_throttled_updates_metrics) -{ +TEST_F(MergeOperationTest, on_throttled_updates_metrics) { auto op = setup_minimal_merge_op(); auto metrics = getIdealStateManager().getMetrics().operations[IdealStateOperation::MERGE_BUCKET]; EXPECT_EQ(0, metrics->throttled.getValue()); @@ -628,4 +626,22 @@ TEST_F(MergeOperationTest, delete_bucket_priority_is_capped_to_feed_pri_120) { EXPECT_EQ(int(del_cmd->getPriority()), 120); } +TEST_F(MergeOperationTest, no_delete_bucket_ops_sent_if_fully_cancelled) { + auto op = setup_simple_merge_op(); + ASSERT_NO_FATAL_FAILURE(assert_simple_merge_bucket_command()); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + sendReply(*op); + EXPECT_EQ(_sender.getCommands(true, false, 1), ""); // nothing more + EXPECT_FALSE(op->ok()); +} + +TEST_F(MergeOperationTest, no_delete_bucket_ops_sent_if_node_subset_cancelled) { + auto op = setup_simple_merge_op(); // to nodes, 0, 2, 1 (source only) + ASSERT_NO_FATAL_FAILURE(assert_simple_merge_bucket_command()); + op->cancel(_sender, CancelScope::of_node_subset({1})); + sendReply(*op); + EXPECT_EQ(_sender.getCommands(true, false, 1), ""); // nothing more + EXPECT_FALSE(op->ok()); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp index 68d86884036..b9d1d804761 100644 --- a/storage/src/tests/distributor/removebucketoperationtest.cpp +++ b/storage/src/tests/distributor/removebucketoperationtest.cpp @@ -24,6 +24,14 @@ struct RemoveBucketOperationTest : Test, DistributorStripeTestUtil { void TearDown() override { close(); } + + void reject_with_bucket_info(RemoveBucketOperation& op, size_t msg_index) { + std::shared_ptr<api::StorageCommand> msg2 = _sender.command(msg_index); + std::shared_ptr<api::StorageReply> reply(msg2->makeReply()); + dynamic_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(api::BucketInfo(10, 200, 1)); + reply->setResult(api::ReturnCode::REJECTED); + op.receive(_sender, reply); + } }; TEST_F(RemoveBucketOperationTest, simple) { @@ -36,11 +44,10 @@ TEST_F(RemoveBucketOperationTest, simple) { RemoveBucketOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), - toVector<uint16_t>(1,2))); + toVector<uint16_t>(1, 2))); op.setIdealStateManager(&getIdealStateManager()); op.start(_sender); - ASSERT_EQ("Delete bucket => 1," "Delete bucket => 2", _sender.getCommands(true)); @@ -75,16 +82,11 @@ TEST_F(RemoveBucketOperationTest, bucket_info_mismatch_failure) { ASSERT_EQ("Delete bucket => 1", _sender.getCommands(true)); ASSERT_EQ(1, _sender.commands().size()); - std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0); - std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release()); - dynamic_cast<api::DeleteBucketReply&>(*reply).setBucketInfo( - api::BucketInfo(10, 100, 1)); - reply->setResult(api::ReturnCode::REJECTED); - op.receive(_sender, reply); + reject_with_bucket_info(op, 0); // RemoveBucketOperation should reinsert bucketinfo into database ASSERT_EQ("BucketId(0x4000000000000001) : " - "node(idx=1,crc=0xa,docs=100/100,bytes=1/1,trusted=true,active=false,ready=false)", + "node(idx=1,crc=0xa,docs=200/200,bytes=1/1,trusted=true,active=false,ready=false)", dumpBucket(document::BucketId(16, 1))); } @@ -130,4 +132,35 @@ TEST_F(RemoveBucketOperationTest, operation_blocked_when_pending_message_to_targ EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 2, 120)); } +TEST_F(RemoveBucketOperationTest, cancelled_node_does_not_update_bucket_db_upon_rejection) { + addNodesToBucketDB(document::BucketId(16, 1), + "0=10/100/1/t," + "1=10/100/1/t," + "2=10/100/1/t"); + set_redundancy(1); + enable_cluster_state("distributor:1 storage:3"); + + RemoveBucketOperation op(dummy_cluster_context, + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), + toVector<uint16_t>(1,2))); + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender); + + ASSERT_EQ("Delete bucket => 1," + "Delete bucket => 2", + _sender.getCommands(true)); + + op.cancel(_sender, CancelScope::of_node_subset({1})); + + // Rejections will by default reinsert the bucket into the DB with the bucket info contained + // in the reply, but here the node is cancelled and should therefore not be reinserted. + reject_with_bucket_info(op, 0); + sendReply(op, 1); + // Node 1 not reinserted + ASSERT_EQ("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1/1,trusted=true,active=false,ready=false)", + dumpBucket(document::BucketId(16, 1))); + EXPECT_FALSE(op.ok()); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp index b19a448199b..be688e64a8b 100644 --- a/storage/src/tests/distributor/removelocationtest.cpp +++ b/storage/src/tests/distributor/removelocationtest.cpp @@ -67,4 +67,6 @@ TEST_F(RemoveLocationOperationTest, simple) { _sender.getLastReply()); } +// TODO test cancellation (implicitly covered via operation PersistenceMessageTracker) + } // storage::distributor diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp index edb392d9532..05a13f67bc9 100644 --- a/storage/src/tests/distributor/splitbuckettest.cpp +++ b/storage/src/tests/distributor/splitbuckettest.cpp @@ -48,14 +48,14 @@ SplitOperationTest::SplitOperationTest() } namespace { - api::StorageMessageAddress _Storage0Address(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 0); + api::StorageMessageAddress _storage0Address(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 0); } TEST_F(SplitOperationTest, simple) { enable_cluster_state("distributor:1 storage:1"); insertBucketInfo(document::BucketId(16, 1), 0, 0xabc, 1000, - tooLargeBucketSize, 250); + tooLargeBucketSize, true); SplitOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), @@ -72,10 +72,10 @@ TEST_F(SplitOperationTest, simple) { std::shared_ptr<api::StorageCommand> msg = _sender.command(0); ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET); - EXPECT_EQ(_Storage0Address.toString(), + EXPECT_EQ(_storage0Address.toString(), msg->getAddress()->toString()); - std::shared_ptr<api::StorageReply> reply(msg->makeReply().release()); + std::shared_ptr<api::StorageReply> reply(msg->makeReply()); auto* sreply = static_cast<api::SplitBucketReply*>(reply.get()); sreply->getSplitInfo().emplace_back(document::BucketId(17, 1), @@ -142,19 +142,15 @@ TEST_F(SplitOperationTest, multi_node_failure) { { std::shared_ptr<api::StorageCommand> msg = _sender.command(0); ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET); - EXPECT_EQ(_Storage0Address.toString(), - msg->getAddress()->toString()); + EXPECT_EQ(_storage0Address.toString(), msg->getAddress()->toString()); + std::shared_ptr<api::StorageReply> reply(msg->makeReply()); + auto& sreply = dynamic_cast<api::SplitBucketReply&>(*reply); - auto* sreply = static_cast<api::SplitBucketReply*>(msg->makeReply().release()); - sreply->setResult(api::ReturnCode::OK); + sreply.setResult(api::ReturnCode::OK); + sreply.getSplitInfo().emplace_back(document::BucketId(17, 1), api::BucketInfo(100, 600, 5000000)); + sreply.getSplitInfo().emplace_back(document::BucketId(17, 0x10001), api::BucketInfo(110, 400, 6000000)); - sreply->getSplitInfo().emplace_back(document::BucketId(17, 1), - api::BucketInfo(100, 600, 5000000)); - - sreply->getSplitInfo().emplace_back(document::BucketId(17, 0x10001), - api::BucketInfo(110, 400, 6000000)); - - op.receive(_sender, std::shared_ptr<api::StorageReply>(sreply)); + op.receive(_sender, reply); } sendReply(op, 1, api::ReturnCode::NOT_CONNECTED); @@ -230,7 +226,7 @@ TEST_F(SplitOperationTest, copy_trusted_status_not_carried_over_after_split) { for (int i = 0; i < 2; ++i) { std::shared_ptr<api::StorageCommand> msg = _sender.command(i); ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET); - std::shared_ptr<api::StorageReply> reply(msg->makeReply().release()); + std::shared_ptr<api::StorageReply> reply(msg->makeReply()); auto* sreply = static_cast<api::SplitBucketReply*>(reply.get()); // Make sure copies differ so they cannot become implicitly trusted. @@ -271,7 +267,7 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) { }; auto joinCmd = std::make_shared<api::JoinBucketsCommand>(makeDocumentBucket(joinTarget)); joinCmd->getSourceBuckets() = joinSources; - joinCmd->setAddress(_Storage0Address); + joinCmd->setAddress(_storage0Address); pending_message_tracker().insert(joinCmd); @@ -307,7 +303,7 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) { enable_cluster_state("distributor:1 storage:2"); document::BucketId source_bucket(16, 1); - insertBucketInfo(source_bucket, 0, 0xabc, 1000, tooLargeBucketSize, 250); + insertBucketInfo(source_bucket, 0, 0xabc, 1000, tooLargeBucketSize, true); SplitOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)), maxSplitBits, splitCount, splitByteSize); @@ -318,4 +314,36 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) { EXPECT_TRUE(op.isBlocked(operation_context(), op_seq)); } +TEST_F(SplitOperationTest, cancelled_node_does_not_update_bucket_db) { + enable_cluster_state("distributor:1 storage:1"); + insertBucketInfo(document::BucketId(16, 1), 0, 0xabc, 1000, tooLargeBucketSize, true); + + SplitOperation op(dummy_cluster_context, + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0)), + maxSplitBits, splitCount, splitByteSize); + + op.setIdealStateManager(&getIdealStateManager()); + op.start(_sender); + + op.cancel(_sender, CancelScope::of_node_subset({0})); + + { + ASSERT_EQ(_sender.commands().size(), 1); + std::shared_ptr<api::StorageCommand> msg = _sender.command(0); + std::shared_ptr<api::StorageReply> reply(msg->makeReply()); + auto& sreply = dynamic_cast<api::SplitBucketReply&>(*reply); + + sreply.getSplitInfo().emplace_back(document::BucketId(17, 1), api::BucketInfo(100, 600, 5000000)); + sreply.getSplitInfo().emplace_back(document::BucketId(17, 0x10001), api::BucketInfo(110, 400, 6000000)); + op.receive(_sender, reply); + } + + // DB is not touched, so source bucket remains (will be removed during actual operation) + // while target buckets are not created + EXPECT_TRUE(getBucket(document::BucketId(16, 1)).valid()); + EXPECT_FALSE(getBucket(document::BucketId(17, 0x00001)).valid()); + EXPECT_FALSE(getBucket(document::BucketId(17, 0x10001)).valid()); + EXPECT_FALSE(op.ok()); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index d3af4cd564a..f3aa9c2eb92 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -1056,7 +1056,32 @@ TEST_F(TopLevelBucketDBUpdaterTest, recheck_node) { const BucketCopy* copy = entry->getNode(1); ASSERT_TRUE(copy != nullptr); - EXPECT_EQ(api::BucketInfo(20,10,12, 50, 60, true, true), copy->getBucketInfo()); + EXPECT_EQ(api::BucketInfo(20, 10, 12, 50, 60, true, true), copy->getBucketInfo()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, cancelled_pending_recheck_command_does_not_update_db) { + ASSERT_NO_FATAL_FAILURE(initialize_nodes_and_buckets(3, 5)); + _sender.clear(); + + auto bucket = makeDocumentBucket(document::BucketId(16, 3)); + auto& stripe_bucket_db_updater = stripe_of_bucket(bucket.getBucketId()).bucket_db_updater(); + stripe_bucket_db_updater.recheckBucketInfo(0, bucket); + + ASSERT_EQ(_sender.getCommands(true), "Request bucket info => 0"); + auto& req = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0)); + + ASSERT_TRUE(stripe_bucket_db_updater.cancel_message_by_id(req.getMsgId())); + + auto reply = std::make_shared<api::RequestBucketInfoReply>(req); + reply->getBucketInfo().emplace_back(document::BucketId(16, 3), api::BucketInfo(20, 10, 12, 50, 60, true, true)); + stripe_bucket_db_updater.onRequestBucketInfoReply(reply); + + BucketDatabase::Entry entry = get_bucket(bucket); + ASSERT_TRUE(entry.valid()); + const BucketCopy* copy = entry->getNode(0); + ASSERT_TRUE(copy != nullptr); + // Existing bucket info not modified by reply (0xa ... is the initialized test state). + EXPECT_EQ(api::BucketInfo(0xa, 1, 1, 1, 1, false, false), copy->getBucketInfo()); } TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) { @@ -1217,11 +1242,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply) { document::BucketId bucket_id(16, 1234); add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); - std::vector<api::MergeBucketCommand::Node> nodes; - nodes.emplace_back(0); - nodes.emplace_back(1); - nodes.emplace_back(2); - + std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}}; api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); auto reply = std::make_shared<api::MergeBucketReply>(cmd); @@ -1248,19 +1269,15 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply) { "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), " "node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)", dump_bucket(bucket_id)); -}; +} TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down) { enable_distributor_cluster_state("distributor:1 storage:3"); - std::vector<api::MergeBucketCommand::Node> nodes; document::BucketId bucket_id(16, 1234); add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); - for (uint32_t i = 0; i < 3; ++i) { - nodes.emplace_back(i); - } - + std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}}; api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); auto reply = std::make_shared<api::MergeBucketReply>(cmd); @@ -1287,19 +1304,15 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down) { "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)", dump_bucket(bucket_id)); -}; +} TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) { enable_distributor_cluster_state("distributor:1 storage:3"); - std::vector<api::MergeBucketCommand::Node> nodes; document::BucketId bucket_id(16, 1234); add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); - for (uint32_t i = 0; i < 3; ++i) { - nodes.emplace_back(i); - } - + std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}}; api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); auto reply = std::make_shared<api::MergeBucketReply>(cmd); @@ -1326,7 +1339,41 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) { "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)", dump_bucket(bucket_id)); -}; +} + +TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_triggered_bucket_info_request_not_sent_to_cancelled_nodes) { + enable_distributor_cluster_state("distributor:1 storage:3"); + document::BucketId bucket_id(16, 1234); + // DB has one bucket with 3 mutually out of sync replicas. Node 0 is explicitly tagged as Active; + // this is done to prevent the distributor from scheduling a bucket activation as its first task. + add_nodes_to_stripe_bucket_db(bucket_id, "0=10/1/1/t/a,1=20/1/1/t,2=30/1/1/t"); + + // Poke at the business end of the stripe until it has scheduled a merge for the inconsistent bucket + ASSERT_EQ(_sender.commands().size(), 0u); + const int max_tick_tries = 20; + for (int i = 0; i <= max_tick_tries; ++i) { + stripe_of_bucket(bucket_id).tick(); + if (!_sender.commands().empty()) { + continue; + } + if (i == max_tick_tries) { + FAIL() << "no merge sent after ticking " << max_tick_tries << " times"; + } + } + ASSERT_EQ(_sender.getCommands(true), "Merge bucket => 0"); + + auto cmd = std::dynamic_pointer_cast<api::MergeBucketCommand>(_sender.commands()[0]); + _sender.commands().clear(); + auto reply = std::make_shared<api::MergeBucketReply>(*cmd); + + auto op = stripe_of_bucket(bucket_id).maintenance_op_from_message_id(cmd->getMsgId()); + ASSERT_TRUE(op); + + op->cancel(_sender, CancelScope::of_node_subset({0, 2})); + stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply); + // RequestBucketInfo only sent to node 1 + ASSERT_EQ(_sender.getCommands(true), "Request bucket info => 1"); +} TEST_F(TopLevelBucketDBUpdaterTest, flush) { enable_distributor_cluster_state("distributor:1 storage:3"); @@ -1335,11 +1382,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, flush) { document::BucketId bucket_id(16, 1234); add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); - std::vector<api::MergeBucketCommand::Node> nodes; - for (uint32_t i = 0; i < 3; ++i) { - nodes.emplace_back(i); - } - + std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}}; api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); auto reply = std::make_shared<api::MergeBucketReply>(cmd); diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index d6d21c89b68..959fdc612cb 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -54,6 +54,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _inhibit_default_merges_when_global_merges_pending(false), _enable_two_phase_garbage_collection(false), _enable_condition_probing(false), + _enable_operation_cancellation(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -179,6 +180,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _inhibit_default_merges_when_global_merges_pending = config.inhibitDefaultMergesWhenGlobalMergesPending; _enable_two_phase_garbage_collection = config.enableTwoPhaseGarbageCollection; _enable_condition_probing = config.enableConditionProbing; + _enable_operation_cancellation = config.enableOperationCancellation; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index e3664276518..6056f0a1d5c 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -293,6 +293,12 @@ public: [[nodiscard]] bool enable_condition_probing() const noexcept { return _enable_condition_probing; } + void set_enable_operation_cancellation(bool enable) noexcept { + _enable_operation_cancellation = enable; + } + [[nodiscard]] bool enable_operation_cancellation() const noexcept { + return _enable_operation_cancellation; + } uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; } @@ -354,6 +360,7 @@ private: bool _inhibit_default_merges_when_global_merges_pending; bool _enable_two_phase_garbage_collection; bool _enable_condition_probing; + bool _enable_operation_cancellation; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 95461eb5dc2..debbe443b31 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -313,3 +313,8 @@ enable_two_phase_garbage_collection bool default=true ## replicas will trigger an implicit distributed condition probe to resolve the outcome of ## the condition across all divergent replicas. enable_condition_probing bool default=true + +## If true, changes in the cluster where a subset of the nodes become unavailable or buckets +## change ownership between distributors will trigger an explicit cancellation of all pending +## requests partially or fully "invalidated" by such a change. +enable_operation_cancellation bool default=false diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index c889afcc77c..16a4fb5691f 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -4,6 +4,7 @@ vespa_add_library(storage_distributor OBJECT activecopy.cpp blockingoperationstarter.cpp bucket_db_prune_elision.cpp + bucket_ownership_calculator.cpp bucket_space_distribution_configs.cpp bucket_space_distribution_context.cpp bucket_space_state_map.cpp diff --git a/storage/src/vespa/storage/distributor/bucket_ownership_calculator.cpp b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.cpp new file mode 100644 index 00000000000..6f94235e548 --- /dev/null +++ b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.cpp @@ -0,0 +1,41 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "bucket_ownership_calculator.h" +#include <vespa/document/bucket/bucket.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/state/clusterstate.h> + +namespace storage::distributor { + +namespace { + +uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept { + // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest. + return id.getRawId() & ~(UINT64_MAX << distribution_bits); +} + +} + +bool +BucketOwnershipCalculator::this_distributor_owns_bucket(const document::BucketId& bucket_id) const +{ + // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor. + // TODO "too few bits used" case can be cheaply checked without needing exception + try { + const auto bits = _state.getDistributionBitCount(); + const auto this_superbucket = superbucket_from_id(bucket_id, bits); + if (_cached_decision_superbucket == this_superbucket) { + return _cached_owned; + } + uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucket_id, "uim"); + _cached_decision_superbucket = this_superbucket; + _cached_owned = (distributor == _this_node_index); + return _cached_owned; + } catch (lib::TooFewBucketBitsInUseException&) { + // Ignore; implicitly not owned + } catch (lib::NoDistributorsAvailableException&) { + // Ignore; implicitly not owned + } + return false; +} + +} diff --git a/storage/src/vespa/storage/distributor/bucket_ownership_calculator.h b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.h new file mode 100644 index 00000000000..b67bb41e85d --- /dev/null +++ b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.h @@ -0,0 +1,44 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <cstdint> + +namespace document { class BucketId; } + +namespace storage::lib { +class ClusterState; +class Distribution; +} + +namespace storage::distributor { + +/** + * Calculator for determining if a bucket is owned by the current distributor. + * Ideal state calculations are cached and reused for all consecutive sub-buckets + * under the same super bucket. The cache is invalidated when a new super bucket + * is encountered, so it only provides a benefit when invoked in bucket ID order. + * + * Not thread safe due to internal caching. + */ +class BucketOwnershipCalculator { + const lib::ClusterState& _state; + const lib::Distribution& _distribution; + mutable uint64_t _cached_decision_superbucket; + const uint16_t _this_node_index; + mutable bool _cached_owned; +public: + BucketOwnershipCalculator(const lib::ClusterState& state, + const lib::Distribution& distribution, + uint16_t this_node_index) noexcept + : _state(state), + _distribution(distribution), + _cached_decision_superbucket(UINT64_MAX), + _this_node_index(this_node_index), + _cached_owned(false) + { + } + + [[nodiscard]] bool this_distributor_owns_bucket(const document::BucketId& bucket_id) const; +}; + +} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index ad1cce46bea..ac5cb740361 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -16,6 +16,8 @@ #include <vespa/storage/common/node_identity.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> +#include <vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/util/memoryusage.h> @@ -177,6 +179,12 @@ DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageM return true; } +std::shared_ptr<Operation> +DistributorStripe::maintenance_op_from_message_id(uint64_t msg_id) const noexcept +{ + return _maintenanceOperationOwner.find_by_id(msg_id); +} + void DistributorStripe::handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply) { @@ -210,6 +218,8 @@ DistributorStripe::handleReply(const std::shared_ptr<api::StorageReply>& reply) bucket.getBucketId() != document::BucketId(0) && reply->getAddress()) { + // Won't be triggered for replies of cancelled ops since they will be missing + // from `_pendingMessageTracker` and thus `bucket` will be zero. recheckBucketInfo(reply->getAddress()->getIndex(), bucket); } @@ -271,18 +281,82 @@ DistributorStripe::getClusterStateBundle() const } void -DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state) +DistributorStripe::cancel_single_message_by_id_if_found(uint64_t msg_id, const CancelScope& cancel_scope) { - lib::ClusterStateBundle oldState = _clusterStateBundle; - _clusterStateBundle = state; - propagateClusterStates(); + // In descending order of likelihood: + if (_operationOwner.try_cancel_by_id(msg_id, cancel_scope)) { + return; + } + if (_maintenanceOperationOwner.try_cancel_by_id(msg_id, cancel_scope)) { + return; + } + (void)_bucketDBUpdater.cancel_message_by_id(msg_id); +} - const auto& baseline_state = *state.getBaselineClusterState(); - enterRecoveryMode(); +void +DistributorStripe::handle_node_down_edge_with_cancellations(uint16_t node_index, std::span<const uint64_t> msg_ids) +{ + auto cancel_scope = CancelScope::of_node_subset({node_index}); + for (const auto msg_id : msg_ids) { + cancel_single_message_by_id_if_found(msg_id, cancel_scope); + } +} + +void +DistributorStripe::cancel_ops_for_buckets_no_longer_owned(document::BucketSpace bucket_space, + const lib::ClusterState& new_state) +{ + // Note: we explicitly do not simply reuse the set of buckets removed from the bucket database + // when deciding which operations to cancel. This is because that would depend on every candidate + // bucket to cancel already being present in the DB, which is hard to guarantee always holds. + const auto& distribution = _bucketSpaceRepo->get(bucket_space).getDistribution(); + BucketOwnershipCalculator ownership_calc(new_state, distribution, getDistributorIndex()); + + auto bucket_not_owned_in_new_state = [&](const document::Bucket& bucket) { + return !ownership_calc.this_distributor_owns_bucket(bucket.getBucketId()); + }; + auto cancel_op_by_msg_id = [&](uint64_t msg_id) { + cancel_single_message_by_id_if_found(msg_id, CancelScope::of_fully_cancelled()); + }; + _pendingMessageTracker.enumerate_matching_pending_bucket_ops(bucket_not_owned_in_new_state, cancel_op_by_msg_id); +} + +void +DistributorStripe::cancel_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle, + const lib::ClusterStateBundle& new_state_bundle) +{ + // TODO we should probably only consider a node as unavailable if it is unavailable in + // _all_ bucket spaces. Consider: implicit maintenance mode for global merges (although + // that _should_ only be triggered by the CC when the node was already down...). + const auto& baseline_state = *new_state_bundle.getBaselineClusterState(); + const uint16_t old_node_count = old_state_bundle.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE); + const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE); + const auto& distribution = _bucketSpaceRepo->get(document::FixedBucketSpaces::default_space()).getDistribution(); + for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) { + // Handle both the case where a node may be gone from the cluster state and from the config. + // These are not atomic, so one may happen before the other. + const auto& node_state = baseline_state.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState(); + const auto* node_group = distribution.getNodeGraph().getGroupForNode(i); + if (!node_state.oneOf(storage_node_up_states()) || !node_group) { + // Note: this also clears _non-maintenance_ operations from the pending message tracker, but + // the client operation mapping (_operationOwner) is _not_ cleared, so replies from the + // unavailable node(s) will still be processed as expected. + std::vector<uint64_t> msg_ids = _pendingMessageTracker.clearMessagesForNode(i); + LOG(debug, "Node %u is unavailable, cancelling %zu pending operations", i, msg_ids.size()); + handle_node_down_edge_with_cancellations(i, msg_ids); + } + } +} + +// TODO remove once cancellation support has proven itself worthy of prime time +void +DistributorStripe::legacy_erase_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle, + const lib::ClusterStateBundle& new_state_bundle) +{ + const auto& baseline_state = *new_state_bundle.getBaselineClusterState(); // Clear all active messages on nodes that are down. - // TODO this should also be done on nodes that are no longer part of the config! - const uint16_t old_node_count = oldState.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE); + const uint16_t old_node_count = old_state_bundle.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE); const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE); for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) { const auto& node_state = baseline_state.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState(); @@ -291,12 +365,28 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state LOG(debug, "Node %u is down, clearing %zu pending maintenance operations", i, msgIds.size()); for (const auto & msgId : msgIds) { - _maintenanceOperationOwner.erase(msgId); + (void)_maintenanceOperationOwner.erase(msgId); } } } } +void +DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state) +{ + lib::ClusterStateBundle old_state = _clusterStateBundle; + _clusterStateBundle = state; + + propagateClusterStates(); + enterRecoveryMode(); + + if (_total_config->enable_operation_cancellation()) { + cancel_ops_for_unavailable_nodes(old_state, state); + } else { + legacy_erase_ops_for_unavailable_nodes(old_state, state); + } +} + OperationRoutingSnapshot DistributorStripe::read_snapshot_for_bucket(const document::Bucket& bucket) const { return _bucketDBUpdater.read_snapshot_for_bucket(bucket); } @@ -308,6 +398,10 @@ DistributorStripe::notifyDistributionChangeEnabled() // Trigger a re-scan of bucket database, just like we do when a new cluster // state has been enabled. enterRecoveryMode(); + + if (_total_config->enable_operation_cancellation()) { + cancel_ops_for_unavailable_nodes(_clusterStateBundle, _clusterStateBundle); + } } void @@ -850,6 +944,9 @@ DistributorStripe::remove_superfluous_buckets(document::BucketSpace bucket_space const lib::ClusterState& new_state, bool is_distribution_change) { + if (_total_config->enable_operation_cancellation()) { + cancel_ops_for_buckets_no_longer_owned(bucket_space, new_state); + } return bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change); } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index 338a6c72125..566e6ed454a 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -26,6 +26,7 @@ #include <atomic> #include <mutex> #include <queue> +#include <span> #include <unordered_map> namespace storage { @@ -135,6 +136,8 @@ public: const lib::ClusterStateBundle& getClusterStateBundle() const override; + std::shared_ptr<Operation> maintenance_op_from_message_id(uint64_t msg_id) const noexcept override; + /** * Called by bucket db updater after a merge has finished, and all the * request bucket info operations have been performed as well. Passes the @@ -251,6 +254,16 @@ private: void enterRecoveryMode(); void leaveRecoveryMode(); + void cancel_single_message_by_id_if_found(uint64_t msg_id, const CancelScope& cancel_scope); + void handle_node_down_edge_with_cancellations(uint16_t node_index, std::span<const uint64_t> msg_ids); + void cancel_ops_for_buckets_no_longer_owned(document::BucketSpace bucket_space, const lib::ClusterState& new_state); + // Note: old and new state bundles may be the same if this is called for distribution config changes + void cancel_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle, + const lib::ClusterStateBundle& new_state_bundle); + + void legacy_erase_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle, + const lib::ClusterStateBundle& new_state_bundle); + // Tries to generate an operation from the given message. Returns true // if we either returned an operation, or the message was otherwise handled // (for instance, wrong distribution). diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h index dfed59499c6..14888de961e 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h @@ -18,6 +18,7 @@ namespace storage::distributor { class DistributorMetricSet; class NodeSupportedFeaturesRepo; class PendingMessageTracker; +class Operation; /** * TODO STRIPE add class comment. @@ -27,7 +28,7 @@ class DistributorStripeInterface : public DistributorStripeMessageSender public: virtual DistributorMetricSet& getMetrics() = 0; virtual void enableClusterStateBundle(const lib::ClusterStateBundle& state) = 0; - virtual const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const = 0; + [[nodiscard]] virtual const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const = 0; virtual void notifyDistributionChangeEnabled() = 0; /** @@ -57,7 +58,9 @@ public: /** * Returns true if the node is currently initializing. */ - virtual bool initializing() const = 0; + [[nodiscard]] virtual bool initializing() const = 0; + + [[nodiscard]] virtual std::shared_ptr<Operation> maintenance_op_from_message_id(uint64_t msg_id) const noexcept = 0; virtual void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>&) = 0; virtual const DistributorConfiguration& getConfig() const = 0; virtual ChainedMessageSender& getMessageSender() = 0; diff --git a/storage/src/vespa/storage/distributor/messagetracker.h b/storage/src/vespa/storage/distributor/messagetracker.h index a0234f425a0..92cc921d91c 100644 --- a/storage/src/vespa/storage/distributor/messagetracker.h +++ b/storage/src/vespa/storage/distributor/messagetracker.h @@ -25,7 +25,7 @@ public: uint16_t _target; }; - MessageTracker(const ClusterContext& cluster_context); + explicit MessageTracker(const ClusterContext& cluster_context); MessageTracker(MessageTracker&&) noexcept = default; MessageTracker& operator=(MessageTracker&&) noexcept = delete; MessageTracker(const MessageTracker &) = delete; diff --git a/storage/src/vespa/storage/distributor/operationowner.cpp b/storage/src/vespa/storage/distributor/operationowner.cpp index c92544c8cb5..16bbc36e4bc 100644 --- a/storage/src/vespa/storage/distributor/operationowner.cpp +++ b/storage/src/vespa/storage/distributor/operationowner.cpp @@ -70,10 +70,27 @@ OperationOwner::onClose() } } -void +std::shared_ptr<Operation> +OperationOwner::find_by_id(api::StorageMessage::Id msg_id) const noexcept +{ + return _sentMessageMap.find_by_id_or_empty(msg_id); +} + +bool +OperationOwner::try_cancel_by_id(api::StorageMessage::Id id, const CancelScope& cancel_scope) +{ + auto* op = _sentMessageMap.find_by_id_or_nullptr(id); + if (!op) { + return false; + } + op->cancel(_sender, cancel_scope); + return true; +} + +std::shared_ptr<Operation> OperationOwner::erase(api::StorageMessage::Id msgId) { - (void)_sentMessageMap.pop(msgId); + return _sentMessageMap.pop(msgId); } } diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h index 009bb5b80aa..828d776f1a6 100644 --- a/storage/src/vespa/storage/distributor/operationowner.h +++ b/storage/src/vespa/storage/distributor/operationowner.h @@ -9,6 +9,7 @@ namespace storage::framework { struct Clock; } namespace storage::distributor { +class CancelScope; class Operation; /** @@ -87,10 +88,19 @@ public: bool start(const std::shared_ptr<Operation>& operation, Priority priority) override; /** - If the given message exists, create a reply and pass it to the - appropriate callback. + * If the given message exists, remove it from the internal operation mapping. + * + * Returns the operation the message belonged to, if any. */ - void erase(api::StorageMessage::Id msgId); + [[nodiscard]] std::shared_ptr<Operation> erase(api::StorageMessage::Id msgId); + + /** + * Returns a strong ref to the pending operation with the given msg_id if it exists. + * Otherwise returns an empty shared_ptr. + */ + [[nodiscard]] std::shared_ptr<Operation> find_by_id(api::StorageMessage::Id msg_id) const noexcept; + + [[nodiscard]] bool try_cancel_by_id(api::StorageMessage::Id msg_id, const CancelScope& cancel_scope); [[nodiscard]] DistributorStripeMessageSender& sender() noexcept { return _sender; } diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp index bd7f3709575..03be507f467 100644 --- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp @@ -178,6 +178,8 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St if (_bucket_space.has_pending_cluster_state()) { state_version_now = _bucket_space.get_pending_cluster_state().getVersion(); } + // TODO disable these explicit (and possibly costly) checks when cancellation is enabled, + // as cancellation shall cover a superset of the cases that can be detected here. if ((state_version_now != _cluster_state_version_at_creation_time) && (replica_set_changed_after_get_operation() || distributor_no_longer_owns_bucket())) diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index c7f858de608..5735e47ec87 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -28,8 +28,8 @@ PutOperation::PutOperation(const DistributorNodeContext& node_ctx, PersistenceOperationMetricSet& condition_probe_metrics, SequencingHandle sequencing_handle) : SequencedOperation(std::move(sequencing_handle)), - _tracker_instance(metric, std::make_shared<api::PutReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()), - _tracker(_tracker_instance), + _tracker(metric, std::make_shared<api::PutReply>(*msg), node_ctx, + op_ctx, _cancel_scope, msg->getTimestamp()), _msg(std::move(msg)), _doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())), _node_ctx(node_ctx), @@ -253,7 +253,6 @@ PutOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScop if (_check_condition) { _check_condition->cancel(sender, cancel_scope); } - _tracker.cancel(cancel_scope); } bool diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index 8b8e3e15375..4d26ffda61e 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -39,8 +39,7 @@ public: void onClose(DistributorStripeMessageSender& sender) override; private: - PersistenceMessageTrackerImpl _tracker_instance; - PersistenceMessageTracker& _tracker; + PersistenceMessageTracker _tracker; std::shared_ptr<api::PutCommand> _msg; document::BucketId _doc_id_bucket_id; const DistributorNodeContext& _node_ctx; diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp index 5f52a8208fc..ab3855b2687 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp @@ -23,12 +23,7 @@ RemoveLocationOperation::RemoveLocationOperation( std::shared_ptr<api::RemoveLocationCommand> msg, PersistenceOperationMetricSet& metric) : Operation(), - _trackerInstance(metric, - std::make_shared<api::RemoveLocationReply>(*msg), - node_ctx, - op_ctx, - 0), - _tracker(_trackerInstance), + _tracker(metric, std::make_shared<api::RemoveLocationReply>(*msg), node_ctx, op_ctx, _cancel_scope, 0), _msg(std::move(msg)), _node_ctx(node_ctx), _parser(parser), diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h index d177676ff03..1ac4af0997a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h @@ -10,8 +10,7 @@ namespace storage::distributor { class DistributorBucketSpace; -class RemoveLocationOperation : public Operation -{ +class RemoveLocationOperation : public Operation { public: RemoveLocationOperation(const DistributorNodeContext& node_ctx, DistributorStripeOperationContext& op_ctx, @@ -32,14 +31,11 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>&) override; void onClose(DistributorStripeMessageSender& sender) override; private: - PersistenceMessageTrackerImpl _trackerInstance; - PersistenceMessageTracker& _tracker; - + PersistenceMessageTracker _tracker; std::shared_ptr<api::RemoveLocationCommand> _msg; - - const DistributorNodeContext& _node_ctx; - const DocumentSelectionParser& _parser; - DistributorBucketSpace &_bucketSpace; + const DistributorNodeContext& _node_ctx; + const DocumentSelectionParser& _parser; + DistributorBucketSpace& _bucketSpace; }; } diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp index be43aac3d9e..c7cfea017ac 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp @@ -19,10 +19,8 @@ RemoveOperation::RemoveOperation(const DistributorNodeContext& node_ctx, PersistenceOperationMetricSet& condition_probe_metrics, SequencingHandle sequencingHandle) : SequencedOperation(std::move(sequencingHandle)), - _tracker_instance(metric, - std::make_shared<api::RemoveReply>(*msg), - node_ctx, op_ctx, msg->getTimestamp()), - _tracker(_tracker_instance), + _tracker(metric, std::make_shared<api::RemoveReply>(*msg), node_ctx, + op_ctx, _cancel_scope, msg->getTimestamp()), _msg(std::move(msg)), _doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())), _node_ctx(node_ctx), @@ -168,7 +166,6 @@ RemoveOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelS if (_check_condition) { _check_condition->cancel(sender, cancel_scope); } - _tracker.cancel(cancel_scope); } bool RemoveOperation::has_condition() const noexcept { diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h index 221def81fdc..772047b96ca 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h @@ -32,8 +32,7 @@ public: void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; private: - PersistenceMessageTrackerImpl _tracker_instance; - PersistenceMessageTracker& _tracker; + PersistenceMessageTracker _tracker; std::shared_ptr<api::RemoveCommand> _msg; document::BucketId _doc_id_bucket_id; const DistributorNodeContext& _node_ctx; diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 60bddebbb89..90f3f8c9c43 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -25,9 +25,8 @@ UpdateOperation::UpdateOperation(const DistributorNodeContext& node_ctx, std::vector<BucketDatabase::Entry> entries, UpdateMetricSet& metric) : Operation(), - _trackerInstance(metric, std::make_shared<api::UpdateReply>(*msg), - node_ctx, op_ctx, msg->getTimestamp()), - _tracker(_trackerInstance), + _tracker(metric, std::make_shared<api::UpdateReply>(*msg), node_ctx, + op_ctx, _cancel_scope, msg->getTimestamp()), _msg(msg), _entries(std::move(entries)), _new_timestamp(_msg->getTimestamp()), @@ -207,13 +206,6 @@ UpdateOperation::onClose(DistributorStripeMessageSender& sender) _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } -void -UpdateOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope) -{ - _tracker.cancel(cancel_scope); -} - - // The backend behavior of "create-if-missing" updates is to return the timestamp of the // _new_ update operation if the document was created from scratch. The two-phase update // operation logic auto-detects unexpected inconsistencies and tries to reconcile diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h index 7d2131d426d..750e50aeae5 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h @@ -31,25 +31,22 @@ public: std::string getStatus() const override { return ""; }; void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override; void onClose(DistributorStripeMessageSender& sender) override; - void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; std::pair<document::BucketId, uint16_t> getNewestTimestampLocation() const { return _newestTimestampLocation; } private: - PersistenceMessageTrackerImpl _trackerInstance; - PersistenceMessageTracker& _tracker; - std::shared_ptr<api::UpdateCommand> _msg; - std::vector<BucketDatabase::Entry> _entries; - const api::Timestamp _new_timestamp; - const bool _is_auto_create_update; - - const DistributorNodeContext& _node_ctx; - DistributorStripeOperationContext& _op_ctx; - DistributorBucketSpace &_bucketSpace; + PersistenceMessageTracker _tracker; + std::shared_ptr<api::UpdateCommand> _msg; + std::vector<BucketDatabase::Entry> _entries; + const api::Timestamp _new_timestamp; + const bool _is_auto_create_update; + const DistributorNodeContext& _node_ctx; + DistributorStripeOperationContext& _op_ctx; + DistributorBucketSpace& _bucketSpace; std::pair<document::BucketId, uint16_t> _newestTimestampLocation; - api::BucketInfo _infoAtSendTime; // Should be same across all replicas + api::BucketInfo _infoAtSendTime; // Should be same across all replicas bool anyStorageNodesAvailable() const; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index bf64fa2eb82..e384163f421 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -23,7 +23,6 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu _cluster_state_version_at_phase1_start_time(0), _remove_candidates(), _replica_info(), - _cancel_scope(), _max_documents_removed(0), _is_done(false) {} @@ -150,10 +149,6 @@ GarbageCollectionOperation::onReceive(DistributorStripeMessageSender& sender, } } -void GarbageCollectionOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope) { - _cancel_scope.merge(cancel_scope); -} - void GarbageCollectionOperation::update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply) { _replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(), from_node, reply.getBucketInfo()); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index 97efbe694de..d5c6d655857 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -7,7 +7,6 @@ #include <vespa/storage/bucketdb/bucketcopy.h> #include <vespa/storage/distributor/messagetracker.h> #include <vespa/storage/distributor/operation_sequencer.h> -#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/vespalib/stllike/hash_map.h> #include <vector> @@ -23,7 +22,6 @@ public: void onStart(DistributorStripeMessageSender& sender) override; void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; - void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override; const char* getName() const noexcept override { return "garbagecollection"; }; Type getType() const noexcept override { return GARBAGE_COLLECTION; } bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; @@ -56,7 +54,6 @@ private: RemoveCandidates _remove_candidates; std::vector<SequencingHandle> _gc_write_locks; std::vector<BucketCopy> _replica_info; - CancelScope _cancel_scope; uint32_t _max_documents_removed; bool _is_done; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index a69b7739e07..09082e718e0 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -28,6 +28,7 @@ IdealStateOperation::IdealStateOperation(const BucketAndNodes& bucketAndNodes) : _manager(nullptr), _bucketSpace(nullptr), _bucketAndNodes(bucketAndNodes), + _detailedReason(), _priority(255), _ok(true) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index 6c52bdb738d..ba4a2f95686 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -2,6 +2,7 @@ #pragma once #include <vespa/storage/distributor/maintenance/maintenanceoperation.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/storageapi/messageapi/storagereply.h> #include <vespa/storageapi/messageapi/maintenancecommand.h> @@ -110,7 +111,7 @@ public: using Vector = std::vector<SP>; using Map = std::map<document::BucketId, SP>; - IdealStateOperation(const BucketAndNodes& bucketAndNodes); + explicit IdealStateOperation(const BucketAndNodes& bucketAndNodes); ~IdealStateOperation() override; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp index 616c4962dca..6153306861c 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp @@ -93,14 +93,18 @@ JoinOperation::enqueueJoinMessagePerTargetNode( void JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg) { - auto& rep = static_cast<api::JoinBucketsReply&>(*msg); + auto& rep = dynamic_cast<api::JoinBucketsReply&>(*msg); uint16_t node = _tracker.handleReply(rep); if (node == 0xffff) { LOG(debug, "Ignored reply since node was max uint16_t for unknown reasons"); return; } - if (rep.getResult().success()) { + _ok = rep.getResult().success(); + if (_cancel_scope.node_is_cancelled(node)) { + LOG(debug, "Join operation for %s has been cancelled", getBucketId().toString().c_str()); + _ok = false; + } else if (rep.getResult().success()) { const std::vector<document::BucketId>& sourceBuckets( rep.getSourceBuckets()); for (auto bucket : sourceBuckets) { @@ -133,7 +137,6 @@ JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRepl LOG(debug, "Join failed for %s with non-critical failure: %s", getBucketId().toString().c_str(), rep.getResult().toString().c_str()); } - _ok = rep.getResult().success(); LOG(debug, "Bucket %s join finished", getBucketId().toString().c_str()); if (_tracker.finished()) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 9469403daae..0a11a8233aa 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -177,7 +177,7 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge( const BucketDatabase::Entry& currentState) const { assert(currentState.valid()); - for (const auto & mnode : _mnodes) { + for (const auto& mnode : _mnodes) { const BucketCopy* copyBefore(_infoBefore.getNode(mnode.index)); if (!copyBefore) { continue; @@ -205,7 +205,7 @@ MergeOperation::deleteSourceOnlyNodes( { assert(currentState.valid()); std::vector<uint16_t> sourceOnlyNodes; - for (const auto & mnode : _mnodes) { + for (const auto& mnode : _mnodes) { const uint16_t nodeIndex = mnode.index; const BucketCopy* copy = currentState->getNode(nodeIndex); if (!copy) { @@ -272,7 +272,14 @@ MergeOperation::onReceive(DistributorStripeMessageSender& sender, const std::sha api::ReturnCode result = reply.getResult(); _ok = result.success(); - if (_ok) { + // We avoid replica deletion entirely if _any_ aspect of the merge has been cancelled. + // It is, for instance, possible that a node that was previously considered source-only + // now is part of the #redundancy ideal copies because another node became unavailable. + // Leave it up to the maintenance state checkers to figure this out. + if (_cancel_scope.is_cancelled()) { + LOG(debug, "Merge operation for %s has been cancelled", getBucketId().toString().c_str()); + _ok = false; + } else if (_ok) { BucketDatabase::Entry entry(_bucketSpace->getBucketDatabase().get(getBucketId())); if (!entry.valid()) { LOG(debug, "Bucket %s no longer exists after merge", getBucketId().toString().c_str()); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp index 41767f0e3af..2184739f82c 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp @@ -59,13 +59,14 @@ RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply { auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get()); - uint16_t node = _tracker.handleReply(*rep); + const uint16_t node = _tracker.handleReply(*rep); - LOG(debug, "Got DeleteBucket reply for %s from node %u", - getBucketId().toString().c_str(), - node); + LOG(debug, "Got DeleteBucket reply for %s from node %u", getBucketId().toString().c_str(), node); - if (rep->getResult().failed()) { + if (_cancel_scope.node_is_cancelled(node)) { + LOG(debug, "DeleteBucket operation for %s has been cancelled", getBucketId().toString().c_str()); + _ok = false; + } else if (rep->getResult().failed()) { if (rep->getResult().getResult() == api::ReturnCode::REJECTED && rep->getBucketInfo().valid()) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp index 531f7f64b68..5ddf082a544 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp @@ -69,13 +69,17 @@ void SetBucketStateOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& reply) { - auto& rep = static_cast<api::SetBucketStateReply&>(*reply); + auto& rep = dynamic_cast<api::SetBucketStateReply&>(*reply); const uint16_t node = _tracker.handleReply(rep); LOG(debug, "Got %s from node %u", reply->toString(true).c_str(), node); bool deactivate = false; - if (reply->getResult().success()) { + + if (_cancel_scope.node_is_cancelled(node)) { + LOG(debug, "SetBucketState for %s has been cancelled", rep.getBucketId().toString().c_str()); + _ok = false; + } else if (reply->getResult().success()) { BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(rep.getBucketId()); if (entry.valid()) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index d704a42e96b..c894deeecd8 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -50,7 +50,7 @@ SplitOperation::onStart(DistributorStripeMessageSender& sender) void SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg) { - auto & rep = static_cast<api::SplitBucketReply&>(*msg); + auto& rep = dynamic_cast<api::SplitBucketReply&>(*msg); uint16_t node = _tracker.handleReply(rep); @@ -61,7 +61,9 @@ SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRep std::ostringstream ost; - if (rep.getResult().success()) { + if (_cancel_scope.node_is_cancelled(node)) { + _ok = false; + } else if (rep.getResult().success()) { BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(rep.getBucketId()); if (entry.valid()) { diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp index 9f944a94178..f60dc8eecff 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.cpp +++ b/storage/src/vespa/storage/distributor/operations/operation.cpp @@ -13,7 +13,7 @@ namespace storage::distributor { Operation::Operation() : _startTime(), - _cancelled(false) + _cancel_scope() { } @@ -47,7 +47,7 @@ Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCo } void Operation::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) { - _cancelled = true; + _cancel_scope.merge(cancel_scope); on_cancel(sender, cancel_scope); } diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h index 64caacfc642..c742f918c30 100644 --- a/storage/src/vespa/storage/distributor/operations/operation.h +++ b/storage/src/vespa/storage/distributor/operations/operation.h @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "cancel_scope.h" #include <vespa/vdslib/state/nodetype.h> #include <vespa/storage/distributor/distributormessagesender.h> #include <vespa/vespalib/util/time.h> @@ -68,13 +69,15 @@ public: */ void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope); + [[nodiscard]] const CancelScope& cancel_scope() const noexcept { return _cancel_scope; } + /** * Whether cancel() has been invoked at least once on this instance. This does not * distinguish between cancellations caused by ownership transfers and those caused * by nodes becoming unavailable; Operation implementations that care about this need - * to implement cancel() themselves and inspect the provided CancelScope. + * to inspect cancel_scope() themselves. */ - [[nodiscard]] bool is_cancelled() const noexcept { return _cancelled; } + [[nodiscard]] bool is_cancelled() const noexcept { return _cancel_scope.is_cancelled(); } /** * Returns true if we are blocked to start this operation given @@ -118,7 +121,7 @@ protected: static constexpr vespalib::duration MAX_TIMEOUT = 3600s; vespalib::system_time _startTime; - bool _cancelled; + CancelScope _cancel_scope; }; } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp index 498f3a5feab..43e3f6831d4 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp @@ -13,11 +13,12 @@ LOG_SETUP(".persistencemessagetracker"); namespace storage::distributor { -PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl( +PersistenceMessageTracker::PersistenceMessageTracker( PersistenceOperationMetricSet& metric, std::shared_ptr<api::BucketInfoReply> reply, const DistributorNodeContext& node_ctx, DistributorStripeOperationContext& op_ctx, + CancelScope& cancel_scope, api::Timestamp revertTimestamp) : MessageTracker(node_ctx), _remapBucketInfo(), @@ -28,7 +29,7 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl( _revertTimestamp(revertTimestamp), _trace(_reply->getTrace().getLevel()), _requestTimer(node_ctx.clock()), - _cancel_scope(), + _cancel_scope(cancel_scope), _n_persistence_replies_total(0), _n_successful_persistence_replies(0), _priority(_reply->getPriority()), @@ -36,33 +37,35 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl( { } -PersistenceMessageTrackerImpl::~PersistenceMessageTrackerImpl() = default; +PersistenceMessageTracker::~PersistenceMessageTracker() = default; -void -PersistenceMessageTrackerImpl::cancel(const CancelScope& cancel_scope) -{ - _cancel_scope.merge(cancel_scope); -} - -void -PersistenceMessageTrackerImpl::prune_cancelled_nodes_if_present( +PersistenceMessageTracker::PostPruningStatus +PersistenceMessageTracker::prune_cancelled_nodes_if_present( BucketInfoMap& bucket_and_replicas, const CancelScope& cancel_scope) { + bool any_replicas = false; for (auto& info : bucket_and_replicas) { info.second = prune_cancelled_nodes(info.second, cancel_scope); + any_replicas |= !info.second.empty(); } + return (any_replicas ? PostPruningStatus ::ReplicasStillPresent + : PostPruningStatus::NoReplicasPresent); } void -PersistenceMessageTrackerImpl::updateDB() +PersistenceMessageTracker::updateDB() { if (_cancel_scope.is_cancelled()) { if (_cancel_scope.fully_cancelled()) { return; // Fully cancelled ops cannot mutate the DB at all } - prune_cancelled_nodes_if_present(_bucketInfo, _cancel_scope); - prune_cancelled_nodes_if_present(_remapBucketInfo, _cancel_scope); + const bool any_infos = still_has_replicas(prune_cancelled_nodes_if_present(_bucketInfo, _cancel_scope)); + const bool any_remapped = still_has_replicas(prune_cancelled_nodes_if_present(_remapBucketInfo, _cancel_scope)); + if (!(any_infos || any_remapped)) { + LOG(spam, "No usable bucket info left after pruning; returning without updating DB"); + return; + } } for (const auto & entry : _bucketInfo) { @@ -75,7 +78,7 @@ PersistenceMessageTrackerImpl::updateDB() } void -PersistenceMessageTrackerImpl::updateMetrics() +PersistenceMessageTracker::updateMetrics() { const api::ReturnCode& result(_reply->getResult()); _metric.updateFromResult(result); @@ -83,7 +86,7 @@ PersistenceMessageTrackerImpl::updateMetrics() } void -PersistenceMessageTrackerImpl::fail(MessageSender& sender, const api::ReturnCode& result) { +PersistenceMessageTracker::fail(MessageSender& sender, const api::ReturnCode& result) { if (_reply.get()) { _reply->setResult(result); updateMetrics(); @@ -94,7 +97,7 @@ PersistenceMessageTrackerImpl::fail(MessageSender& sender, const api::ReturnCode } uint16_t -PersistenceMessageTrackerImpl::receiveReply(MessageSender& sender, api::BucketInfoReply& reply) +PersistenceMessageTracker::receiveReply(MessageSender& sender, api::BucketInfoReply& reply) { uint16_t node = handleReply(reply); @@ -106,7 +109,7 @@ PersistenceMessageTrackerImpl::receiveReply(MessageSender& sender, api::BucketIn } void -PersistenceMessageTrackerImpl::revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes) +PersistenceMessageTracker::revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes) { if (_revertTimestamp != 0) { // Since we're reverting, all received bucket info is voided. @@ -126,7 +129,7 @@ PersistenceMessageTrackerImpl::revert(MessageSender& sender, const std::vector<B } void -PersistenceMessageTrackerImpl::queueMessageBatch(std::vector<MessageTracker::ToSend> messages) { +PersistenceMessageTracker::queueMessageBatch(std::vector<MessageTracker::ToSend> messages) { _messageBatches.emplace_back(); auto & batch = _messageBatches.back(); batch.reserve(messages.size()); @@ -142,7 +145,7 @@ PersistenceMessageTrackerImpl::queueMessageBatch(std::vector<MessageTracker::ToS } bool -PersistenceMessageTrackerImpl::canSendReplyEarly() const +PersistenceMessageTracker::canSendReplyEarly() const { if (!_reply.get() || !_reply->getResult().success()) { LOG(spam, "Can't return early because we have already replied or failed"); @@ -181,7 +184,7 @@ PersistenceMessageTrackerImpl::canSendReplyEarly() const } void -PersistenceMessageTrackerImpl::addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply) +PersistenceMessageTracker::addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply) { document::Bucket bucket(reply.getBucket()); const api::BucketInfo& bucketInfo(reply.getBucketInfo()); @@ -198,7 +201,7 @@ PersistenceMessageTrackerImpl::addBucketInfoFromReply(uint16_t node, const api:: } void -PersistenceMessageTrackerImpl::logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const +PersistenceMessageTracker::logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const { LOG(spam, "Bucket %s: Received successful reply %s", reply.getBucketId().toString().c_str(), reply.toString().c_str()); @@ -211,26 +214,26 @@ PersistenceMessageTrackerImpl::logSuccessfulReply(uint16_t node, const api::Buck } bool -PersistenceMessageTrackerImpl::shouldRevert() const +PersistenceMessageTracker::shouldRevert() const { return _op_ctx.distributor_config().enable_revert() && !_revertNodes.empty() && !_success && _reply; } -bool PersistenceMessageTrackerImpl::has_majority_successful_replies() const noexcept { +bool PersistenceMessageTracker::has_majority_successful_replies() const noexcept { // FIXME this has questionable interaction with early client ACK since we only count // the number of observed replies rather than the number of total requests sent. // ... but the early ACK-feature dearly needs a redesign anyway. return (_n_successful_persistence_replies >= (_n_persistence_replies_total / 2 + 1)); } -bool PersistenceMessageTrackerImpl::has_minority_test_and_set_failure() const noexcept { +bool PersistenceMessageTracker::has_minority_test_and_set_failure() const noexcept { return ((_reply->getResult().getResult() == api::ReturnCode::TEST_AND_SET_CONDITION_FAILED) && has_majority_successful_replies()); } void -PersistenceMessageTrackerImpl::sendReply(MessageSender& sender) +PersistenceMessageTracker::sendReply(MessageSender& sender) { // If we've observed _partial_ TaS failures but have had a majority of good ACKs, // treat the reply as successful. This is because the ACKed write(s) will eventually @@ -247,7 +250,7 @@ PersistenceMessageTrackerImpl::sendReply(MessageSender& sender) } void -PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& reply) +PersistenceMessageTracker::updateFailureResult(const api::BucketInfoReply& reply) { LOG(debug, "Bucket %s: Received failed reply %s with result %s", reply.getBucketId().toString().c_str(), reply.toString().c_str(), reply.getResult().toString().c_str()); @@ -259,13 +262,13 @@ PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& r } bool -PersistenceMessageTrackerImpl::node_is_effectively_cancelled(uint16_t node) const noexcept +PersistenceMessageTracker::node_is_effectively_cancelled(uint16_t node) const noexcept { return _cancel_scope.node_is_cancelled(node); // Implicitly covers the fully cancelled case } void -PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node) +PersistenceMessageTracker::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node) { LOG(spam, "Received CreateBucket reply for %s from node %u", reply.getBucketId().toString().c_str(), node); if (!reply.getResult().success() @@ -285,7 +288,7 @@ PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& rep } void -PersistenceMessageTrackerImpl::handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node) +PersistenceMessageTracker::handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node) { ++_n_persistence_replies_total; if (reply.getBucketInfo().valid()) { @@ -301,7 +304,7 @@ PersistenceMessageTrackerImpl::handlePersistenceReply(api::BucketInfoReply& repl } void -PersistenceMessageTrackerImpl::transfer_trace_state_to_reply() +PersistenceMessageTracker::transfer_trace_state_to_reply() { if (!_trace.isEmpty()) { _trace.setStrict(false); @@ -310,7 +313,7 @@ PersistenceMessageTrackerImpl::transfer_trace_state_to_reply() } void -PersistenceMessageTrackerImpl::updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node) +PersistenceMessageTracker::updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node) { _trace.addChild(reply.steal_trace()); @@ -338,7 +341,7 @@ PersistenceMessageTrackerImpl::updateFromReply(MessageSender& sender, api::Bucke } void -PersistenceMessageTrackerImpl::add_trace_tree_to_reply(vespalib::Trace trace) +PersistenceMessageTracker::add_trace_tree_to_reply(vespalib::Trace trace) { _trace.addChild(std::move(trace)); } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h index 8c44d70062c..06ad8dc95b3 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h @@ -11,47 +11,29 @@ namespace storage::distributor { -struct PersistenceMessageTracker { - virtual ~PersistenceMessageTracker() = default; - using ToSend = MessageTracker::ToSend; - - virtual void cancel(const CancelScope& cancel_scope) = 0; - virtual void fail(MessageSender&, const api::ReturnCode&) = 0; - virtual void queueMessageBatch(std::vector<ToSend> messages) = 0; - virtual uint16_t receiveReply(MessageSender&, api::BucketInfoReply&) = 0; - virtual std::shared_ptr<api::BucketInfoReply>& getReply() = 0; - virtual void updateFromReply(MessageSender&, api::BucketInfoReply&, uint16_t node) = 0; - virtual void queueCommand(api::BucketCommand::SP, uint16_t target) = 0; - virtual void flushQueue(MessageSender&) = 0; - virtual uint16_t handleReply(api::BucketReply& reply) = 0; - virtual void add_trace_tree_to_reply(vespalib::Trace trace) = 0; -}; - -class PersistenceMessageTrackerImpl final - : public PersistenceMessageTracker, - public MessageTracker -{ +class PersistenceMessageTracker final : public MessageTracker { public: - PersistenceMessageTrackerImpl(PersistenceOperationMetricSet& metric, - std::shared_ptr<api::BucketInfoReply> reply, - const DistributorNodeContext& node_ctx, - DistributorStripeOperationContext& op_ctx, - api::Timestamp revertTimestamp = 0); - ~PersistenceMessageTrackerImpl() override; + using ToSend = MessageTracker::ToSend; - void cancel(const CancelScope& cancel_scope) override; + PersistenceMessageTracker(PersistenceOperationMetricSet& metric, + std::shared_ptr<api::BucketInfoReply> reply, + const DistributorNodeContext& node_ctx, + DistributorStripeOperationContext& op_ctx, + CancelScope& cancel_scope, + api::Timestamp revertTimestamp = 0); + ~PersistenceMessageTracker(); void updateDB(); void updateMetrics(); [[nodiscard]] bool success() const noexcept { return _success; } - void fail(MessageSender& sender, const api::ReturnCode& result) override; + void fail(MessageSender& sender, const api::ReturnCode& result); /** Returns the node the reply was from. */ - uint16_t receiveReply(MessageSender& sender, api::BucketInfoReply& reply) override; - void updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node) override; - std::shared_ptr<api::BucketInfoReply>& getReply() override { return _reply; } + uint16_t receiveReply(MessageSender& sender, api::BucketInfoReply& reply); + void updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node); + std::shared_ptr<api::BucketInfoReply>& getReply() { return _reply; } using BucketNodePair = std::pair<document::Bucket, uint16_t>; @@ -63,7 +45,9 @@ public: have at most (messages.size() - initial redundancy) messages left in the queue and have it's first message be done. */ - void queueMessageBatch(std::vector<MessageTracker::ToSend> messages) override; + void queueMessageBatch(std::vector<MessageTracker::ToSend> messages); + + void add_trace_tree_to_reply(vespalib::Trace trace); private: using MessageBatch = std::vector<uint64_t>; @@ -79,14 +63,25 @@ private: std::vector<BucketNodePair> _revertNodes; mbus::Trace _trace; framework::MilliSecTimer _requestTimer; - CancelScope _cancel_scope; + CancelScope& _cancel_scope; uint32_t _n_persistence_replies_total; uint32_t _n_successful_persistence_replies; uint8_t _priority; bool _success; - static void prune_cancelled_nodes_if_present(BucketInfoMap& bucket_and_replicas, - const CancelScope& cancel_scope); + enum class PostPruningStatus { + ReplicasStillPresent, + NoReplicasPresent + }; + + constexpr static bool still_has_replicas(PostPruningStatus status) { + return status == PostPruningStatus::ReplicasStillPresent; + } + + // Returns ReplicasStillPresent iff `bucket_and_replicas` has at least 1 usable entry after pruning, + // otherwise returns NoReplicasPresent + [[nodiscard]] static PostPruningStatus prune_cancelled_nodes_if_present(BucketInfoMap& bucket_and_replicas, + const CancelScope& cancel_scope); [[nodiscard]] bool canSendReplyEarly() const; void addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply); void logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const; @@ -100,13 +95,6 @@ private: void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node); void handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node); void transfer_trace_state_to_reply(); - - void queueCommand(std::shared_ptr<api::BucketCommand> msg, uint16_t target) override { - MessageTracker::queueCommand(std::move(msg), target); - } - void flushQueue(MessageSender& s) override { MessageTracker::flushQueue(s); } - uint16_t handleReply(api::BucketReply& r) override { return MessageTracker::handleReply(r); } - void add_trace_tree_to_reply(vespalib::Trace trace) override; }; } diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.cpp b/storage/src/vespa/storage/distributor/sentmessagemap.cpp index 4b7292c1e81..2ae70f417d1 100644 --- a/storage/src/vespa/storage/distributor/sentmessagemap.cpp +++ b/storage/src/vespa/storage/distributor/sentmessagemap.cpp @@ -18,18 +18,30 @@ SentMessageMap::SentMessageMap() SentMessageMap::~SentMessageMap() = default; +Operation* +SentMessageMap::find_by_id_or_nullptr(api::StorageMessage::Id id) const noexcept +{ + auto iter = _map.find(id); + return ((iter != _map.end()) ? iter->second.get() : nullptr); +} + +std::shared_ptr<Operation> +SentMessageMap::find_by_id_or_empty(api::StorageMessage::Id id) const noexcept +{ + auto iter = _map.find(id); + return ((iter != _map.end()) ? iter->second : std::shared_ptr<Operation>()); +} std::shared_ptr<Operation> SentMessageMap::pop() { auto found = _map.begin(); - if (found != _map.end()) { - std::shared_ptr<Operation> retVal = found->second; + std::shared_ptr<Operation> op = std::move(found->second); _map.erase(found); - return retVal; + return op; } else { - return std::shared_ptr<Operation>(); + return {}; } } @@ -37,17 +49,15 @@ std::shared_ptr<Operation> SentMessageMap::pop(api::StorageMessage::Id id) { auto found = _map.find(id); - if (found != _map.end()) { LOG(spam, "Found Id %" PRIu64 " in callback map: %p", id, found->second.get()); - std::shared_ptr<Operation> retVal = found->second; + std::shared_ptr<Operation> op = std::move(found->second); _map.erase(found); - return retVal; + return op; } else { LOG(spam, "Did not find Id %" PRIu64 " in callback map", id); - - return std::shared_ptr<Operation>(); + return {}; } } @@ -55,7 +65,6 @@ void SentMessageMap::insert(api::StorageMessage::Id id, const std::shared_ptr<Operation> & callback) { LOG(spam, "Inserting callback %p for message %" PRIu64 "", callback.get(), id); - _map[id] = callback; } diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h index 951ed6a6877..3ad80f4e55d 100644 --- a/storage/src/vespa/storage/distributor/sentmessagemap.h +++ b/storage/src/vespa/storage/distributor/sentmessagemap.h @@ -15,6 +15,11 @@ public: SentMessageMap(); ~SentMessageMap(); + // Find by message ID, or nullptr if not found + [[nodiscard]] Operation* find_by_id_or_nullptr(api::StorageMessage::Id id) const noexcept; + // Find by message ID, or empty shared_ptr if not found + [[nodiscard]] std::shared_ptr<Operation> find_by_id_or_empty(api::StorageMessage::Id id) const noexcept; + [[nodiscard]] std::shared_ptr<Operation> pop(api::StorageMessage::Id id); [[nodiscard]] std::shared_ptr<Operation> pop(); diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp index fd747484ccf..ad8fae9cd74 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp @@ -130,6 +130,7 @@ StripeBucketDBUpdater::sendRequestBucketInfo( const document::Bucket& bucket, const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard) { + // TODO assert if cancellation enabled if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) { return; } @@ -342,14 +343,19 @@ StripeBucketDBUpdater::onMergeBucketReply( const std::shared_ptr<api::MergeBucketReply>& reply) { auto replyGuard = std::make_shared<MergeReplyGuard>(_distributor_interface, reply); + auto merge_op = _distributor_interface.maintenance_op_from_message_id(reply->getMsgId()); // In case the merge was unsuccessful somehow, or some nodes weren't // actually merged (source-only nodes?) we request the bucket info of the // bucket again to make sure it's ok. for (uint32_t i = 0; i < reply->getNodes().size(); i++) { - sendRequestBucketInfo(reply->getNodes()[i].index, - reply->getBucket(), - replyGuard); + const uint16_t node_index = reply->getNodes()[i].index; + // We conditionally omit the node instead of conditionally send to it, as many tests do not + // wire their merges through the main distributor maintenance operation tracking locking. + if (merge_op && merge_op->cancel_scope().node_is_cancelled(node_index)) { + continue; + } + sendRequestBucketInfo(node_index, reply->getBucket(), replyGuard); } return true; @@ -502,8 +508,9 @@ StripeBucketDBUpdater::processSingleBucketInfoReply(const std::shared_ptr<api::R BucketRequest req = iter->second; _sentMessages.erase(iter); - if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) { - // Ignore replies from nodes that are down. + // TODO remove explicit node check in favor of cancellation only + if (req.cancelled || !_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) { + // Ignore replies from nodes that are cancelled/down. return true; } if (repl->getResult().getResult() != api::ReturnCode::OK) { @@ -516,6 +523,17 @@ StripeBucketDBUpdater::processSingleBucketInfoReply(const std::shared_ptr<api::R return true; } +bool +StripeBucketDBUpdater::cancel_message_by_id(uint64_t msg_id) +{ + auto iter = _sentMessages.find(msg_id); + if (iter == _sentMessages.end()) { + return false; + } + iter->second.cancelled = true; + return true; +} + void StripeBucketDBUpdater::addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node, BucketListMerger::BucketList& existing) @@ -652,12 +670,10 @@ StripeBucketDBUpdater::MergingNodeRemover::MergingNodeRemover( _nonOwnedBuckets(), _removed_buckets(0), _removed_documents(0), - _localIndex(localIndex), _distribution(distribution), _upStates(upStates), - _track_non_owned_entries(track_non_owned_entries), - _cachedDecisionSuperbucket(UINT64_MAX), - _cachedOwned(false) + _ownership_calc(_state, _distribution, localIndex), + _track_non_owned_entries(track_non_owned_entries) { const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE); _available_nodes.resize(storage_count); @@ -678,45 +694,15 @@ StripeBucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& b LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg); } -namespace { - -uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept { - // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest. - return id.getRawId() & ~(UINT64_MAX << distribution_bits); -} - -} - bool StripeBucketDBUpdater::MergingNodeRemover::distributorOwnsBucket( const document::BucketId& bucketId) const { - // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor. - // TODO "too few bits used" case can be cheaply checked without needing exception - try { - const auto bits = _state.getDistributionBitCount(); - const auto this_superbucket = superbucket_from_id(bucketId, bits); - if (_cachedDecisionSuperbucket == this_superbucket) { - if (!_cachedOwned) { - logRemove(bucketId, "bucket now owned by another distributor (cached)"); - } - return _cachedOwned; - } - - uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucketId, "uim"); - _cachedDecisionSuperbucket = this_superbucket; - _cachedOwned = (distributor == _localIndex); - if (!_cachedOwned) { - logRemove(bucketId, "bucket now owned by another distributor"); - return false; - } - return true; - } catch (lib::TooFewBucketBitsInUseException& exc) { - logRemove(bucketId, "using too few distribution bits now"); - } catch (lib::NoDistributorsAvailableException& exc) { - logRemove(bucketId, "no distributors are available"); + const bool owns_bucket = _ownership_calc.this_distributor_owns_bucket(bucketId); + if (!owns_bucket) { + logRemove(bucketId, "bucket now owned by another distributor"); } - return false; + return owns_bucket; } void diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h index 2e4ef2a7543..9536c84691d 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "bucket_ownership_calculator.h" #include "bucketlistmerger.h" #include "distributor_stripe_component.h" #include "distributormessagesender.h" @@ -49,6 +50,7 @@ public: bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override; bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override; void resendDelayedMessages(); + [[nodiscard]] bool cancel_message_by_id(uint64_t msg_id); vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const; vespalib::string getReportContentType(const framework::HttpUrlPath&) const override; @@ -75,7 +77,8 @@ public: private: class MergeReplyGuard { public: - MergeReplyGuard(DistributorStripeInterface& distributor_interface, const std::shared_ptr<api::MergeBucketReply>& reply) noexcept + MergeReplyGuard(DistributorStripeInterface& distributor_interface, + const std::shared_ptr<api::MergeBucketReply>& reply) noexcept : _distributor_interface(distributor_interface), _reply(reply) {} ~MergeReplyGuard(); @@ -89,22 +92,23 @@ private: }; struct BucketRequest { - BucketRequest() - : targetNode(0), bucket(), timestamp(0) {}; + BucketRequest() noexcept : targetNode(0), bucket(), timestamp(0), cancelled(false) {} BucketRequest(uint16_t t, uint64_t currentTime, const document::Bucket& b, - const std::shared_ptr<MergeReplyGuard>& guard) + const std::shared_ptr<MergeReplyGuard>& guard) noexcept : targetNode(t), bucket(b), timestamp(currentTime), - _mergeReplyGuard(guard) {}; + _mergeReplyGuard(guard), + cancelled(false) + {} void print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute ×tampAttribute) const; uint16_t targetNode; document::Bucket bucket; uint64_t timestamp; - std::shared_ptr<MergeReplyGuard> _mergeReplyGuard; + bool cancelled; }; struct EnqueuedBucketRecheck { @@ -148,7 +152,7 @@ private: static void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl, uint16_t targetNode, BucketListMerger::BucketList& newList); void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket, - const std::shared_ptr<MergeReplyGuard>& mergeReplystatic ); + const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard); static void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node, BucketListMerger::BucketList& existing); void clearReadOnlyBucketRepoDatabases(); @@ -218,12 +222,10 @@ private: std::vector<BucketDatabase::Entry> _nonOwnedBuckets; size_t _removed_buckets; size_t _removed_documents; - uint16_t _localIndex; const lib::Distribution& _distribution; const char* _upStates; + BucketOwnershipCalculator _ownership_calc; bool _track_non_owned_entries; - mutable uint64_t _cachedDecisionSuperbucket; - mutable bool _cachedOwned; }; using DistributionContexts = std::unordered_map<document::BucketSpace, |