summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/distributor_stripe_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor/distributor_stripe_test.cpp')
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test.cpp298
1 files changed, 261 insertions, 37 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp
index 566fb704105..c87f5133997 100644
--- a/storage/src/tests/distributor/distributor_stripe_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test.cpp
@@ -14,6 +14,7 @@
#include <vespa/storageapi/message/visitor.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <vespa/vespalib/util/stringfmt.h>
#include <gmock/gmock.h>
using document::Bucket;
@@ -47,8 +48,8 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
// Simple type aliases to make interfacing with certain utility functions
// easier. Note that this is only for readability and does not provide any
// added type safety.
- using NodeCount = int;
- using Redundancy = int;
+ using NodeCount = uint16_t;
+ using Redundancy = uint16_t;
using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder;
@@ -137,82 +138,114 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
return _stripe->handleMessage(msg);
}
- void configure_stale_reads_enabled(bool enabled) {
+ template <typename Func>
+ void configure_stripe_with(Func f) {
ConfigBuilder builder;
- builder.allowStaleReadsDuringClusterStateTransitions = enabled;
+ f(builder);
configure_stripe(builder);
}
+ void configure_stale_reads_enabled(bool enabled) {
+ configure_stripe_with([&](auto& builder) {
+ builder.allowStaleReadsDuringClusterStateTransitions = enabled;
+ });
+ }
+
void configure_update_fast_path_restart_enabled(bool enabled) {
- ConfigBuilder builder;
- builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled;
+ });
}
void configure_merge_operations_disabled(bool disabled) {
- ConfigBuilder builder;
- builder.mergeOperationsDisabled = disabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.mergeOperationsDisabled = disabled;
+ });
}
void configure_use_weak_internal_read_consistency(bool use_weak) {
- ConfigBuilder builder;
- builder.useWeakInternalReadConsistencyForClientGets = use_weak;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.useWeakInternalReadConsistencyForClientGets = use_weak;
+ });
}
void configure_metadata_update_phase_enabled(bool enabled) {
- ConfigBuilder builder;
- builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled;
+ });
}
void configure_prioritize_global_bucket_merges(bool enabled) {
- ConfigBuilder builder;
- builder.prioritizeGlobalBucketMerges = enabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.prioritizeGlobalBucketMerges = enabled;
+ });
}
void configure_max_activation_inhibited_out_of_sync_groups(uint32_t n_groups) {
- ConfigBuilder builder;
- builder.maxActivationInhibitedOutOfSyncGroups = n_groups;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.maxActivationInhibitedOutOfSyncGroups = n_groups;
+ });
}
void configure_implicitly_clear_priority_on_schedule(bool implicitly_clear) {
- ConfigBuilder builder;
- builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear;
+ });
}
void configure_use_unordered_merge_chaining(bool use_unordered) {
- ConfigBuilder builder;
- builder.useUnorderedMergeChaining = use_unordered;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.useUnorderedMergeChaining = use_unordered;
+ });
}
void configure_enable_two_phase_garbage_collection(bool use_two_phase) {
- ConfigBuilder builder;
- builder.enableTwoPhaseGarbageCollection = use_two_phase;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.enableTwoPhaseGarbageCollection = use_two_phase;
+ });
}
void configure_enable_condition_probing(bool enable_probing) {
- ConfigBuilder builder;
- builder.enableConditionProbing = enable_probing;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.enableConditionProbing = enable_probing;
+ });
+ }
+
+ void configure_enable_operation_cancellation(bool enable_cancellation) {
+ configure_stripe_with([&](auto& builder) {
+ builder.enableOperationCancellation = enable_cancellation;
+ });
}
- bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
+ [[nodiscard]] bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
return _stripe->_scheduler->implicitly_clear_priority_on_schedule();
}
+ [[nodiscard]] bool distributor_owns_bucket_in_current_and_pending_states(document::BucketId bucket_id) const {
+ return (getDistributorBucketSpace().get_bucket_ownership_flags(bucket_id).owned_in_pending_state() &&
+ getDistributorBucketSpace().check_ownership_in_pending_and_current_state(bucket_id).isOwned());
+ }
+
void configureMaxClusterClockSkew(int seconds);
void configure_mutation_sequencing(bool enabled);
void configure_merge_busy_inhibit_duration(int seconds);
void set_up_and_start_get_op_with_stale_reads_enabled(bool enabled);
+ void simulate_cluster_state_transition(const vespalib::string& state_str, bool clear_pending);
+ static std::shared_ptr<api::RemoveReply> make_remove_reply_with_bucket_remap(api::StorageCommand& originator_cmd);
+
+ // TODO dedupe
+ auto sent_get_command(size_t idx) { return sent_command<api::GetCommand>(idx); }
+
+ auto make_get_reply(size_t idx, api::Timestamp ts, bool is_tombstone, bool condition_matched) {
+ return std::make_shared<api::GetReply>(*sent_get_command(idx), std::shared_ptr<document::Document>(), ts,
+ false, is_tombstone, condition_matched);
+ }
+
+ void set_up_for_bucket_ownership_cancellation(uint32_t superbucket_idx);
+ void do_test_cancelled_pending_op_with_bucket_ownership_change(bool clear_pending_state);
+ void do_test_not_cancelled_pending_op_without_bucket_ownership_change(bool clear_pending_state);
};
DistributorStripeTest::DistributorStripeTest()
@@ -224,6 +257,34 @@ DistributorStripeTest::DistributorStripeTest()
DistributorStripeTest::~DistributorStripeTest() = default;
+void
+DistributorStripeTest::simulate_cluster_state_transition(const vespalib::string& state_str, bool clear_pending)
+{
+ simulate_set_pending_cluster_state(state_str);
+ if (clear_pending) {
+ enable_cluster_state(state_str);
+ clear_pending_cluster_state_bundle();
+ }
+}
+
+std::shared_ptr<api::RemoveReply>
+DistributorStripeTest::make_remove_reply_with_bucket_remap(api::StorageCommand& originator_cmd)
+{
+ auto& cmd_as_remove = dynamic_cast<api::RemoveCommand&>(originator_cmd);
+ auto reply = std::dynamic_pointer_cast<api::RemoveReply>(std::shared_ptr<api::StorageReply>(cmd_as_remove.makeReply()));
+ reply->setOldTimestamp(100);
+ // Including a bucket remapping as part of the response is a pragmatic way to avoid false
+ // negatives when testing whether cancelled operations may mutate the DB. This is because
+ // non-remapped buckets are not created in the DB if they are already removed (which will
+ // be the case after bucket pruning on a cluster state change), but remapped buckets _are_
+ // implicitly created upon insert.
+ // We expect the original bucket is 16 bits and fake a remap to a split bucket one level
+ // below the original bucket.
+ reply->remapBucketId(BucketId(17, (cmd_as_remove.getBucketId().getId() & 0xFFFF) | 0x10000));
+ reply->setBucketInfo(api::BucketInfo(0x1234, 2, 300));
+ return reply;
+}
+
TEST_F(DistributorStripeTest, operation_generation)
{
setup_stripe(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -455,8 +516,7 @@ TEST_F(DistributorStripeTest, no_db_resurrection_for_bucket_not_owned_in_pending
simulate_set_pending_cluster_state("storage:10 distributor:10");
document::BucketId nonOwnedBucket(16, 3);
- EXPECT_FALSE(getDistributorBucketSpace().get_bucket_ownership_flags(nonOwnedBucket).owned_in_pending_state());
- EXPECT_FALSE(getDistributorBucketSpace().check_ownership_in_pending_and_current_state(nonOwnedBucket).isOwned());
+ ASSERT_FALSE(distributor_owns_bucket_in_current_and_pending_states(nonOwnedBucket));
std::vector<BucketCopy> copies;
copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2));
@@ -1052,4 +1112,168 @@ TEST_F(DistributorStripeTest, enable_condition_probing_config_is_propagated_to_i
EXPECT_TRUE(getConfig().enable_condition_probing());
}
+TEST_F(DistributorStripeTest, enable_operation_cancellation_config_is_propagated_to_internal_config) {
+ setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ EXPECT_FALSE(getConfig().enable_operation_cancellation()); // TODO switch default once ready
+
+ configure_enable_operation_cancellation(false);
+ EXPECT_FALSE(getConfig().enable_operation_cancellation());
+
+ configure_enable_operation_cancellation(true);
+ EXPECT_TRUE(getConfig().enable_operation_cancellation());
+}
+
+TEST_F(DistributorStripeTest, cluster_state_node_down_edge_cancels_pending_operations_on_unavailable_nodes) {
+ setup_stripe(Redundancy(1), NodeCount(1), "version:1 distributor:1 storage:1");
+ configure_enable_operation_cancellation(true); // Test will fail without cancellation enabled
+ addNodesToBucketDB(BucketId(16, 1), "0=3/4/5/t");
+
+ stripe_handle_message(makeDummyRemoveCommand()); // Remove is for bucket {16, 1}
+ ASSERT_EQ(_sender.getCommands(true), "Remove => 0");
+
+ // Oh no, node 0 goes down while we have a pending operation!
+ simulate_cluster_state_transition("version:2 distributor:1 storage:1 .0.s:d", true);
+ EXPECT_EQ("NONEXISTING", dumpBucket(BucketId(16, 1))); // Implicitly cleared
+
+ auto reply = make_remove_reply_with_bucket_remap(*_sender.command(0));
+ // Before we receive the reply, node 0 is back online. Even though the node is available in the
+ // cluster state, we should not apply its bucket info to our DB, as it may represent stale
+ // information (it's possible that our node froze up and that another distributor took over and
+ // mutated the bucket in the meantime; a classic ABA scenario).
+ simulate_cluster_state_transition("version:5 distributor:1 storage:1", true);
+
+ _stripe->handleReply(std::move(reply));
+ EXPECT_EQ("NONEXISTING", dumpBucket(BucketId(17, 0x10001)));
+}
+
+TEST_F(DistributorStripeTest, distribution_config_change_edge_cancels_pending_operations_on_unavailable_nodes) {
+ setup_stripe(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
+ configure_enable_operation_cancellation(true); // Test will fail without cancellation enabled
+ addNodesToBucketDB(BucketId(16, 1), "0=3/4/5/t,1=3/4/5/t");
+
+ stripe_handle_message(makeDummyRemoveCommand()); // Remove is for bucket {16, 1}
+ ASSERT_EQ(_sender.getCommands(true), "Remove => 0,Remove => 1");
+
+ // Node 1 is configured away; only node 0 remains. This is expected to be closely followed by
+ // (or--depending on the timing of operations in the cluster--preceded by) a cluster state with
+ // the node marked as down, but the ordering is not guaranteed.
+ auto new_config = make_default_distribution_config(1, 1);
+ simulate_distribution_config_change(std::move(new_config));
+
+ auto node_0_reply = make_remove_reply_with_bucket_remap(*_sender.command(0));
+ auto node_1_reply = make_remove_reply_with_bucket_remap(*_sender.command(1));
+
+ _stripe->handleReply(std::move(node_0_reply));
+ _stripe->handleReply(std::move(node_1_reply));
+
+ // Only node 0 should be present in the DB
+ EXPECT_EQ("BucketId(0x4000000000000001) : " // Original bucket
+ "node(idx=0,crc=0x3,docs=4/4,bytes=5/5,trusted=true,active=false,ready=false)",
+ dumpBucket(BucketId(16, 1)));
+ EXPECT_EQ("BucketId(0x4400000000010001) : " // Remapped bucket
+ "node(idx=0,crc=0x1234,docs=2/2,bytes=300/300,trusted=true,active=false,ready=false)",
+ dumpBucket(BucketId(17, 0x10001)));
+}
+
+void DistributorStripeTest::set_up_for_bucket_ownership_cancellation(uint32_t superbucket_idx) {
+ setup_stripe(Redundancy(1), NodeCount(10), "version:1 distributor:2 storage:2");
+ configure_stripe_with([](auto& builder) {
+ builder.enableConditionProbing = true;
+ builder.enableOperationCancellation = true;
+ });
+
+ NodeSupportedFeatures features;
+ features.document_condition_probe = true;
+ set_node_supported_features(0, features);
+ set_node_supported_features(1, features);
+
+ // Note: replicas are intentionally out of sync to trigger a write-repair.
+ addNodesToBucketDB(BucketId(16, superbucket_idx), "0=3/4/5,1=4/5/6");
+}
+
+namespace {
+
+std::shared_ptr<api::RemoveCommand> make_conditional_remove_request(uint32_t superbucket_idx) {
+ auto client_remove = std::make_shared<api::RemoveCommand>(
+ makeDocumentBucket(document::BucketId(0)),
+ document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%u:foo", superbucket_idx)),
+ api::Timestamp(0));
+ client_remove->setCondition(documentapi::TestAndSetCondition("foo.bar==baz"));
+ return client_remove;
+}
+
+}
+
+void DistributorStripeTest::do_test_cancelled_pending_op_with_bucket_ownership_change(bool clear_pending_state) {
+ constexpr uint32_t superbucket_idx = 3;
+ const BucketId bucket_id(16, superbucket_idx);
+ set_up_for_bucket_ownership_cancellation(superbucket_idx);
+ // To actually check if cancellation is happening, we need to trigger a code path that
+ // is only covered by cancellation and not the legacy "check buckets at DB insert time"
+ // logic. The latter would give a false negative.
+ stripe_handle_message(make_conditional_remove_request(superbucket_idx));
+ ASSERT_EQ(_sender.getCommands(true), "Get => 0,Get => 1"); // Condition probes, thunder cats go!
+
+ simulate_cluster_state_transition("version:2 distributor:10 storage:10", clear_pending_state);
+ ASSERT_FALSE(distributor_owns_bucket_in_current_and_pending_states(bucket_id));
+ EXPECT_EQ("NONEXISTING", dumpBucket(bucket_id)); // Should have been pruned
+
+ _stripe->handleReply(make_get_reply(0, 100, false, true));
+ _stripe->handleReply(make_get_reply(1, 100, false, true));
+
+ // Condition probe was successful, but operation is cancelled and shall not continue.
+ ASSERT_EQ(_sender.getCommands(true, false, 2), "");
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:foo:testdoctype1:n=3:foo, timestamp 1, not found) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+ EXPECT_EQ("NONEXISTING", dumpBucket(bucket_id)); // And definitely no resurrection
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_cancels_pending_operations_for_non_owned_buckets_pending_case) {
+ do_test_cancelled_pending_op_with_bucket_ownership_change(false);
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_cancels_pending_operations_for_non_owned_buckets_not_pending_case) {
+ do_test_cancelled_pending_op_with_bucket_ownership_change(true);
+}
+
+void DistributorStripeTest::do_test_not_cancelled_pending_op_without_bucket_ownership_change(bool clear_pending_state) {
+ constexpr uint32_t superbucket_idx = 14;
+ const BucketId bucket_id(16, superbucket_idx);
+ set_up_for_bucket_ownership_cancellation(superbucket_idx);
+
+ stripe_handle_message(make_conditional_remove_request(superbucket_idx));
+ ASSERT_EQ(_sender.getCommands(true), "Get => 0,Get => 1");
+
+ simulate_cluster_state_transition("version:2 distributor:10 storage:10", clear_pending_state);
+ ASSERT_TRUE(distributor_owns_bucket_in_current_and_pending_states(bucket_id));
+ EXPECT_EQ("BucketId(0x400000000000000e) : "
+ "node(idx=0,crc=0x3,docs=4/4,bytes=5/5,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x4,docs=5/5,bytes=6/6,trusted=false,active=false,ready=false)",
+ dumpBucket(bucket_id)); // Should _not_ have been pruned
+
+ _stripe->handleReply(make_get_reply(0, 100, false, true));
+ _stripe->handleReply(make_get_reply(1, 100, false, true));
+
+ // Operation can proceed as planned as it has not been cancelled
+ ASSERT_EQ(_sender.getCommands(true, false, 2), "Remove => 0,Remove => 1");
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_does_not_cancel_pending_operations_for_owned_buckets_pending_case) {
+ do_test_not_cancelled_pending_op_without_bucket_ownership_change(false);
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_does_not_cancel_pending_operations_for_owned_buckets_not_pending_case) {
+ do_test_not_cancelled_pending_op_without_bucket_ownership_change(true);
+}
+
+// TODO we do not have good handling of bucket ownership changes combined with
+// distribution config changes... Hard to remove all such edge cases unless we
+// make state+config change an atomic operation initiated by the cluster controller
+// (hint: we should do this).
+
+
}