diff options
Diffstat (limited to 'storage/src/tests/distributor/distributor_stripe_test.cpp')
-rw-r--r-- | storage/src/tests/distributor/distributor_stripe_test.cpp | 298 |
1 files changed, 261 insertions, 37 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp index 566fb704105..c87f5133997 100644 --- a/storage/src/tests/distributor/distributor_stripe_test.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test.cpp @@ -14,6 +14,7 @@ #include <vespa/storageapi/message/visitor.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/text/stringtokenizer.h> +#include <vespa/vespalib/util/stringfmt.h> #include <gmock/gmock.h> using document::Bucket; @@ -47,8 +48,8 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil { // Simple type aliases to make interfacing with certain utility functions // easier. Note that this is only for readability and does not provide any // added type safety. - using NodeCount = int; - using Redundancy = int; + using NodeCount = uint16_t; + using Redundancy = uint16_t; using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder; @@ -137,82 +138,114 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil { return _stripe->handleMessage(msg); } - void configure_stale_reads_enabled(bool enabled) { + template <typename Func> + void configure_stripe_with(Func f) { ConfigBuilder builder; - builder.allowStaleReadsDuringClusterStateTransitions = enabled; + f(builder); configure_stripe(builder); } + void configure_stale_reads_enabled(bool enabled) { + configure_stripe_with([&](auto& builder) { + builder.allowStaleReadsDuringClusterStateTransitions = enabled; + }); + } + void configure_update_fast_path_restart_enabled(bool enabled) { - ConfigBuilder builder; - builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled; + }); } void configure_merge_operations_disabled(bool disabled) { - ConfigBuilder builder; - builder.mergeOperationsDisabled = disabled; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.mergeOperationsDisabled = disabled; + }); } void configure_use_weak_internal_read_consistency(bool use_weak) { - ConfigBuilder builder; - builder.useWeakInternalReadConsistencyForClientGets = use_weak; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.useWeakInternalReadConsistencyForClientGets = use_weak; + }); } void configure_metadata_update_phase_enabled(bool enabled) { - ConfigBuilder builder; - builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled; + }); } void configure_prioritize_global_bucket_merges(bool enabled) { - ConfigBuilder builder; - builder.prioritizeGlobalBucketMerges = enabled; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.prioritizeGlobalBucketMerges = enabled; + }); } void configure_max_activation_inhibited_out_of_sync_groups(uint32_t n_groups) { - ConfigBuilder builder; - builder.maxActivationInhibitedOutOfSyncGroups = n_groups; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.maxActivationInhibitedOutOfSyncGroups = n_groups; + }); } void configure_implicitly_clear_priority_on_schedule(bool implicitly_clear) { - ConfigBuilder builder; - builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear; + }); } void configure_use_unordered_merge_chaining(bool use_unordered) { - ConfigBuilder builder; - builder.useUnorderedMergeChaining = use_unordered; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.useUnorderedMergeChaining = use_unordered; + }); } void configure_enable_two_phase_garbage_collection(bool use_two_phase) { - ConfigBuilder builder; - builder.enableTwoPhaseGarbageCollection = use_two_phase; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.enableTwoPhaseGarbageCollection = use_two_phase; + }); } void configure_enable_condition_probing(bool enable_probing) { - ConfigBuilder builder; - builder.enableConditionProbing = enable_probing; - configure_stripe(builder); + configure_stripe_with([&](auto& builder) { + builder.enableConditionProbing = enable_probing; + }); + } + + void configure_enable_operation_cancellation(bool enable_cancellation) { + configure_stripe_with([&](auto& builder) { + builder.enableOperationCancellation = enable_cancellation; + }); } - bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept { + [[nodiscard]] bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept { return _stripe->_scheduler->implicitly_clear_priority_on_schedule(); } + [[nodiscard]] bool distributor_owns_bucket_in_current_and_pending_states(document::BucketId bucket_id) const { + return (getDistributorBucketSpace().get_bucket_ownership_flags(bucket_id).owned_in_pending_state() && + getDistributorBucketSpace().check_ownership_in_pending_and_current_state(bucket_id).isOwned()); + } + void configureMaxClusterClockSkew(int seconds); void configure_mutation_sequencing(bool enabled); void configure_merge_busy_inhibit_duration(int seconds); void set_up_and_start_get_op_with_stale_reads_enabled(bool enabled); + void simulate_cluster_state_transition(const vespalib::string& state_str, bool clear_pending); + static std::shared_ptr<api::RemoveReply> make_remove_reply_with_bucket_remap(api::StorageCommand& originator_cmd); + + // TODO dedupe + auto sent_get_command(size_t idx) { return sent_command<api::GetCommand>(idx); } + + auto make_get_reply(size_t idx, api::Timestamp ts, bool is_tombstone, bool condition_matched) { + return std::make_shared<api::GetReply>(*sent_get_command(idx), std::shared_ptr<document::Document>(), ts, + false, is_tombstone, condition_matched); + } + + void set_up_for_bucket_ownership_cancellation(uint32_t superbucket_idx); + void do_test_cancelled_pending_op_with_bucket_ownership_change(bool clear_pending_state); + void do_test_not_cancelled_pending_op_without_bucket_ownership_change(bool clear_pending_state); }; DistributorStripeTest::DistributorStripeTest() @@ -224,6 +257,34 @@ DistributorStripeTest::DistributorStripeTest() DistributorStripeTest::~DistributorStripeTest() = default; +void +DistributorStripeTest::simulate_cluster_state_transition(const vespalib::string& state_str, bool clear_pending) +{ + simulate_set_pending_cluster_state(state_str); + if (clear_pending) { + enable_cluster_state(state_str); + clear_pending_cluster_state_bundle(); + } +} + +std::shared_ptr<api::RemoveReply> +DistributorStripeTest::make_remove_reply_with_bucket_remap(api::StorageCommand& originator_cmd) +{ + auto& cmd_as_remove = dynamic_cast<api::RemoveCommand&>(originator_cmd); + auto reply = std::dynamic_pointer_cast<api::RemoveReply>(std::shared_ptr<api::StorageReply>(cmd_as_remove.makeReply())); + reply->setOldTimestamp(100); + // Including a bucket remapping as part of the response is a pragmatic way to avoid false + // negatives when testing whether cancelled operations may mutate the DB. This is because + // non-remapped buckets are not created in the DB if they are already removed (which will + // be the case after bucket pruning on a cluster state change), but remapped buckets _are_ + // implicitly created upon insert. + // We expect the original bucket is 16 bits and fake a remap to a split bucket one level + // below the original bucket. + reply->remapBucketId(BucketId(17, (cmd_as_remove.getBucketId().getId() & 0xFFFF) | 0x10000)); + reply->setBucketInfo(api::BucketInfo(0x1234, 2, 300)); + return reply; +} + TEST_F(DistributorStripeTest, operation_generation) { setup_stripe(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); @@ -455,8 +516,7 @@ TEST_F(DistributorStripeTest, no_db_resurrection_for_bucket_not_owned_in_pending simulate_set_pending_cluster_state("storage:10 distributor:10"); document::BucketId nonOwnedBucket(16, 3); - EXPECT_FALSE(getDistributorBucketSpace().get_bucket_ownership_flags(nonOwnedBucket).owned_in_pending_state()); - EXPECT_FALSE(getDistributorBucketSpace().check_ownership_in_pending_and_current_state(nonOwnedBucket).isOwned()); + ASSERT_FALSE(distributor_owns_bucket_in_current_and_pending_states(nonOwnedBucket)); std::vector<BucketCopy> copies; copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2)); @@ -1052,4 +1112,168 @@ TEST_F(DistributorStripeTest, enable_condition_probing_config_is_propagated_to_i EXPECT_TRUE(getConfig().enable_condition_probing()); } +TEST_F(DistributorStripeTest, enable_operation_cancellation_config_is_propagated_to_internal_config) { + setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + EXPECT_FALSE(getConfig().enable_operation_cancellation()); // TODO switch default once ready + + configure_enable_operation_cancellation(false); + EXPECT_FALSE(getConfig().enable_operation_cancellation()); + + configure_enable_operation_cancellation(true); + EXPECT_TRUE(getConfig().enable_operation_cancellation()); +} + +TEST_F(DistributorStripeTest, cluster_state_node_down_edge_cancels_pending_operations_on_unavailable_nodes) { + setup_stripe(Redundancy(1), NodeCount(1), "version:1 distributor:1 storage:1"); + configure_enable_operation_cancellation(true); // Test will fail without cancellation enabled + addNodesToBucketDB(BucketId(16, 1), "0=3/4/5/t"); + + stripe_handle_message(makeDummyRemoveCommand()); // Remove is for bucket {16, 1} + ASSERT_EQ(_sender.getCommands(true), "Remove => 0"); + + // Oh no, node 0 goes down while we have a pending operation! + simulate_cluster_state_transition("version:2 distributor:1 storage:1 .0.s:d", true); + EXPECT_EQ("NONEXISTING", dumpBucket(BucketId(16, 1))); // Implicitly cleared + + auto reply = make_remove_reply_with_bucket_remap(*_sender.command(0)); + // Before we receive the reply, node 0 is back online. Even though the node is available in the + // cluster state, we should not apply its bucket info to our DB, as it may represent stale + // information (it's possible that our node froze up and that another distributor took over and + // mutated the bucket in the meantime; a classic ABA scenario). + simulate_cluster_state_transition("version:5 distributor:1 storage:1", true); + + _stripe->handleReply(std::move(reply)); + EXPECT_EQ("NONEXISTING", dumpBucket(BucketId(17, 0x10001))); +} + +TEST_F(DistributorStripeTest, distribution_config_change_edge_cancels_pending_operations_on_unavailable_nodes) { + setup_stripe(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); + configure_enable_operation_cancellation(true); // Test will fail without cancellation enabled + addNodesToBucketDB(BucketId(16, 1), "0=3/4/5/t,1=3/4/5/t"); + + stripe_handle_message(makeDummyRemoveCommand()); // Remove is for bucket {16, 1} + ASSERT_EQ(_sender.getCommands(true), "Remove => 0,Remove => 1"); + + // Node 1 is configured away; only node 0 remains. This is expected to be closely followed by + // (or--depending on the timing of operations in the cluster--preceded by) a cluster state with + // the node marked as down, but the ordering is not guaranteed. + auto new_config = make_default_distribution_config(1, 1); + simulate_distribution_config_change(std::move(new_config)); + + auto node_0_reply = make_remove_reply_with_bucket_remap(*_sender.command(0)); + auto node_1_reply = make_remove_reply_with_bucket_remap(*_sender.command(1)); + + _stripe->handleReply(std::move(node_0_reply)); + _stripe->handleReply(std::move(node_1_reply)); + + // Only node 0 should be present in the DB + EXPECT_EQ("BucketId(0x4000000000000001) : " // Original bucket + "node(idx=0,crc=0x3,docs=4/4,bytes=5/5,trusted=true,active=false,ready=false)", + dumpBucket(BucketId(16, 1))); + EXPECT_EQ("BucketId(0x4400000000010001) : " // Remapped bucket + "node(idx=0,crc=0x1234,docs=2/2,bytes=300/300,trusted=true,active=false,ready=false)", + dumpBucket(BucketId(17, 0x10001))); +} + +void DistributorStripeTest::set_up_for_bucket_ownership_cancellation(uint32_t superbucket_idx) { + setup_stripe(Redundancy(1), NodeCount(10), "version:1 distributor:2 storage:2"); + configure_stripe_with([](auto& builder) { + builder.enableConditionProbing = true; + builder.enableOperationCancellation = true; + }); + + NodeSupportedFeatures features; + features.document_condition_probe = true; + set_node_supported_features(0, features); + set_node_supported_features(1, features); + + // Note: replicas are intentionally out of sync to trigger a write-repair. + addNodesToBucketDB(BucketId(16, superbucket_idx), "0=3/4/5,1=4/5/6"); +} + +namespace { + +std::shared_ptr<api::RemoveCommand> make_conditional_remove_request(uint32_t superbucket_idx) { + auto client_remove = std::make_shared<api::RemoveCommand>( + makeDocumentBucket(document::BucketId(0)), + document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%u:foo", superbucket_idx)), + api::Timestamp(0)); + client_remove->setCondition(documentapi::TestAndSetCondition("foo.bar==baz")); + return client_remove; +} + +} + +void DistributorStripeTest::do_test_cancelled_pending_op_with_bucket_ownership_change(bool clear_pending_state) { + constexpr uint32_t superbucket_idx = 3; + const BucketId bucket_id(16, superbucket_idx); + set_up_for_bucket_ownership_cancellation(superbucket_idx); + // To actually check if cancellation is happening, we need to trigger a code path that + // is only covered by cancellation and not the legacy "check buckets at DB insert time" + // logic. The latter would give a false negative. + stripe_handle_message(make_conditional_remove_request(superbucket_idx)); + ASSERT_EQ(_sender.getCommands(true), "Get => 0,Get => 1"); // Condition probes, thunder cats go! + + simulate_cluster_state_transition("version:2 distributor:10 storage:10", clear_pending_state); + ASSERT_FALSE(distributor_owns_bucket_in_current_and_pending_states(bucket_id)); + EXPECT_EQ("NONEXISTING", dumpBucket(bucket_id)); // Should have been pruned + + _stripe->handleReply(make_get_reply(0, 100, false, true)); + _stripe->handleReply(make_get_reply(1, 100, false, true)); + + // Condition probe was successful, but operation is cancelled and shall not continue. + ASSERT_EQ(_sender.getCommands(true, false, 2), ""); + EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:foo:testdoctype1:n=3:foo, timestamp 1, not found) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: " + "Operation has been cancelled (likely due to a cluster state change))", + _sender.getLastReply()); + EXPECT_EQ("NONEXISTING", dumpBucket(bucket_id)); // And definitely no resurrection +} + +TEST_F(DistributorStripeTest, bucket_ownership_change_cancels_pending_operations_for_non_owned_buckets_pending_case) { + do_test_cancelled_pending_op_with_bucket_ownership_change(false); +} + +TEST_F(DistributorStripeTest, bucket_ownership_change_cancels_pending_operations_for_non_owned_buckets_not_pending_case) { + do_test_cancelled_pending_op_with_bucket_ownership_change(true); +} + +void DistributorStripeTest::do_test_not_cancelled_pending_op_without_bucket_ownership_change(bool clear_pending_state) { + constexpr uint32_t superbucket_idx = 14; + const BucketId bucket_id(16, superbucket_idx); + set_up_for_bucket_ownership_cancellation(superbucket_idx); + + stripe_handle_message(make_conditional_remove_request(superbucket_idx)); + ASSERT_EQ(_sender.getCommands(true), "Get => 0,Get => 1"); + + simulate_cluster_state_transition("version:2 distributor:10 storage:10", clear_pending_state); + ASSERT_TRUE(distributor_owns_bucket_in_current_and_pending_states(bucket_id)); + EXPECT_EQ("BucketId(0x400000000000000e) : " + "node(idx=0,crc=0x3,docs=4/4,bytes=5/5,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x4,docs=5/5,bytes=6/6,trusted=false,active=false,ready=false)", + dumpBucket(bucket_id)); // Should _not_ have been pruned + + _stripe->handleReply(make_get_reply(0, 100, false, true)); + _stripe->handleReply(make_get_reply(1, 100, false, true)); + + // Operation can proceed as planned as it has not been cancelled + ASSERT_EQ(_sender.getCommands(true, false, 2), "Remove => 0,Remove => 1"); +} + +TEST_F(DistributorStripeTest, bucket_ownership_change_does_not_cancel_pending_operations_for_owned_buckets_pending_case) { + do_test_not_cancelled_pending_op_without_bucket_ownership_change(false); +} + +TEST_F(DistributorStripeTest, bucket_ownership_change_does_not_cancel_pending_operations_for_owned_buckets_not_pending_case) { + do_test_not_cancelled_pending_op_without_bucket_ownership_change(true); +} + +// TODO we do not have good handling of bucket ownership changes combined with +// distribution config changes... Hard to remove all such edge cases unless we +// make state+config change an atomic operation initiated by the cluster controller +// (hint: we should do this). + + } |