summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-09-08 14:25:56 +0200
committerGitHub <noreply@github.com>2023-09-08 14:25:56 +0200
commit97bc82e63d2dd0b008bcccea9acd7b78ffe2a6ce (patch)
treee5dd49807a6d33b3355a5da5302fce20fbb17990
parente6919f1aaad79b2ee19448735e86214ff91bca6d (diff)
parent2aa0216ae69d33847a2e6db79722d19b49ff5744 (diff)
Merge pull request #28441 from vespa-engine/vekterli/expand-operation-cancellation-support
Wire distributor operation cancelling to cluster state/config change edges
-rw-r--r--storage/src/tests/distributor/bucketstateoperationtest.cpp23
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test.cpp298
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.cpp30
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.h3
-rw-r--r--storage/src/tests/distributor/joinbuckettest.cpp28
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp24
-rw-r--r--storage/src/tests/distributor/removebucketoperationtest.cpp51
-rw-r--r--storage/src/tests/distributor/removelocationtest.cpp2
-rw-r--r--storage/src/tests/distributor/splitbuckettest.cpp64
-rw-r--r--storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp91
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h7
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def5
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/bucket_ownership_calculator.cpp41
-rw-r--r--storage/src/vespa/storage/distributor/bucket_ownership_calculator.h44
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp115
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h13
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_interface.h7
-rw-r--r--storage/src/vespa/storage/distributor/messagetracker.h2
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.cpp21
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.h16
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h14
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.h21
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp13
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp11
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.h9
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp69
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.h72
-rw-r--r--storage/src/vespa/storage/distributor/sentmessagemap.cpp29
-rw-r--r--storage/src/vespa/storage/distributor/sentmessagemap.h5
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp72
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h22
48 files changed, 976 insertions, 327 deletions
diff --git a/storage/src/tests/distributor/bucketstateoperationtest.cpp b/storage/src/tests/distributor/bucketstateoperationtest.cpp
index c9fab0b37e5..44da88b4587 100644
--- a/storage/src/tests/distributor/bucketstateoperationtest.cpp
+++ b/storage/src/tests/distributor/bucketstateoperationtest.cpp
@@ -201,4 +201,27 @@ TEST_F(BucketStateOperationTest, bucket_db_not_updated_on_failure) {
EXPECT_FALSE(op.ok());
}
+TEST_F(BucketStateOperationTest, cancelled_node_does_not_update_bucket_db) {
+ document::BucketId bid(16, 1);
+ insertBucketInfo(bid, 0, 0xabc, 10, 1100, true, false);
+
+ BucketAndNodes bucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(0));
+ std::vector<uint16_t> active = {0};
+ SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active);
+
+ op.setIdealStateManager(&getIdealStateManager());
+ op.start(_sender);
+ op.cancel(_sender, CancelScope::of_node_subset({0}));
+
+ ASSERT_EQ(_sender.commands().size(), 1);
+ std::shared_ptr<api::StorageCommand> msg = _sender.command(0);
+ std::shared_ptr<api::StorageReply> reply(msg->makeReply());
+ op.receive(_sender, reply);
+
+ BucketDatabase::Entry entry = getBucket(bid);
+ ASSERT_TRUE(entry.valid());
+ EXPECT_FALSE(entry->getNodeRef(0).active()); // Should not be updated
+ EXPECT_FALSE(op.ok());
+}
+
} // namespace storage::distributor
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp
index 566fb704105..c87f5133997 100644
--- a/storage/src/tests/distributor/distributor_stripe_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test.cpp
@@ -14,6 +14,7 @@
#include <vespa/storageapi/message/visitor.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <vespa/vespalib/util/stringfmt.h>
#include <gmock/gmock.h>
using document::Bucket;
@@ -47,8 +48,8 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
// Simple type aliases to make interfacing with certain utility functions
// easier. Note that this is only for readability and does not provide any
// added type safety.
- using NodeCount = int;
- using Redundancy = int;
+ using NodeCount = uint16_t;
+ using Redundancy = uint16_t;
using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder;
@@ -137,82 +138,114 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
return _stripe->handleMessage(msg);
}
- void configure_stale_reads_enabled(bool enabled) {
+ template <typename Func>
+ void configure_stripe_with(Func f) {
ConfigBuilder builder;
- builder.allowStaleReadsDuringClusterStateTransitions = enabled;
+ f(builder);
configure_stripe(builder);
}
+ void configure_stale_reads_enabled(bool enabled) {
+ configure_stripe_with([&](auto& builder) {
+ builder.allowStaleReadsDuringClusterStateTransitions = enabled;
+ });
+ }
+
void configure_update_fast_path_restart_enabled(bool enabled) {
- ConfigBuilder builder;
- builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled;
+ });
}
void configure_merge_operations_disabled(bool disabled) {
- ConfigBuilder builder;
- builder.mergeOperationsDisabled = disabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.mergeOperationsDisabled = disabled;
+ });
}
void configure_use_weak_internal_read_consistency(bool use_weak) {
- ConfigBuilder builder;
- builder.useWeakInternalReadConsistencyForClientGets = use_weak;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.useWeakInternalReadConsistencyForClientGets = use_weak;
+ });
}
void configure_metadata_update_phase_enabled(bool enabled) {
- ConfigBuilder builder;
- builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled;
+ });
}
void configure_prioritize_global_bucket_merges(bool enabled) {
- ConfigBuilder builder;
- builder.prioritizeGlobalBucketMerges = enabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.prioritizeGlobalBucketMerges = enabled;
+ });
}
void configure_max_activation_inhibited_out_of_sync_groups(uint32_t n_groups) {
- ConfigBuilder builder;
- builder.maxActivationInhibitedOutOfSyncGroups = n_groups;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.maxActivationInhibitedOutOfSyncGroups = n_groups;
+ });
}
void configure_implicitly_clear_priority_on_schedule(bool implicitly_clear) {
- ConfigBuilder builder;
- builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear;
+ });
}
void configure_use_unordered_merge_chaining(bool use_unordered) {
- ConfigBuilder builder;
- builder.useUnorderedMergeChaining = use_unordered;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.useUnorderedMergeChaining = use_unordered;
+ });
}
void configure_enable_two_phase_garbage_collection(bool use_two_phase) {
- ConfigBuilder builder;
- builder.enableTwoPhaseGarbageCollection = use_two_phase;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.enableTwoPhaseGarbageCollection = use_two_phase;
+ });
}
void configure_enable_condition_probing(bool enable_probing) {
- ConfigBuilder builder;
- builder.enableConditionProbing = enable_probing;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.enableConditionProbing = enable_probing;
+ });
+ }
+
+ void configure_enable_operation_cancellation(bool enable_cancellation) {
+ configure_stripe_with([&](auto& builder) {
+ builder.enableOperationCancellation = enable_cancellation;
+ });
}
- bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
+ [[nodiscard]] bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
return _stripe->_scheduler->implicitly_clear_priority_on_schedule();
}
+ [[nodiscard]] bool distributor_owns_bucket_in_current_and_pending_states(document::BucketId bucket_id) const {
+ return (getDistributorBucketSpace().get_bucket_ownership_flags(bucket_id).owned_in_pending_state() &&
+ getDistributorBucketSpace().check_ownership_in_pending_and_current_state(bucket_id).isOwned());
+ }
+
void configureMaxClusterClockSkew(int seconds);
void configure_mutation_sequencing(bool enabled);
void configure_merge_busy_inhibit_duration(int seconds);
void set_up_and_start_get_op_with_stale_reads_enabled(bool enabled);
+ void simulate_cluster_state_transition(const vespalib::string& state_str, bool clear_pending);
+ static std::shared_ptr<api::RemoveReply> make_remove_reply_with_bucket_remap(api::StorageCommand& originator_cmd);
+
+ // TODO dedupe
+ auto sent_get_command(size_t idx) { return sent_command<api::GetCommand>(idx); }
+
+ auto make_get_reply(size_t idx, api::Timestamp ts, bool is_tombstone, bool condition_matched) {
+ return std::make_shared<api::GetReply>(*sent_get_command(idx), std::shared_ptr<document::Document>(), ts,
+ false, is_tombstone, condition_matched);
+ }
+
+ void set_up_for_bucket_ownership_cancellation(uint32_t superbucket_idx);
+ void do_test_cancelled_pending_op_with_bucket_ownership_change(bool clear_pending_state);
+ void do_test_not_cancelled_pending_op_without_bucket_ownership_change(bool clear_pending_state);
};
DistributorStripeTest::DistributorStripeTest()
@@ -224,6 +257,34 @@ DistributorStripeTest::DistributorStripeTest()
DistributorStripeTest::~DistributorStripeTest() = default;
+void
+DistributorStripeTest::simulate_cluster_state_transition(const vespalib::string& state_str, bool clear_pending)
+{
+ simulate_set_pending_cluster_state(state_str);
+ if (clear_pending) {
+ enable_cluster_state(state_str);
+ clear_pending_cluster_state_bundle();
+ }
+}
+
+std::shared_ptr<api::RemoveReply>
+DistributorStripeTest::make_remove_reply_with_bucket_remap(api::StorageCommand& originator_cmd)
+{
+ auto& cmd_as_remove = dynamic_cast<api::RemoveCommand&>(originator_cmd);
+ auto reply = std::dynamic_pointer_cast<api::RemoveReply>(std::shared_ptr<api::StorageReply>(cmd_as_remove.makeReply()));
+ reply->setOldTimestamp(100);
+ // Including a bucket remapping as part of the response is a pragmatic way to avoid false
+ // negatives when testing whether cancelled operations may mutate the DB. This is because
+ // non-remapped buckets are not created in the DB if they are already removed (which will
+ // be the case after bucket pruning on a cluster state change), but remapped buckets _are_
+ // implicitly created upon insert.
+ // We expect the original bucket is 16 bits and fake a remap to a split bucket one level
+ // below the original bucket.
+ reply->remapBucketId(BucketId(17, (cmd_as_remove.getBucketId().getId() & 0xFFFF) | 0x10000));
+ reply->setBucketInfo(api::BucketInfo(0x1234, 2, 300));
+ return reply;
+}
+
TEST_F(DistributorStripeTest, operation_generation)
{
setup_stripe(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -455,8 +516,7 @@ TEST_F(DistributorStripeTest, no_db_resurrection_for_bucket_not_owned_in_pending
simulate_set_pending_cluster_state("storage:10 distributor:10");
document::BucketId nonOwnedBucket(16, 3);
- EXPECT_FALSE(getDistributorBucketSpace().get_bucket_ownership_flags(nonOwnedBucket).owned_in_pending_state());
- EXPECT_FALSE(getDistributorBucketSpace().check_ownership_in_pending_and_current_state(nonOwnedBucket).isOwned());
+ ASSERT_FALSE(distributor_owns_bucket_in_current_and_pending_states(nonOwnedBucket));
std::vector<BucketCopy> copies;
copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2));
@@ -1052,4 +1112,168 @@ TEST_F(DistributorStripeTest, enable_condition_probing_config_is_propagated_to_i
EXPECT_TRUE(getConfig().enable_condition_probing());
}
+TEST_F(DistributorStripeTest, enable_operation_cancellation_config_is_propagated_to_internal_config) {
+ setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ EXPECT_FALSE(getConfig().enable_operation_cancellation()); // TODO switch default once ready
+
+ configure_enable_operation_cancellation(false);
+ EXPECT_FALSE(getConfig().enable_operation_cancellation());
+
+ configure_enable_operation_cancellation(true);
+ EXPECT_TRUE(getConfig().enable_operation_cancellation());
+}
+
+TEST_F(DistributorStripeTest, cluster_state_node_down_edge_cancels_pending_operations_on_unavailable_nodes) {
+ setup_stripe(Redundancy(1), NodeCount(1), "version:1 distributor:1 storage:1");
+ configure_enable_operation_cancellation(true); // Test will fail without cancellation enabled
+ addNodesToBucketDB(BucketId(16, 1), "0=3/4/5/t");
+
+ stripe_handle_message(makeDummyRemoveCommand()); // Remove is for bucket {16, 1}
+ ASSERT_EQ(_sender.getCommands(true), "Remove => 0");
+
+ // Oh no, node 0 goes down while we have a pending operation!
+ simulate_cluster_state_transition("version:2 distributor:1 storage:1 .0.s:d", true);
+ EXPECT_EQ("NONEXISTING", dumpBucket(BucketId(16, 1))); // Implicitly cleared
+
+ auto reply = make_remove_reply_with_bucket_remap(*_sender.command(0));
+ // Before we receive the reply, node 0 is back online. Even though the node is available in the
+ // cluster state, we should not apply its bucket info to our DB, as it may represent stale
+ // information (it's possible that our node froze up and that another distributor took over and
+ // mutated the bucket in the meantime; a classic ABA scenario).
+ simulate_cluster_state_transition("version:5 distributor:1 storage:1", true);
+
+ _stripe->handleReply(std::move(reply));
+ EXPECT_EQ("NONEXISTING", dumpBucket(BucketId(17, 0x10001)));
+}
+
+TEST_F(DistributorStripeTest, distribution_config_change_edge_cancels_pending_operations_on_unavailable_nodes) {
+ setup_stripe(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
+ configure_enable_operation_cancellation(true); // Test will fail without cancellation enabled
+ addNodesToBucketDB(BucketId(16, 1), "0=3/4/5/t,1=3/4/5/t");
+
+ stripe_handle_message(makeDummyRemoveCommand()); // Remove is for bucket {16, 1}
+ ASSERT_EQ(_sender.getCommands(true), "Remove => 0,Remove => 1");
+
+ // Node 1 is configured away; only node 0 remains. This is expected to be closely followed by
+ // (or--depending on the timing of operations in the cluster--preceded by) a cluster state with
+ // the node marked as down, but the ordering is not guaranteed.
+ auto new_config = make_default_distribution_config(1, 1);
+ simulate_distribution_config_change(std::move(new_config));
+
+ auto node_0_reply = make_remove_reply_with_bucket_remap(*_sender.command(0));
+ auto node_1_reply = make_remove_reply_with_bucket_remap(*_sender.command(1));
+
+ _stripe->handleReply(std::move(node_0_reply));
+ _stripe->handleReply(std::move(node_1_reply));
+
+ // Only node 0 should be present in the DB
+ EXPECT_EQ("BucketId(0x4000000000000001) : " // Original bucket
+ "node(idx=0,crc=0x3,docs=4/4,bytes=5/5,trusted=true,active=false,ready=false)",
+ dumpBucket(BucketId(16, 1)));
+ EXPECT_EQ("BucketId(0x4400000000010001) : " // Remapped bucket
+ "node(idx=0,crc=0x1234,docs=2/2,bytes=300/300,trusted=true,active=false,ready=false)",
+ dumpBucket(BucketId(17, 0x10001)));
+}
+
+void DistributorStripeTest::set_up_for_bucket_ownership_cancellation(uint32_t superbucket_idx) {
+ setup_stripe(Redundancy(1), NodeCount(10), "version:1 distributor:2 storage:2");
+ configure_stripe_with([](auto& builder) {
+ builder.enableConditionProbing = true;
+ builder.enableOperationCancellation = true;
+ });
+
+ NodeSupportedFeatures features;
+ features.document_condition_probe = true;
+ set_node_supported_features(0, features);
+ set_node_supported_features(1, features);
+
+ // Note: replicas are intentionally out of sync to trigger a write-repair.
+ addNodesToBucketDB(BucketId(16, superbucket_idx), "0=3/4/5,1=4/5/6");
+}
+
+namespace {
+
+std::shared_ptr<api::RemoveCommand> make_conditional_remove_request(uint32_t superbucket_idx) {
+ auto client_remove = std::make_shared<api::RemoveCommand>(
+ makeDocumentBucket(document::BucketId(0)),
+ document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%u:foo", superbucket_idx)),
+ api::Timestamp(0));
+ client_remove->setCondition(documentapi::TestAndSetCondition("foo.bar==baz"));
+ return client_remove;
+}
+
+}
+
+void DistributorStripeTest::do_test_cancelled_pending_op_with_bucket_ownership_change(bool clear_pending_state) {
+ constexpr uint32_t superbucket_idx = 3;
+ const BucketId bucket_id(16, superbucket_idx);
+ set_up_for_bucket_ownership_cancellation(superbucket_idx);
+ // To actually check if cancellation is happening, we need to trigger a code path that
+ // is only covered by cancellation and not the legacy "check buckets at DB insert time"
+ // logic. The latter would give a false negative.
+ stripe_handle_message(make_conditional_remove_request(superbucket_idx));
+ ASSERT_EQ(_sender.getCommands(true), "Get => 0,Get => 1"); // Condition probes, thunder cats go!
+
+ simulate_cluster_state_transition("version:2 distributor:10 storage:10", clear_pending_state);
+ ASSERT_FALSE(distributor_owns_bucket_in_current_and_pending_states(bucket_id));
+ EXPECT_EQ("NONEXISTING", dumpBucket(bucket_id)); // Should have been pruned
+
+ _stripe->handleReply(make_get_reply(0, 100, false, true));
+ _stripe->handleReply(make_get_reply(1, 100, false, true));
+
+ // Condition probe was successful, but operation is cancelled and shall not continue.
+ ASSERT_EQ(_sender.getCommands(true, false, 2), "");
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:foo:testdoctype1:n=3:foo, timestamp 1, not found) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+ EXPECT_EQ("NONEXISTING", dumpBucket(bucket_id)); // And definitely no resurrection
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_cancels_pending_operations_for_non_owned_buckets_pending_case) {
+ do_test_cancelled_pending_op_with_bucket_ownership_change(false);
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_cancels_pending_operations_for_non_owned_buckets_not_pending_case) {
+ do_test_cancelled_pending_op_with_bucket_ownership_change(true);
+}
+
+void DistributorStripeTest::do_test_not_cancelled_pending_op_without_bucket_ownership_change(bool clear_pending_state) {
+ constexpr uint32_t superbucket_idx = 14;
+ const BucketId bucket_id(16, superbucket_idx);
+ set_up_for_bucket_ownership_cancellation(superbucket_idx);
+
+ stripe_handle_message(make_conditional_remove_request(superbucket_idx));
+ ASSERT_EQ(_sender.getCommands(true), "Get => 0,Get => 1");
+
+ simulate_cluster_state_transition("version:2 distributor:10 storage:10", clear_pending_state);
+ ASSERT_TRUE(distributor_owns_bucket_in_current_and_pending_states(bucket_id));
+ EXPECT_EQ("BucketId(0x400000000000000e) : "
+ "node(idx=0,crc=0x3,docs=4/4,bytes=5/5,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x4,docs=5/5,bytes=6/6,trusted=false,active=false,ready=false)",
+ dumpBucket(bucket_id)); // Should _not_ have been pruned
+
+ _stripe->handleReply(make_get_reply(0, 100, false, true));
+ _stripe->handleReply(make_get_reply(1, 100, false, true));
+
+ // Operation can proceed as planned as it has not been cancelled
+ ASSERT_EQ(_sender.getCommands(true, false, 2), "Remove => 0,Remove => 1");
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_does_not_cancel_pending_operations_for_owned_buckets_pending_case) {
+ do_test_not_cancelled_pending_op_without_bucket_ownership_change(false);
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_does_not_cancel_pending_operations_for_owned_buckets_not_pending_case) {
+ do_test_not_cancelled_pending_op_without_bucket_ownership_change(true);
+}
+
+// TODO we do not have good handling of bucket ownership changes combined with
+// distribution config changes... Hard to remove all such edge cases unless we
+// make state+config change an atomic operation initiated by the cluster controller
+// (hint: we should do this).
+
+
}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
index 5babde49380..ba6c4cb4ac4 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
@@ -72,7 +72,7 @@ DistributorStripeTestUtil::setup_stripe(int redundancy, int node_count, const li
// explicitly (which is what happens in "real life"), that is what would
// take place.
// The inverse case of this can be explicitly accomplished by calling
- // triggerDistributionChange().
+ // trigger_distribution_change().
// This isn't pretty, folks, but it avoids breaking the world for now,
// as many tests have implicit assumptions about this being the behavior.
auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
@@ -93,10 +93,31 @@ void
DistributorStripeTestUtil::trigger_distribution_change(lib::Distribution::SP distr)
{
_node->getComponentRegister().setDistribution(distr);
- auto new_config = BucketSpaceDistributionConfigs::from_default_distribution(distr);
+ auto new_config = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distr));
_stripe->update_distribution_config(new_config);
}
+void
+DistributorStripeTestUtil::simulate_distribution_config_change(std::shared_ptr<lib::Distribution> new_config)
+{
+ trigger_distribution_change(std::move(new_config));
+ _stripe->notifyDistributionChangeEnabled();
+ for (auto& space : _stripe->getBucketSpaceRepo()) {
+ auto cur_state = current_cluster_state_bundle().getDerivedClusterState(space.first); // no change in state itself
+ _stripe->remove_superfluous_buckets(space.first, *cur_state, true);
+ }
+}
+
+std::shared_ptr<lib::Distribution>
+DistributorStripeTestUtil::make_default_distribution_config(uint16_t redundancy, uint16_t node_count)
+{
+ lib::Distribution::DistributionConfigBuilder config(lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get());
+ config.redundancy = redundancy;
+ config.initialRedundancy = redundancy;
+ config.ensurePrimaryPersisted = true;
+ return std::make_shared<lib::Distribution>(config);
+}
+
std::shared_ptr<DistributorConfiguration>
DistributorStripeTestUtil::make_config() const
{
@@ -416,6 +437,11 @@ DistributorStripeTestUtil::getDistributorBucketSpace() {
return getBucketSpaceRepo().get(makeBucketSpace());
}
+const DistributorBucketSpace&
+DistributorStripeTestUtil::getDistributorBucketSpace() const {
+ return getBucketSpaceRepo().get(makeBucketSpace());
+}
+
BucketDatabase&
DistributorStripeTestUtil::getBucketDatabase() {
return getDistributorBucketSpace().getBucketDatabase();
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index 272301bf4a6..2892cec6fcf 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -138,6 +138,7 @@ public:
// TODO explicit notion of bucket spaces for tests
DistributorBucketSpace& getDistributorBucketSpace();
+ const DistributorBucketSpace& getDistributorBucketSpace() const;
BucketDatabase& getBucketDatabase(); // Implicit default space only
BucketDatabase& getBucketDatabase(document::BucketSpace space);
const BucketDatabase& getBucketDatabase() const; // Implicit default space only
@@ -175,6 +176,8 @@ public:
void set_redundancy(uint32_t redundancy);
void trigger_distribution_change(std::shared_ptr<lib::Distribution> distr);
+ void simulate_distribution_config_change(std::shared_ptr<lib::Distribution> new_config);
+ static std::shared_ptr<lib::Distribution> make_default_distribution_config(uint16_t redundancy, uint16_t node_count);
using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder;
diff --git a/storage/src/tests/distributor/joinbuckettest.cpp b/storage/src/tests/distributor/joinbuckettest.cpp
index 570fe24679e..ae389ecd6c2 100644
--- a/storage/src/tests/distributor/joinbuckettest.cpp
+++ b/storage/src/tests/distributor/joinbuckettest.cpp
@@ -109,4 +109,32 @@ TEST_F(JoinOperationTest, send_sparse_joins_to_nodes_without_both_source_buckets
ASSERT_NO_FATAL_FAILURE(checkSourceBucketsAndSendReply(op, 1, {{33, 1}, {33, 1}}));
}
+TEST_F(JoinOperationTest, cancelled_node_does_not_update_bucket_db) {
+ auto cfg = make_config();
+ cfg->setJoinCount(100);
+ cfg->setJoinSize(1000);
+ configure_stripe(cfg);
+
+ addNodesToBucketDB(document::BucketId(33, 1), "0=250/50/300");
+ addNodesToBucketDB(document::BucketId(33, 0x100000001), "0=300/40/200");
+ enable_cluster_state("distributor:1 storage:1");
+
+ JoinOperation op(dummy_cluster_context,
+ BucketAndNodes(makeDocumentBucket(document::BucketId(32, 0)), toVector<uint16_t>(0)),
+ {document::BucketId(33, 1), document::BucketId(33, 0x100000001)});
+
+ op.setIdealStateManager(&getIdealStateManager());
+ op.start(_sender);
+
+ op.cancel(_sender, CancelScope::of_node_subset({0}));
+
+ checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}});
+
+ // DB is not touched, so source buckets remain unchanged and target buckets is not created
+ EXPECT_TRUE(getBucket(document::BucketId(33, 0x100000001)).valid());
+ EXPECT_TRUE(getBucket(document::BucketId(33, 1)).valid());
+ EXPECT_FALSE(getBucket(document::BucketId(32, 0)).valid());
+ EXPECT_FALSE(op.ok());
+}
+
}
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 512c092d8ae..12280980998 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -548,8 +548,7 @@ TEST_F(MergeOperationTest, merge_operation_is_not_blocked_by_request_bucket_info
EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
}
-TEST_F(MergeOperationTest, on_blocked_updates_metrics)
-{
+TEST_F(MergeOperationTest, on_blocked_updates_metrics) {
auto op = setup_minimal_merge_op();
auto metrics = getIdealStateManager().getMetrics().operations[IdealStateOperation::MERGE_BUCKET];
EXPECT_EQ(0, metrics->blocked.getValue());
@@ -557,8 +556,7 @@ TEST_F(MergeOperationTest, on_blocked_updates_metrics)
EXPECT_EQ(1, metrics->blocked.getValue());
}
-TEST_F(MergeOperationTest, on_throttled_updates_metrics)
-{
+TEST_F(MergeOperationTest, on_throttled_updates_metrics) {
auto op = setup_minimal_merge_op();
auto metrics = getIdealStateManager().getMetrics().operations[IdealStateOperation::MERGE_BUCKET];
EXPECT_EQ(0, metrics->throttled.getValue());
@@ -628,4 +626,22 @@ TEST_F(MergeOperationTest, delete_bucket_priority_is_capped_to_feed_pri_120) {
EXPECT_EQ(int(del_cmd->getPriority()), 120);
}
+TEST_F(MergeOperationTest, no_delete_bucket_ops_sent_if_fully_cancelled) {
+ auto op = setup_simple_merge_op();
+ ASSERT_NO_FATAL_FAILURE(assert_simple_merge_bucket_command());
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ sendReply(*op);
+ EXPECT_EQ(_sender.getCommands(true, false, 1), ""); // nothing more
+ EXPECT_FALSE(op->ok());
+}
+
+TEST_F(MergeOperationTest, no_delete_bucket_ops_sent_if_node_subset_cancelled) {
+ auto op = setup_simple_merge_op(); // to nodes, 0, 2, 1 (source only)
+ ASSERT_NO_FATAL_FAILURE(assert_simple_merge_bucket_command());
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+ sendReply(*op);
+ EXPECT_EQ(_sender.getCommands(true, false, 1), ""); // nothing more
+ EXPECT_FALSE(op->ok());
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp
index 68d86884036..b9d1d804761 100644
--- a/storage/src/tests/distributor/removebucketoperationtest.cpp
+++ b/storage/src/tests/distributor/removebucketoperationtest.cpp
@@ -24,6 +24,14 @@ struct RemoveBucketOperationTest : Test, DistributorStripeTestUtil {
void TearDown() override {
close();
}
+
+ void reject_with_bucket_info(RemoveBucketOperation& op, size_t msg_index) {
+ std::shared_ptr<api::StorageCommand> msg2 = _sender.command(msg_index);
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
+ dynamic_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(api::BucketInfo(10, 200, 1));
+ reply->setResult(api::ReturnCode::REJECTED);
+ op.receive(_sender, reply);
+ }
};
TEST_F(RemoveBucketOperationTest, simple) {
@@ -36,11 +44,10 @@ TEST_F(RemoveBucketOperationTest, simple) {
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
- toVector<uint16_t>(1,2)));
+ toVector<uint16_t>(1, 2)));
op.setIdealStateManager(&getIdealStateManager());
op.start(_sender);
-
ASSERT_EQ("Delete bucket => 1,"
"Delete bucket => 2",
_sender.getCommands(true));
@@ -75,16 +82,11 @@ TEST_F(RemoveBucketOperationTest, bucket_info_mismatch_failure) {
ASSERT_EQ("Delete bucket => 1", _sender.getCommands(true));
ASSERT_EQ(1, _sender.commands().size());
- std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
- std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release());
- dynamic_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(
- api::BucketInfo(10, 100, 1));
- reply->setResult(api::ReturnCode::REJECTED);
- op.receive(_sender, reply);
+ reject_with_bucket_info(op, 0);
// RemoveBucketOperation should reinsert bucketinfo into database
ASSERT_EQ("BucketId(0x4000000000000001) : "
- "node(idx=1,crc=0xa,docs=100/100,bytes=1/1,trusted=true,active=false,ready=false)",
+ "node(idx=1,crc=0xa,docs=200/200,bytes=1/1,trusted=true,active=false,ready=false)",
dumpBucket(document::BucketId(16, 1)));
}
@@ -130,4 +132,35 @@ TEST_F(RemoveBucketOperationTest, operation_blocked_when_pending_message_to_targ
EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 2, 120));
}
+TEST_F(RemoveBucketOperationTest, cancelled_node_does_not_update_bucket_db_upon_rejection) {
+    addNodesToBucketDB(document::BucketId(16, 1),
+                       "0=10/100/1/t,"
+                       "1=10/100/1/t,"
+                       "2=10/100/1/t");
+    set_redundancy(1); // only 1 replica wanted, so replicas on nodes 1 and 2 are superfluous
+    enable_cluster_state("distributor:1 storage:3");
+
+    RemoveBucketOperation op(dummy_cluster_context,
+                             BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
+                                            toVector<uint16_t>(1,2)));
+    op.setIdealStateManager(&getIdealStateManager());
+    op.start(_sender);
+
+    ASSERT_EQ("Delete bucket => 1,"
+              "Delete bucket => 2",
+              _sender.getCommands(true));
+
+    op.cancel(_sender, CancelScope::of_node_subset({1})); // cancel node 1 only; node 2 remains in scope
+
+    // Rejections will by default reinsert the bucket into the DB with the bucket info contained
+    // in the reply, but here the node is cancelled and should therefore not be reinserted.
+    reject_with_bucket_info(op, 0); // rejection from node 1 (first Delete bucket command)
+    sendReply(op, 1); // OK reply from node 2 (second Delete bucket command)
+    // Node 1 not reinserted
+    ASSERT_EQ("BucketId(0x4000000000000001) : "
+              "node(idx=0,crc=0xa,docs=100/100,bytes=1/1,trusted=true,active=false,ready=false)",
+              dumpBucket(document::BucketId(16, 1)));
+    EXPECT_FALSE(op.ok()); // cancellation implies the operation as a whole did not succeed
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp
index b19a448199b..be688e64a8b 100644
--- a/storage/src/tests/distributor/removelocationtest.cpp
+++ b/storage/src/tests/distributor/removelocationtest.cpp
@@ -67,4 +67,6 @@ TEST_F(RemoveLocationOperationTest, simple) {
_sender.getLastReply());
}
+// TODO test cancellation (implicitly covered via operation PersistenceMessageTracker)
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp
index edb392d9532..05a13f67bc9 100644
--- a/storage/src/tests/distributor/splitbuckettest.cpp
+++ b/storage/src/tests/distributor/splitbuckettest.cpp
@@ -48,14 +48,14 @@ SplitOperationTest::SplitOperationTest()
}
namespace {
- api::StorageMessageAddress _Storage0Address(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 0);
+ api::StorageMessageAddress _storage0Address(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 0);
}
TEST_F(SplitOperationTest, simple) {
enable_cluster_state("distributor:1 storage:1");
insertBucketInfo(document::BucketId(16, 1), 0, 0xabc, 1000,
- tooLargeBucketSize, 250);
+ tooLargeBucketSize, true);
SplitOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
@@ -72,10 +72,10 @@ TEST_F(SplitOperationTest, simple) {
std::shared_ptr<api::StorageCommand> msg = _sender.command(0);
ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET);
- EXPECT_EQ(_Storage0Address.toString(),
+ EXPECT_EQ(_storage0Address.toString(),
msg->getAddress()->toString());
- std::shared_ptr<api::StorageReply> reply(msg->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(msg->makeReply());
auto* sreply = static_cast<api::SplitBucketReply*>(reply.get());
sreply->getSplitInfo().emplace_back(document::BucketId(17, 1),
@@ -142,19 +142,15 @@ TEST_F(SplitOperationTest, multi_node_failure) {
{
std::shared_ptr<api::StorageCommand> msg = _sender.command(0);
ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET);
- EXPECT_EQ(_Storage0Address.toString(),
- msg->getAddress()->toString());
+ EXPECT_EQ(_storage0Address.toString(), msg->getAddress()->toString());
+ std::shared_ptr<api::StorageReply> reply(msg->makeReply());
+ auto& sreply = dynamic_cast<api::SplitBucketReply&>(*reply);
- auto* sreply = static_cast<api::SplitBucketReply*>(msg->makeReply().release());
- sreply->setResult(api::ReturnCode::OK);
+ sreply.setResult(api::ReturnCode::OK);
+ sreply.getSplitInfo().emplace_back(document::BucketId(17, 1), api::BucketInfo(100, 600, 5000000));
+ sreply.getSplitInfo().emplace_back(document::BucketId(17, 0x10001), api::BucketInfo(110, 400, 6000000));
- sreply->getSplitInfo().emplace_back(document::BucketId(17, 1),
- api::BucketInfo(100, 600, 5000000));
-
- sreply->getSplitInfo().emplace_back(document::BucketId(17, 0x10001),
- api::BucketInfo(110, 400, 6000000));
-
- op.receive(_sender, std::shared_ptr<api::StorageReply>(sreply));
+ op.receive(_sender, reply);
}
sendReply(op, 1, api::ReturnCode::NOT_CONNECTED);
@@ -230,7 +226,7 @@ TEST_F(SplitOperationTest, copy_trusted_status_not_carried_over_after_split) {
for (int i = 0; i < 2; ++i) {
std::shared_ptr<api::StorageCommand> msg = _sender.command(i);
ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET);
- std::shared_ptr<api::StorageReply> reply(msg->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(msg->makeReply());
auto* sreply = static_cast<api::SplitBucketReply*>(reply.get());
// Make sure copies differ so they cannot become implicitly trusted.
@@ -271,7 +267,7 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
};
auto joinCmd = std::make_shared<api::JoinBucketsCommand>(makeDocumentBucket(joinTarget));
joinCmd->getSourceBuckets() = joinSources;
- joinCmd->setAddress(_Storage0Address);
+ joinCmd->setAddress(_storage0Address);
pending_message_tracker().insert(joinCmd);
@@ -307,7 +303,7 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
enable_cluster_state("distributor:1 storage:2");
document::BucketId source_bucket(16, 1);
- insertBucketInfo(source_bucket, 0, 0xabc, 1000, tooLargeBucketSize, 250);
+ insertBucketInfo(source_bucket, 0, 0xabc, 1000, tooLargeBucketSize, true);
SplitOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)),
maxSplitBits, splitCount, splitByteSize);
@@ -318,4 +314,36 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
}
+TEST_F(SplitOperationTest, cancelled_node_does_not_update_bucket_db) {
+    enable_cluster_state("distributor:1 storage:1");
+    insertBucketInfo(document::BucketId(16, 1), 0, 0xabc, 1000, tooLargeBucketSize, true); // oversized bucket => split candidate
+
+    SplitOperation op(dummy_cluster_context,
+                      BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0)),
+                      maxSplitBits, splitCount, splitByteSize);
+
+    op.setIdealStateManager(&getIdealStateManager());
+    op.start(_sender);
+
+    op.cancel(_sender, CancelScope::of_node_subset({0})); // cancel the only target node before its reply arrives
+
+    {
+        ASSERT_EQ(_sender.commands().size(), 1); // the single SplitBucket command sent by start()
+        std::shared_ptr<api::StorageCommand> msg = _sender.command(0);
+        std::shared_ptr<api::StorageReply> reply(msg->makeReply());
+        auto& sreply = dynamic_cast<api::SplitBucketReply&>(*reply);
+
+        sreply.getSplitInfo().emplace_back(document::BucketId(17, 1), api::BucketInfo(100, 600, 5000000));
+        sreply.getSplitInfo().emplace_back(document::BucketId(17, 0x10001), api::BucketInfo(110, 400, 6000000));
+        op.receive(_sender, reply); // successful split reply, but from a cancelled node
+    }
+
+    // DB is not touched, so source bucket remains (will be removed during actual operation)
+    // while target buckets are not created
+    EXPECT_TRUE(getBucket(document::BucketId(16, 1)).valid());
+    EXPECT_FALSE(getBucket(document::BucketId(17, 0x00001)).valid());
+    EXPECT_FALSE(getBucket(document::BucketId(17, 0x10001)).valid());
+    EXPECT_FALSE(op.ok()); // cancelled operations must not report success
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index d3af4cd564a..f3aa9c2eb92 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -1056,7 +1056,32 @@ TEST_F(TopLevelBucketDBUpdaterTest, recheck_node) {
const BucketCopy* copy = entry->getNode(1);
ASSERT_TRUE(copy != nullptr);
- EXPECT_EQ(api::BucketInfo(20,10,12, 50, 60, true, true), copy->getBucketInfo());
+ EXPECT_EQ(api::BucketInfo(20, 10, 12, 50, 60, true, true), copy->getBucketInfo());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, cancelled_pending_recheck_command_does_not_update_db) {
+    ASSERT_NO_FATAL_FAILURE(initialize_nodes_and_buckets(3, 5));
+    _sender.clear();
+
+    auto bucket = makeDocumentBucket(document::BucketId(16, 3));
+    auto& stripe_bucket_db_updater = stripe_of_bucket(bucket.getBucketId()).bucket_db_updater();
+    stripe_bucket_db_updater.recheckBucketInfo(0, bucket); // triggers a RequestBucketInfo towards node 0
+
+    ASSERT_EQ(_sender.getCommands(true), "Request bucket info => 0");
+    auto& req = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0));
+
+    ASSERT_TRUE(stripe_bucket_db_updater.cancel_message_by_id(req.getMsgId())); // cancel while the request is still pending
+
+    auto reply = std::make_shared<api::RequestBucketInfoReply>(req);
+    reply->getBucketInfo().emplace_back(document::BucketId(16, 3), api::BucketInfo(20, 10, 12, 50, 60, true, true)); // would-be new info
+    stripe_bucket_db_updater.onRequestBucketInfoReply(reply); // reply for a cancelled request must be ignored
+
+    BucketDatabase::Entry entry = get_bucket(bucket);
+    ASSERT_TRUE(entry.valid());
+    const BucketCopy* copy = entry->getNode(0);
+    ASSERT_TRUE(copy != nullptr);
+    // Existing bucket info not modified by reply (0xa ... is the initialized test state).
+    EXPECT_EQ(api::BucketInfo(0xa, 1, 1, 1, 1, false, false), copy->getBucketInfo());
+}
TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) {
@@ -1217,11 +1242,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply) {
document::BucketId bucket_id(16, 1234);
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
- std::vector<api::MergeBucketCommand::Node> nodes;
- nodes.emplace_back(0);
- nodes.emplace_back(1);
- nodes.emplace_back(2);
-
+ std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}};
api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
auto reply = std::make_shared<api::MergeBucketReply>(cmd);
@@ -1248,19 +1269,15 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply) {
"node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), "
"node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)",
dump_bucket(bucket_id));
-};
+}
TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down) {
enable_distributor_cluster_state("distributor:1 storage:3");
- std::vector<api::MergeBucketCommand::Node> nodes;
document::BucketId bucket_id(16, 1234);
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
- for (uint32_t i = 0; i < 3; ++i) {
- nodes.emplace_back(i);
- }
-
+ std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}};
api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
auto reply = std::make_shared<api::MergeBucketReply>(cmd);
@@ -1287,19 +1304,15 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down) {
"node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
"node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)",
dump_bucket(bucket_id));
-};
+}
TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
enable_distributor_cluster_state("distributor:1 storage:3");
- std::vector<api::MergeBucketCommand::Node> nodes;
document::BucketId bucket_id(16, 1234);
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
- for (uint32_t i = 0; i < 3; ++i) {
- nodes.emplace_back(i);
- }
-
+ std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}};
api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
auto reply = std::make_shared<api::MergeBucketReply>(cmd);
@@ -1326,7 +1339,41 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
"node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
"node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)",
dump_bucket(bucket_id));
-};
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_triggered_bucket_info_request_not_sent_to_cancelled_nodes) {
+    enable_distributor_cluster_state("distributor:1 storage:3");
+    document::BucketId bucket_id(16, 1234);
+    // DB has one bucket with 3 mutually out of sync replicas. Node 0 is explicitly tagged as Active;
+    // this is done to prevent the distributor from scheduling a bucket activation as its first task.
+    add_nodes_to_stripe_bucket_db(bucket_id, "0=10/1/1/t/a,1=20/1/1/t,2=30/1/1/t");
+
+    // Poke at the business end of the stripe until it has scheduled a merge for the inconsistent bucket
+    ASSERT_EQ(_sender.commands().size(), 0u);
+    const int max_tick_tries = 20;
+    for (int i = 0; i <= max_tick_tries; ++i) {
+        stripe_of_bucket(bucket_id).tick();
+        if (!_sender.commands().empty()) {
+            break; // merge scheduled; stop ticking so no further commands can be enqueued
+        }
+        if (i == max_tick_tries) {
+            FAIL() << "no merge sent after ticking " << max_tick_tries << " times";
+        }
+    }
+    ASSERT_EQ(_sender.getCommands(true), "Merge bucket => 0");
+
+    auto cmd = std::dynamic_pointer_cast<api::MergeBucketCommand>(_sender.commands()[0]);
+    _sender.commands().clear();
+    auto reply = std::make_shared<api::MergeBucketReply>(*cmd);
+
+    auto op = stripe_of_bucket(bucket_id).maintenance_op_from_message_id(cmd->getMsgId()); // look up the pending MergeOperation
+    ASSERT_TRUE(op);
+
+    op->cancel(_sender, CancelScope::of_node_subset({0, 2})); // nodes 0 and 2 cancelled; only node 1 remains valid
+    stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply);
+    // RequestBucketInfo only sent to node 1
+    ASSERT_EQ(_sender.getCommands(true), "Request bucket info => 1");
+}
TEST_F(TopLevelBucketDBUpdaterTest, flush) {
enable_distributor_cluster_state("distributor:1 storage:3");
@@ -1335,11 +1382,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, flush) {
document::BucketId bucket_id(16, 1234);
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
- std::vector<api::MergeBucketCommand::Node> nodes;
- for (uint32_t i = 0; i < 3; ++i) {
- nodes.emplace_back(i);
- }
-
+ std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}};
api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
auto reply = std::make_shared<api::MergeBucketReply>(cmd);
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index d6d21c89b68..959fdc612cb 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -54,6 +54,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_inhibit_default_merges_when_global_merges_pending(false),
_enable_two_phase_garbage_collection(false),
_enable_condition_probing(false),
+ _enable_operation_cancellation(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
@@ -179,6 +180,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_inhibit_default_merges_when_global_merges_pending = config.inhibitDefaultMergesWhenGlobalMergesPending;
_enable_two_phase_garbage_collection = config.enableTwoPhaseGarbageCollection;
_enable_condition_probing = config.enableConditionProbing;
+ _enable_operation_cancellation = config.enableOperationCancellation;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index e3664276518..6056f0a1d5c 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -293,6 +293,12 @@ public:
[[nodiscard]] bool enable_condition_probing() const noexcept {
return _enable_condition_probing;
}
+ void set_enable_operation_cancellation(bool enable) noexcept {
+ _enable_operation_cancellation = enable;
+ }
+ [[nodiscard]] bool enable_operation_cancellation() const noexcept {
+ return _enable_operation_cancellation;
+ }
uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; }
@@ -354,6 +360,7 @@ private:
bool _inhibit_default_merges_when_global_merges_pending;
bool _enable_two_phase_garbage_collection;
bool _enable_condition_probing;
+ bool _enable_operation_cancellation;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 95461eb5dc2..debbe443b31 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -313,3 +313,8 @@ enable_two_phase_garbage_collection bool default=true
## replicas will trigger an implicit distributed condition probe to resolve the outcome of
## the condition across all divergent replicas.
enable_condition_probing bool default=true
+
+## If true, changes in the cluster where a subset of the nodes become unavailable or buckets
+## change ownership between distributors will trigger an explicit cancellation of all pending
+## requests partially or fully "invalidated" by such a change.
+enable_operation_cancellation bool default=false
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index c889afcc77c..16a4fb5691f 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -4,6 +4,7 @@ vespa_add_library(storage_distributor OBJECT
activecopy.cpp
blockingoperationstarter.cpp
bucket_db_prune_elision.cpp
+ bucket_ownership_calculator.cpp
bucket_space_distribution_configs.cpp
bucket_space_distribution_context.cpp
bucket_space_state_map.cpp
diff --git a/storage/src/vespa/storage/distributor/bucket_ownership_calculator.cpp b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.cpp
new file mode 100644
index 00000000000..6f94235e548
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.cpp
@@ -0,0 +1,41 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "bucket_ownership_calculator.h"
+#include <vespa/document/bucket/bucket.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/clusterstate.h>
+
+namespace storage::distributor {
+
+namespace {
+
+uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept {
+ // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest.
+ return id.getRawId() & ~(UINT64_MAX << distribution_bits);
+}
+
+}
+
+bool
+BucketOwnershipCalculator::this_distributor_owns_bucket(const document::BucketId& bucket_id) const
+{
+    // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor.
+    // TODO "too few bits used" case can be cheaply checked without needing exception
+    try {
+        const auto bits = _state.getDistributionBitCount();
+        const auto this_superbucket = superbucket_from_id(bucket_id, bits);
+        if (_cached_decision_superbucket == this_superbucket) { // cache hit: same superbucket as the previous lookup
+            return _cached_owned;
+        }
+        uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucket_id, "uim");
+        _cached_decision_superbucket = this_superbucket; // memoize; pays off when called in bucket ID order
+        _cached_owned = (distributor == _this_node_index);
+        return _cached_owned;
+    } catch (lib::TooFewBucketBitsInUseException&) {
+        // Ignore; implicitly not owned
+    } catch (lib::NoDistributorsAvailableException&) {
+        // Ignore; implicitly not owned
+    }
+    return false; // exception paths: never owned, and cache deliberately left untouched
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/bucket_ownership_calculator.h b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.h
new file mode 100644
index 00000000000..b67bb41e85d
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.h
@@ -0,0 +1,44 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <cstdint>
+
+namespace document { class BucketId; }
+
+namespace storage::lib {
+class ClusterState;
+class Distribution;
+}
+
+namespace storage::distributor {
+
+/**
+ * Calculator for determining if a bucket is owned by the current distributor.
+ * Ideal state calculations are cached and reused for all consecutive sub-buckets
+ * under the same super bucket. The cache is invalidated when a new super bucket
+ * is encountered, so it only provides a benefit when invoked in bucket ID order.
+ *
+ * Not thread safe due to internal caching.
+ */
+class BucketOwnershipCalculator {
+    const lib::ClusterState& _state;
+    const lib::Distribution& _distribution;
+    mutable uint64_t         _cached_decision_superbucket; // superbucket of the most recent ownership decision
+    const uint16_t           _this_node_index;
+    mutable bool             _cached_owned; // cached decision for _cached_decision_superbucket
+public:
+    BucketOwnershipCalculator(const lib::ClusterState& state,
+                              const lib::Distribution& distribution,
+                              uint16_t this_node_index) noexcept
+        : _state(state),
+          _distribution(distribution),
+          _cached_decision_superbucket(UINT64_MAX), // sentinel; cannot match any real superbucket value
+          _this_node_index(this_node_index),
+          _cached_owned(false)
+    {
+    }
+
+    [[nodiscard]] bool this_distributor_owns_bucket(const document::BucketId& bucket_id) const;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index ad1cce46bea..ac5cb740361 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -16,6 +16,8 @@
#include <vespa/storage/common/node_identity.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
+#include <vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/memoryusage.h>
@@ -177,6 +179,12 @@ DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageM
return true;
}
+std::shared_ptr<Operation>
+DistributorStripe::maintenance_op_from_message_id(uint64_t msg_id) const noexcept
+{
+    return _maintenanceOperationOwner.find_by_id(msg_id); // empty shared_ptr if no maintenance op is tracked for msg_id
+}
+
void
DistributorStripe::handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply)
{
@@ -210,6 +218,8 @@ DistributorStripe::handleReply(const std::shared_ptr<api::StorageReply>& reply)
bucket.getBucketId() != document::BucketId(0) &&
reply->getAddress())
{
+ // Won't be triggered for replies of cancelled ops since they will be missing
+ // from `_pendingMessageTracker` and thus `bucket` will be zero.
recheckBucketInfo(reply->getAddress()->getIndex(), bucket);
}
@@ -271,18 +281,82 @@ DistributorStripe::getClusterStateBundle() const
}
void
-DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state)
+DistributorStripe::cancel_single_message_by_id_if_found(uint64_t msg_id, const CancelScope& cancel_scope)
 {
-    lib::ClusterStateBundle oldState = _clusterStateBundle;
-    _clusterStateBundle = state;
-    propagateClusterStates();
+    // In descending order of likelihood:
+    if (_operationOwner.try_cancel_by_id(msg_id, cancel_scope)) { // client operations first
+        return;
+    }
+    if (_maintenanceOperationOwner.try_cancel_by_id(msg_id, cancel_scope)) { // then maintenance ops
+        return;
+    }
+    (void)_bucketDBUpdater.cancel_message_by_id(msg_id); // last resort: pending bucket info request; result intentionally ignored
+}
- const auto& baseline_state = *state.getBaselineClusterState();
- enterRecoveryMode();
+void
+DistributorStripe::handle_node_down_edge_with_cancellations(uint16_t node_index, std::span<const uint64_t> msg_ids)
+{
+    auto cancel_scope = CancelScope::of_node_subset({node_index}); // scope covers just the unavailable node
+    for (const auto msg_id : msg_ids) {
+        cancel_single_message_by_id_if_found(msg_id, cancel_scope);
+    }
+}
+
+void
+DistributorStripe::cancel_ops_for_buckets_no_longer_owned(document::BucketSpace bucket_space,
+                                                          const lib::ClusterState& new_state)
+{
+    // Note: we explicitly do not simply reuse the set of buckets removed from the bucket database
+    // when deciding which operations to cancel. This is because that would depend on every candidate
+    // bucket to cancel already being present in the DB, which is hard to guarantee always holds.
+    const auto& distribution = _bucketSpaceRepo->get(bucket_space).getDistribution();
+    BucketOwnershipCalculator ownership_calc(new_state, distribution, getDistributorIndex());
+
+    auto bucket_not_owned_in_new_state = [&](const document::Bucket& bucket) { // predicate: bucket lost by this distributor
+        return !ownership_calc.this_distributor_owns_bucket(bucket.getBucketId());
+    };
+    auto cancel_op_by_msg_id = [&](uint64_t msg_id) { // ownership loss invalidates the whole op, not just some nodes
+        cancel_single_message_by_id_if_found(msg_id, CancelScope::of_fully_cancelled());
+    };
+    _pendingMessageTracker.enumerate_matching_pending_bucket_ops(bucket_not_owned_in_new_state, cancel_op_by_msg_id);
+}
+
+void
+DistributorStripe::cancel_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle,
+                                                    const lib::ClusterStateBundle& new_state_bundle)
+{
+    // TODO we should probably only consider a node as unavailable if it is unavailable in
+    //  _all_ bucket spaces. Consider: implicit maintenance mode for global merges (although
+    //  that _should_ only be triggered by the CC when the node was already down...).
+    const auto& baseline_state = *new_state_bundle.getBaselineClusterState();
+    const uint16_t old_node_count = old_state_bundle.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
+    const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE);
+    const auto& distribution = _bucketSpaceRepo->get(document::FixedBucketSpaces::default_space()).getDistribution();
+    for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) { // max(): also catch nodes removed by the new state
+        // Handle both the case where a node may be gone from the cluster state and from the config.
+        // These are not atomic, so one may happen before the other.
+        const auto& node_state = baseline_state.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState();
+        const auto* node_group = distribution.getNodeGraph().getGroupForNode(i); // nullptr => node absent from distribution config
+        if (!node_state.oneOf(storage_node_up_states()) || !node_group) {
+            // Note: this also clears _non-maintenance_ operations from the pending message tracker, but
+            // the client operation mapping (_operationOwner) is _not_ cleared, so replies from the
+            // unavailable node(s) will still be processed as expected.
+            std::vector<uint64_t> msg_ids = _pendingMessageTracker.clearMessagesForNode(i);
+            LOG(debug, "Node %u is unavailable, cancelling %zu pending operations", i, msg_ids.size());
+            handle_node_down_edge_with_cancellations(i, msg_ids);
+        }
+    }
+}
+
+// TODO remove once cancellation support has proven itself worthy of prime time
+void
+DistributorStripe::legacy_erase_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle,
+ const lib::ClusterStateBundle& new_state_bundle)
+{
+ const auto& baseline_state = *new_state_bundle.getBaselineClusterState();
// Clear all active messages on nodes that are down.
- // TODO this should also be done on nodes that are no longer part of the config!
- const uint16_t old_node_count = oldState.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
+ const uint16_t old_node_count = old_state_bundle.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE);
for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) {
const auto& node_state = baseline_state.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState();
@@ -291,12 +365,28 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state
LOG(debug, "Node %u is down, clearing %zu pending maintenance operations", i, msgIds.size());
for (const auto & msgId : msgIds) {
- _maintenanceOperationOwner.erase(msgId);
+ (void)_maintenanceOperationOwner.erase(msgId);
}
}
}
}
+void
+DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state)
+{
+    lib::ClusterStateBundle old_state = _clusterStateBundle; // keep copy for old-vs-new node availability diffing
+    _clusterStateBundle = state;
+
+    propagateClusterStates();
+    enterRecoveryMode();
+
+    if (_total_config->enable_operation_cancellation()) { // config-gated; see stor-distributormanager.def
+        cancel_ops_for_unavailable_nodes(old_state, state);
+    } else {
+        legacy_erase_ops_for_unavailable_nodes(old_state, state); // legacy behavior: silently erase instead of cancel
+    }
+}
+
OperationRoutingSnapshot DistributorStripe::read_snapshot_for_bucket(const document::Bucket& bucket) const {
return _bucketDBUpdater.read_snapshot_for_bucket(bucket);
}
@@ -308,6 +398,10 @@ DistributorStripe::notifyDistributionChangeEnabled()
// Trigger a re-scan of bucket database, just like we do when a new cluster
// state has been enabled.
enterRecoveryMode();
+
+ if (_total_config->enable_operation_cancellation()) {
+ cancel_ops_for_unavailable_nodes(_clusterStateBundle, _clusterStateBundle);
+ }
}
void
@@ -850,6 +944,9 @@ DistributorStripe::remove_superfluous_buckets(document::BucketSpace bucket_space
const lib::ClusterState& new_state,
bool is_distribution_change)
{
+ if (_total_config->enable_operation_cancellation()) {
+ cancel_ops_for_buckets_no_longer_owned(bucket_space, new_state);
+ }
return bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 338a6c72125..566e6ed454a 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -26,6 +26,7 @@
#include <atomic>
#include <mutex>
#include <queue>
+#include <span>
#include <unordered_map>
namespace storage {
@@ -135,6 +136,8 @@ public:
const lib::ClusterStateBundle& getClusterStateBundle() const override;
+ std::shared_ptr<Operation> maintenance_op_from_message_id(uint64_t msg_id) const noexcept override;
+
/**
* Called by bucket db updater after a merge has finished, and all the
* request bucket info operations have been performed as well. Passes the
@@ -251,6 +254,16 @@ private:
void enterRecoveryMode();
void leaveRecoveryMode();
+ void cancel_single_message_by_id_if_found(uint64_t msg_id, const CancelScope& cancel_scope);
+ void handle_node_down_edge_with_cancellations(uint16_t node_index, std::span<const uint64_t> msg_ids);
+ void cancel_ops_for_buckets_no_longer_owned(document::BucketSpace bucket_space, const lib::ClusterState& new_state);
+ // Note: old and new state bundles may be the same if this is called for distribution config changes
+ void cancel_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle,
+ const lib::ClusterStateBundle& new_state_bundle);
+
+ void legacy_erase_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle,
+ const lib::ClusterStateBundle& new_state_bundle);
+
// Tries to generate an operation from the given message. Returns true
// if we either returned an operation, or the message was otherwise handled
// (for instance, wrong distribution).
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
index dfed59499c6..14888de961e 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
@@ -18,6 +18,7 @@ namespace storage::distributor {
class DistributorMetricSet;
class NodeSupportedFeaturesRepo;
class PendingMessageTracker;
+class Operation;
/**
* TODO STRIPE add class comment.
@@ -27,7 +28,7 @@ class DistributorStripeInterface : public DistributorStripeMessageSender
public:
virtual DistributorMetricSet& getMetrics() = 0;
virtual void enableClusterStateBundle(const lib::ClusterStateBundle& state) = 0;
- virtual const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const = 0;
+ [[nodiscard]] virtual const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const = 0;
virtual void notifyDistributionChangeEnabled() = 0;
/**
@@ -57,7 +58,9 @@ public:
/**
* Returns true if the node is currently initializing.
*/
- virtual bool initializing() const = 0;
+ [[nodiscard]] virtual bool initializing() const = 0;
+
+ [[nodiscard]] virtual std::shared_ptr<Operation> maintenance_op_from_message_id(uint64_t msg_id) const noexcept = 0;
virtual void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>&) = 0;
virtual const DistributorConfiguration& getConfig() const = 0;
virtual ChainedMessageSender& getMessageSender() = 0;
diff --git a/storage/src/vespa/storage/distributor/messagetracker.h b/storage/src/vespa/storage/distributor/messagetracker.h
index a0234f425a0..92cc921d91c 100644
--- a/storage/src/vespa/storage/distributor/messagetracker.h
+++ b/storage/src/vespa/storage/distributor/messagetracker.h
@@ -25,7 +25,7 @@ public:
uint16_t _target;
};
- MessageTracker(const ClusterContext& cluster_context);
+ explicit MessageTracker(const ClusterContext& cluster_context);
MessageTracker(MessageTracker&&) noexcept = default;
MessageTracker& operator=(MessageTracker&&) noexcept = delete;
MessageTracker(const MessageTracker &) = delete;
diff --git a/storage/src/vespa/storage/distributor/operationowner.cpp b/storage/src/vespa/storage/distributor/operationowner.cpp
index c92544c8cb5..16bbc36e4bc 100644
--- a/storage/src/vespa/storage/distributor/operationowner.cpp
+++ b/storage/src/vespa/storage/distributor/operationowner.cpp
@@ -70,10 +70,27 @@ OperationOwner::onClose()
}
}
-void
+std::shared_ptr<Operation>
+OperationOwner::find_by_id(api::StorageMessage::Id msg_id) const noexcept
+{
+ return _sentMessageMap.find_by_id_or_empty(msg_id);
+}
+
+bool
+OperationOwner::try_cancel_by_id(api::StorageMessage::Id id, const CancelScope& cancel_scope)
+{
+ auto* op = _sentMessageMap.find_by_id_or_nullptr(id);
+ if (!op) {
+ return false;
+ }
+ op->cancel(_sender, cancel_scope);
+ return true;
+}
+
+std::shared_ptr<Operation>
OperationOwner::erase(api::StorageMessage::Id msgId)
{
- (void)_sentMessageMap.pop(msgId);
+ return _sentMessageMap.pop(msgId);
}
}
diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h
index 009bb5b80aa..828d776f1a6 100644
--- a/storage/src/vespa/storage/distributor/operationowner.h
+++ b/storage/src/vespa/storage/distributor/operationowner.h
@@ -9,6 +9,7 @@ namespace storage::framework { struct Clock; }
namespace storage::distributor {
+class CancelScope;
class Operation;
/**
@@ -87,10 +88,19 @@ public:
bool start(const std::shared_ptr<Operation>& operation, Priority priority) override;
/**
- If the given message exists, create a reply and pass it to the
- appropriate callback.
+ * If the given message exists, remove it from the internal operation mapping.
+ *
+ * Returns the operation the message belonged to, if any.
*/
- void erase(api::StorageMessage::Id msgId);
+ [[nodiscard]] std::shared_ptr<Operation> erase(api::StorageMessage::Id msgId);
+
+ /**
+ * Returns a strong ref to the pending operation with the given msg_id if it exists.
+ * Otherwise returns an empty shared_ptr.
+ */
+ [[nodiscard]] std::shared_ptr<Operation> find_by_id(api::StorageMessage::Id msg_id) const noexcept;
+
+ [[nodiscard]] bool try_cancel_by_id(api::StorageMessage::Id msg_id, const CancelScope& cancel_scope);
[[nodiscard]] DistributorStripeMessageSender& sender() noexcept { return _sender; }
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
index bd7f3709575..03be507f467 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
@@ -178,6 +178,8 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St
if (_bucket_space.has_pending_cluster_state()) {
state_version_now = _bucket_space.get_pending_cluster_state().getVersion();
}
+ // TODO disable these explicit (and possibly costly) checks when cancellation is enabled,
+ // as cancellation shall cover a superset of the cases that can be detected here.
if ((state_version_now != _cluster_state_version_at_creation_time)
&& (replica_set_changed_after_get_operation()
|| distributor_no_longer_owns_bucket()))
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index c7f858de608..5735e47ec87 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -28,8 +28,8 @@ PutOperation::PutOperation(const DistributorNodeContext& node_ctx,
PersistenceOperationMetricSet& condition_probe_metrics,
SequencingHandle sequencing_handle)
: SequencedOperation(std::move(sequencing_handle)),
- _tracker_instance(metric, std::make_shared<api::PutReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()),
- _tracker(_tracker_instance),
+ _tracker(metric, std::make_shared<api::PutReply>(*msg), node_ctx,
+ op_ctx, _cancel_scope, msg->getTimestamp()),
_msg(std::move(msg)),
_doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())),
_node_ctx(node_ctx),
@@ -253,7 +253,6 @@ PutOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScop
if (_check_condition) {
_check_condition->cancel(sender, cancel_scope);
}
- _tracker.cancel(cancel_scope);
}
bool
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 8b8e3e15375..4d26ffda61e 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -39,8 +39,7 @@ public:
void onClose(DistributorStripeMessageSender& sender) override;
private:
- PersistenceMessageTrackerImpl _tracker_instance;
- PersistenceMessageTracker& _tracker;
+ PersistenceMessageTracker _tracker;
std::shared_ptr<api::PutCommand> _msg;
document::BucketId _doc_id_bucket_id;
const DistributorNodeContext& _node_ctx;
diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
index 5f52a8208fc..ab3855b2687 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
@@ -23,12 +23,7 @@ RemoveLocationOperation::RemoveLocationOperation(
std::shared_ptr<api::RemoveLocationCommand> msg,
PersistenceOperationMetricSet& metric)
: Operation(),
- _trackerInstance(metric,
- std::make_shared<api::RemoveLocationReply>(*msg),
- node_ctx,
- op_ctx,
- 0),
- _tracker(_trackerInstance),
+ _tracker(metric, std::make_shared<api::RemoveLocationReply>(*msg), node_ctx, op_ctx, _cancel_scope, 0),
_msg(std::move(msg)),
_node_ctx(node_ctx),
_parser(parser),
diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
index d177676ff03..1ac4af0997a 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
@@ -10,8 +10,7 @@ namespace storage::distributor {
class DistributorBucketSpace;
-class RemoveLocationOperation : public Operation
-{
+class RemoveLocationOperation : public Operation {
public:
RemoveLocationOperation(const DistributorNodeContext& node_ctx,
DistributorStripeOperationContext& op_ctx,
@@ -32,14 +31,11 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>&) override;
void onClose(DistributorStripeMessageSender& sender) override;
private:
- PersistenceMessageTrackerImpl _trackerInstance;
- PersistenceMessageTracker& _tracker;
-
+ PersistenceMessageTracker _tracker;
std::shared_ptr<api::RemoveLocationCommand> _msg;
-
- const DistributorNodeContext& _node_ctx;
- const DocumentSelectionParser& _parser;
- DistributorBucketSpace &_bucketSpace;
+ const DistributorNodeContext& _node_ctx;
+ const DocumentSelectionParser& _parser;
+ DistributorBucketSpace& _bucketSpace;
};
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
index be43aac3d9e..c7cfea017ac 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
@@ -19,10 +19,8 @@ RemoveOperation::RemoveOperation(const DistributorNodeContext& node_ctx,
PersistenceOperationMetricSet& condition_probe_metrics,
SequencingHandle sequencingHandle)
: SequencedOperation(std::move(sequencingHandle)),
- _tracker_instance(metric,
- std::make_shared<api::RemoveReply>(*msg),
- node_ctx, op_ctx, msg->getTimestamp()),
- _tracker(_tracker_instance),
+ _tracker(metric, std::make_shared<api::RemoveReply>(*msg), node_ctx,
+ op_ctx, _cancel_scope, msg->getTimestamp()),
_msg(std::move(msg)),
_doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())),
_node_ctx(node_ctx),
@@ -168,7 +166,6 @@ RemoveOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelS
if (_check_condition) {
_check_condition->cancel(sender, cancel_scope);
}
- _tracker.cancel(cancel_scope);
}
bool RemoveOperation::has_condition() const noexcept {
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
index 221def81fdc..772047b96ca 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
@@ -32,8 +32,7 @@ public:
void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
private:
- PersistenceMessageTrackerImpl _tracker_instance;
- PersistenceMessageTracker& _tracker;
+ PersistenceMessageTracker _tracker;
std::shared_ptr<api::RemoveCommand> _msg;
document::BucketId _doc_id_bucket_id;
const DistributorNodeContext& _node_ctx;
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index 60bddebbb89..90f3f8c9c43 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -25,9 +25,8 @@ UpdateOperation::UpdateOperation(const DistributorNodeContext& node_ctx,
std::vector<BucketDatabase::Entry> entries,
UpdateMetricSet& metric)
: Operation(),
- _trackerInstance(metric, std::make_shared<api::UpdateReply>(*msg),
- node_ctx, op_ctx, msg->getTimestamp()),
- _tracker(_trackerInstance),
+ _tracker(metric, std::make_shared<api::UpdateReply>(*msg), node_ctx,
+ op_ctx, _cancel_scope, msg->getTimestamp()),
_msg(msg),
_entries(std::move(entries)),
_new_timestamp(_msg->getTimestamp()),
@@ -207,13 +206,6 @@ UpdateOperation::onClose(DistributorStripeMessageSender& sender)
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
-void
-UpdateOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope)
-{
- _tracker.cancel(cancel_scope);
-}
-
-
// The backend behavior of "create-if-missing" updates is to return the timestamp of the
// _new_ update operation if the document was created from scratch. The two-phase update
// operation logic auto-detects unexpected inconsistencies and tries to reconcile
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
index 7d2131d426d..750e50aeae5 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
@@ -31,25 +31,22 @@ public:
std::string getStatus() const override { return ""; };
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
void onClose(DistributorStripeMessageSender& sender) override;
- void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
std::pair<document::BucketId, uint16_t> getNewestTimestampLocation() const {
return _newestTimestampLocation;
}
private:
- PersistenceMessageTrackerImpl _trackerInstance;
- PersistenceMessageTracker& _tracker;
- std::shared_ptr<api::UpdateCommand> _msg;
- std::vector<BucketDatabase::Entry> _entries;
- const api::Timestamp _new_timestamp;
- const bool _is_auto_create_update;
-
- const DistributorNodeContext& _node_ctx;
- DistributorStripeOperationContext& _op_ctx;
- DistributorBucketSpace &_bucketSpace;
+ PersistenceMessageTracker _tracker;
+ std::shared_ptr<api::UpdateCommand> _msg;
+ std::vector<BucketDatabase::Entry> _entries;
+ const api::Timestamp _new_timestamp;
+ const bool _is_auto_create_update;
+ const DistributorNodeContext& _node_ctx;
+ DistributorStripeOperationContext& _op_ctx;
+ DistributorBucketSpace& _bucketSpace;
std::pair<document::BucketId, uint16_t> _newestTimestampLocation;
- api::BucketInfo _infoAtSendTime; // Should be same across all replicas
+ api::BucketInfo _infoAtSendTime; // Should be same across all replicas
bool anyStorageNodesAvailable() const;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index bf64fa2eb82..e384163f421 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -23,7 +23,6 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu
_cluster_state_version_at_phase1_start_time(0),
_remove_candidates(),
_replica_info(),
- _cancel_scope(),
_max_documents_removed(0),
_is_done(false)
{}
@@ -150,10 +149,6 @@ GarbageCollectionOperation::onReceive(DistributorStripeMessageSender& sender,
}
}
-void GarbageCollectionOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope) {
- _cancel_scope.merge(cancel_scope);
-}
-
void GarbageCollectionOperation::update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
_replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(),
from_node, reply.getBucketInfo());
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index 97efbe694de..d5c6d655857 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -7,7 +7,6 @@
#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
#include <vespa/storage/distributor/operation_sequencer.h>
-#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <vector>
@@ -23,7 +22,6 @@ public:
void onStart(DistributorStripeMessageSender& sender) override;
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
- void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
const char* getName() const noexcept override { return "garbagecollection"; };
Type getType() const noexcept override { return GARBAGE_COLLECTION; }
bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
@@ -56,7 +54,6 @@ private:
RemoveCandidates _remove_candidates;
std::vector<SequencingHandle> _gc_write_locks;
std::vector<BucketCopy> _replica_info;
- CancelScope _cancel_scope;
uint32_t _max_documents_removed;
bool _is_done;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index a69b7739e07..09082e718e0 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -28,6 +28,7 @@ IdealStateOperation::IdealStateOperation(const BucketAndNodes& bucketAndNodes)
: _manager(nullptr),
_bucketSpace(nullptr),
_bucketAndNodes(bucketAndNodes),
+ _detailedReason(),
_priority(255),
_ok(true)
{
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index 6c52bdb738d..ba4a2f95686 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -2,6 +2,7 @@
#pragma once
#include <vespa/storage/distributor/maintenance/maintenanceoperation.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/storageapi/messageapi/storagereply.h>
#include <vespa/storageapi/messageapi/maintenancecommand.h>
@@ -110,7 +111,7 @@ public:
using Vector = std::vector<SP>;
using Map = std::map<document::BucketId, SP>;
- IdealStateOperation(const BucketAndNodes& bucketAndNodes);
+ explicit IdealStateOperation(const BucketAndNodes& bucketAndNodes);
~IdealStateOperation() override;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index 616c4962dca..6153306861c 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -93,14 +93,18 @@ JoinOperation::enqueueJoinMessagePerTargetNode(
void
JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg)
{
- auto& rep = static_cast<api::JoinBucketsReply&>(*msg);
+ auto& rep = dynamic_cast<api::JoinBucketsReply&>(*msg);
uint16_t node = _tracker.handleReply(rep);
if (node == 0xffff) {
LOG(debug, "Ignored reply since node was max uint16_t for unknown reasons");
return;
}
- if (rep.getResult().success()) {
+ _ok = rep.getResult().success();
+ if (_cancel_scope.node_is_cancelled(node)) {
+ LOG(debug, "Join operation for %s has been cancelled", getBucketId().toString().c_str());
+ _ok = false;
+ } else if (rep.getResult().success()) {
const std::vector<document::BucketId>& sourceBuckets(
rep.getSourceBuckets());
for (auto bucket : sourceBuckets) {
@@ -133,7 +137,6 @@ JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRepl
LOG(debug, "Join failed for %s with non-critical failure: %s",
getBucketId().toString().c_str(), rep.getResult().toString().c_str());
}
- _ok = rep.getResult().success();
LOG(debug, "Bucket %s join finished", getBucketId().toString().c_str());
if (_tracker.finished()) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 9469403daae..0a11a8233aa 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -177,7 +177,7 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge(
const BucketDatabase::Entry& currentState) const
{
assert(currentState.valid());
- for (const auto & mnode : _mnodes) {
+ for (const auto& mnode : _mnodes) {
const BucketCopy* copyBefore(_infoBefore.getNode(mnode.index));
if (!copyBefore) {
continue;
@@ -205,7 +205,7 @@ MergeOperation::deleteSourceOnlyNodes(
{
assert(currentState.valid());
std::vector<uint16_t> sourceOnlyNodes;
- for (const auto & mnode : _mnodes) {
+ for (const auto& mnode : _mnodes) {
const uint16_t nodeIndex = mnode.index;
const BucketCopy* copy = currentState->getNode(nodeIndex);
if (!copy) {
@@ -272,7 +272,14 @@ MergeOperation::onReceive(DistributorStripeMessageSender& sender, const std::sha
api::ReturnCode result = reply.getResult();
_ok = result.success();
- if (_ok) {
+ // We avoid replica deletion entirely if _any_ aspect of the merge has been cancelled.
+ // It is, for instance, possible that a node that was previously considered source-only
+ // now is part of the #redundancy ideal copies because another node became unavailable.
+ // Leave it up to the maintenance state checkers to figure this out.
+ if (_cancel_scope.is_cancelled()) {
+ LOG(debug, "Merge operation for %s has been cancelled", getBucketId().toString().c_str());
+ _ok = false;
+ } else if (_ok) {
BucketDatabase::Entry entry(_bucketSpace->getBucketDatabase().get(getBucketId()));
if (!entry.valid()) {
LOG(debug, "Bucket %s no longer exists after merge", getBucketId().toString().c_str());
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
index 41767f0e3af..2184739f82c 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
@@ -59,13 +59,14 @@ RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply
{
auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get());
- uint16_t node = _tracker.handleReply(*rep);
+ const uint16_t node = _tracker.handleReply(*rep);
- LOG(debug, "Got DeleteBucket reply for %s from node %u",
- getBucketId().toString().c_str(),
- node);
+ LOG(debug, "Got DeleteBucket reply for %s from node %u", getBucketId().toString().c_str(), node);
- if (rep->getResult().failed()) {
+ if (_cancel_scope.node_is_cancelled(node)) {
+ LOG(debug, "DeleteBucket operation for %s has been cancelled", getBucketId().toString().c_str());
+ _ok = false;
+ } else if (rep->getResult().failed()) {
if (rep->getResult().getResult() == api::ReturnCode::REJECTED
&& rep->getBucketInfo().valid())
{
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
index 531f7f64b68..5ddf082a544 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
@@ -69,13 +69,17 @@ void
SetBucketStateOperation::onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply>& reply)
{
- auto& rep = static_cast<api::SetBucketStateReply&>(*reply);
+ auto& rep = dynamic_cast<api::SetBucketStateReply&>(*reply);
const uint16_t node = _tracker.handleReply(rep);
LOG(debug, "Got %s from node %u", reply->toString(true).c_str(), node);
bool deactivate = false;
- if (reply->getResult().success()) {
+
+ if (_cancel_scope.node_is_cancelled(node)) {
+ LOG(debug, "SetBucketState for %s has been cancelled", rep.getBucketId().toString().c_str());
+ _ok = false;
+ } else if (reply->getResult().success()) {
BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(rep.getBucketId());
if (entry.valid()) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index d704a42e96b..c894deeecd8 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -50,7 +50,7 @@ SplitOperation::onStart(DistributorStripeMessageSender& sender)
void
SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg)
{
- auto & rep = static_cast<api::SplitBucketReply&>(*msg);
+ auto& rep = dynamic_cast<api::SplitBucketReply&>(*msg);
uint16_t node = _tracker.handleReply(rep);
@@ -61,7 +61,9 @@ SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRep
std::ostringstream ost;
- if (rep.getResult().success()) {
+ if (_cancel_scope.node_is_cancelled(node)) {
+ _ok = false;
+ } else if (rep.getResult().success()) {
BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(rep.getBucketId());
if (entry.valid()) {
diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp
index 9f944a94178..f60dc8eecff 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/operation.cpp
@@ -13,7 +13,7 @@ namespace storage::distributor {
Operation::Operation()
: _startTime(),
- _cancelled(false)
+ _cancel_scope()
{
}
@@ -47,7 +47,7 @@ Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCo
}
void Operation::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
- _cancelled = true;
+ _cancel_scope.merge(cancel_scope);
on_cancel(sender, cancel_scope);
}
diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h
index 64caacfc642..c742f918c30 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.h
+++ b/storage/src/vespa/storage/distributor/operations/operation.h
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "cancel_scope.h"
#include <vespa/vdslib/state/nodetype.h>
#include <vespa/storage/distributor/distributormessagesender.h>
#include <vespa/vespalib/util/time.h>
@@ -68,13 +69,15 @@ public:
*/
void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope);
+ [[nodiscard]] const CancelScope& cancel_scope() const noexcept { return _cancel_scope; }
+
/**
* Whether cancel() has been invoked at least once on this instance. This does not
* distinguish between cancellations caused by ownership transfers and those caused
* by nodes becoming unavailable; Operation implementations that care about this need
- * to implement cancel() themselves and inspect the provided CancelScope.
+ * to inspect cancel_scope() themselves.
*/
- [[nodiscard]] bool is_cancelled() const noexcept { return _cancelled; }
+ [[nodiscard]] bool is_cancelled() const noexcept { return _cancel_scope.is_cancelled(); }
/**
* Returns true if we are blocked to start this operation given
@@ -118,7 +121,7 @@ protected:
static constexpr vespalib::duration MAX_TIMEOUT = 3600s;
vespalib::system_time _startTime;
- bool _cancelled;
+ CancelScope _cancel_scope;
};
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index 498f3a5feab..43e3f6831d4 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -13,11 +13,12 @@ LOG_SETUP(".persistencemessagetracker");
namespace storage::distributor {
-PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
+PersistenceMessageTracker::PersistenceMessageTracker(
PersistenceOperationMetricSet& metric,
std::shared_ptr<api::BucketInfoReply> reply,
const DistributorNodeContext& node_ctx,
DistributorStripeOperationContext& op_ctx,
+ CancelScope& cancel_scope,
api::Timestamp revertTimestamp)
: MessageTracker(node_ctx),
_remapBucketInfo(),
@@ -28,7 +29,7 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
_revertTimestamp(revertTimestamp),
_trace(_reply->getTrace().getLevel()),
_requestTimer(node_ctx.clock()),
- _cancel_scope(),
+ _cancel_scope(cancel_scope),
_n_persistence_replies_total(0),
_n_successful_persistence_replies(0),
_priority(_reply->getPriority()),
@@ -36,33 +37,35 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
{
}
-PersistenceMessageTrackerImpl::~PersistenceMessageTrackerImpl() = default;
+PersistenceMessageTracker::~PersistenceMessageTracker() = default;
-void
-PersistenceMessageTrackerImpl::cancel(const CancelScope& cancel_scope)
-{
- _cancel_scope.merge(cancel_scope);
-}
-
-void
-PersistenceMessageTrackerImpl::prune_cancelled_nodes_if_present(
+PersistenceMessageTracker::PostPruningStatus
+PersistenceMessageTracker::prune_cancelled_nodes_if_present(
BucketInfoMap& bucket_and_replicas,
const CancelScope& cancel_scope)
{
+ bool any_replicas = false;
for (auto& info : bucket_and_replicas) {
info.second = prune_cancelled_nodes(info.second, cancel_scope);
+ any_replicas |= !info.second.empty();
}
+ return (any_replicas ? PostPruningStatus::ReplicasStillPresent
+ : PostPruningStatus::NoReplicasPresent);
}
void
-PersistenceMessageTrackerImpl::updateDB()
+PersistenceMessageTracker::updateDB()
{
if (_cancel_scope.is_cancelled()) {
if (_cancel_scope.fully_cancelled()) {
return; // Fully cancelled ops cannot mutate the DB at all
}
- prune_cancelled_nodes_if_present(_bucketInfo, _cancel_scope);
- prune_cancelled_nodes_if_present(_remapBucketInfo, _cancel_scope);
+ const bool any_infos = still_has_replicas(prune_cancelled_nodes_if_present(_bucketInfo, _cancel_scope));
+ const bool any_remapped = still_has_replicas(prune_cancelled_nodes_if_present(_remapBucketInfo, _cancel_scope));
+ if (!(any_infos || any_remapped)) {
+ LOG(spam, "No usable bucket info left after pruning; returning without updating DB");
+ return;
+ }
}
for (const auto & entry : _bucketInfo) {
@@ -75,7 +78,7 @@ PersistenceMessageTrackerImpl::updateDB()
}
void
-PersistenceMessageTrackerImpl::updateMetrics()
+PersistenceMessageTracker::updateMetrics()
{
const api::ReturnCode& result(_reply->getResult());
_metric.updateFromResult(result);
@@ -83,7 +86,7 @@ PersistenceMessageTrackerImpl::updateMetrics()
}
void
-PersistenceMessageTrackerImpl::fail(MessageSender& sender, const api::ReturnCode& result) {
+PersistenceMessageTracker::fail(MessageSender& sender, const api::ReturnCode& result) {
if (_reply.get()) {
_reply->setResult(result);
updateMetrics();
@@ -94,7 +97,7 @@ PersistenceMessageTrackerImpl::fail(MessageSender& sender, const api::ReturnCode
}
uint16_t
-PersistenceMessageTrackerImpl::receiveReply(MessageSender& sender, api::BucketInfoReply& reply)
+PersistenceMessageTracker::receiveReply(MessageSender& sender, api::BucketInfoReply& reply)
{
uint16_t node = handleReply(reply);
@@ -106,7 +109,7 @@ PersistenceMessageTrackerImpl::receiveReply(MessageSender& sender, api::BucketIn
}
void
-PersistenceMessageTrackerImpl::revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes)
+PersistenceMessageTracker::revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes)
{
if (_revertTimestamp != 0) {
// Since we're reverting, all received bucket info is voided.
@@ -126,7 +129,7 @@ PersistenceMessageTrackerImpl::revert(MessageSender& sender, const std::vector<B
}
void
-PersistenceMessageTrackerImpl::queueMessageBatch(std::vector<MessageTracker::ToSend> messages) {
+PersistenceMessageTracker::queueMessageBatch(std::vector<MessageTracker::ToSend> messages) {
_messageBatches.emplace_back();
auto & batch = _messageBatches.back();
batch.reserve(messages.size());
@@ -142,7 +145,7 @@ PersistenceMessageTrackerImpl::queueMessageBatch(std::vector<MessageTracker::ToS
}
bool
-PersistenceMessageTrackerImpl::canSendReplyEarly() const
+PersistenceMessageTracker::canSendReplyEarly() const
{
if (!_reply.get() || !_reply->getResult().success()) {
LOG(spam, "Can't return early because we have already replied or failed");
@@ -181,7 +184,7 @@ PersistenceMessageTrackerImpl::canSendReplyEarly() const
}
void
-PersistenceMessageTrackerImpl::addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply)
+PersistenceMessageTracker::addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply)
{
document::Bucket bucket(reply.getBucket());
const api::BucketInfo& bucketInfo(reply.getBucketInfo());
@@ -198,7 +201,7 @@ PersistenceMessageTrackerImpl::addBucketInfoFromReply(uint16_t node, const api::
}
void
-PersistenceMessageTrackerImpl::logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const
+PersistenceMessageTracker::logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const
{
LOG(spam, "Bucket %s: Received successful reply %s",
reply.getBucketId().toString().c_str(), reply.toString().c_str());
@@ -211,26 +214,26 @@ PersistenceMessageTrackerImpl::logSuccessfulReply(uint16_t node, const api::Buck
}
bool
-PersistenceMessageTrackerImpl::shouldRevert() const
+PersistenceMessageTracker::shouldRevert() const
{
return _op_ctx.distributor_config().enable_revert()
&& !_revertNodes.empty() && !_success && _reply;
}
-bool PersistenceMessageTrackerImpl::has_majority_successful_replies() const noexcept {
+bool PersistenceMessageTracker::has_majority_successful_replies() const noexcept {
// FIXME this has questionable interaction with early client ACK since we only count
// the number of observed replies rather than the number of total requests sent.
// ... but the early ACK-feature dearly needs a redesign anyway.
return (_n_successful_persistence_replies >= (_n_persistence_replies_total / 2 + 1));
}
-bool PersistenceMessageTrackerImpl::has_minority_test_and_set_failure() const noexcept {
+bool PersistenceMessageTracker::has_minority_test_and_set_failure() const noexcept {
return ((_reply->getResult().getResult() == api::ReturnCode::TEST_AND_SET_CONDITION_FAILED)
&& has_majority_successful_replies());
}
void
-PersistenceMessageTrackerImpl::sendReply(MessageSender& sender)
+PersistenceMessageTracker::sendReply(MessageSender& sender)
{
// If we've observed _partial_ TaS failures but have had a majority of good ACKs,
// treat the reply as successful. This is because the ACKed write(s) will eventually
@@ -247,7 +250,7 @@ PersistenceMessageTrackerImpl::sendReply(MessageSender& sender)
}
void
-PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& reply)
+PersistenceMessageTracker::updateFailureResult(const api::BucketInfoReply& reply)
{
LOG(debug, "Bucket %s: Received failed reply %s with result %s",
reply.getBucketId().toString().c_str(), reply.toString().c_str(), reply.getResult().toString().c_str());
@@ -259,13 +262,13 @@ PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& r
}
bool
-PersistenceMessageTrackerImpl::node_is_effectively_cancelled(uint16_t node) const noexcept
+PersistenceMessageTracker::node_is_effectively_cancelled(uint16_t node) const noexcept
{
return _cancel_scope.node_is_cancelled(node); // Implicitly covers the fully cancelled case
}
void
-PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node)
+PersistenceMessageTracker::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node)
{
LOG(spam, "Received CreateBucket reply for %s from node %u", reply.getBucketId().toString().c_str(), node);
if (!reply.getResult().success()
@@ -285,7 +288,7 @@ PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& rep
}
void
-PersistenceMessageTrackerImpl::handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node)
+PersistenceMessageTracker::handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node)
{
++_n_persistence_replies_total;
if (reply.getBucketInfo().valid()) {
@@ -301,7 +304,7 @@ PersistenceMessageTrackerImpl::handlePersistenceReply(api::BucketInfoReply& repl
}
void
-PersistenceMessageTrackerImpl::transfer_trace_state_to_reply()
+PersistenceMessageTracker::transfer_trace_state_to_reply()
{
if (!_trace.isEmpty()) {
_trace.setStrict(false);
@@ -310,7 +313,7 @@ PersistenceMessageTrackerImpl::transfer_trace_state_to_reply()
}
void
-PersistenceMessageTrackerImpl::updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node)
+PersistenceMessageTracker::updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node)
{
_trace.addChild(reply.steal_trace());
@@ -338,7 +341,7 @@ PersistenceMessageTrackerImpl::updateFromReply(MessageSender& sender, api::Bucke
}
void
-PersistenceMessageTrackerImpl::add_trace_tree_to_reply(vespalib::Trace trace)
+PersistenceMessageTracker::add_trace_tree_to_reply(vespalib::Trace trace)
{
_trace.addChild(std::move(trace));
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index 8c44d70062c..06ad8dc95b3 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -11,47 +11,29 @@
namespace storage::distributor {
-struct PersistenceMessageTracker {
- virtual ~PersistenceMessageTracker() = default;
- using ToSend = MessageTracker::ToSend;
-
- virtual void cancel(const CancelScope& cancel_scope) = 0;
- virtual void fail(MessageSender&, const api::ReturnCode&) = 0;
- virtual void queueMessageBatch(std::vector<ToSend> messages) = 0;
- virtual uint16_t receiveReply(MessageSender&, api::BucketInfoReply&) = 0;
- virtual std::shared_ptr<api::BucketInfoReply>& getReply() = 0;
- virtual void updateFromReply(MessageSender&, api::BucketInfoReply&, uint16_t node) = 0;
- virtual void queueCommand(api::BucketCommand::SP, uint16_t target) = 0;
- virtual void flushQueue(MessageSender&) = 0;
- virtual uint16_t handleReply(api::BucketReply& reply) = 0;
- virtual void add_trace_tree_to_reply(vespalib::Trace trace) = 0;
-};
-
-class PersistenceMessageTrackerImpl final
- : public PersistenceMessageTracker,
- public MessageTracker
-{
+class PersistenceMessageTracker final : public MessageTracker {
public:
- PersistenceMessageTrackerImpl(PersistenceOperationMetricSet& metric,
- std::shared_ptr<api::BucketInfoReply> reply,
- const DistributorNodeContext& node_ctx,
- DistributorStripeOperationContext& op_ctx,
- api::Timestamp revertTimestamp = 0);
- ~PersistenceMessageTrackerImpl() override;
+ using ToSend = MessageTracker::ToSend;
- void cancel(const CancelScope& cancel_scope) override;
+ PersistenceMessageTracker(PersistenceOperationMetricSet& metric,
+ std::shared_ptr<api::BucketInfoReply> reply,
+ const DistributorNodeContext& node_ctx,
+ DistributorStripeOperationContext& op_ctx,
+ CancelScope& cancel_scope,
+ api::Timestamp revertTimestamp = 0);
+ ~PersistenceMessageTracker();
void updateDB();
void updateMetrics();
[[nodiscard]] bool success() const noexcept { return _success; }
- void fail(MessageSender& sender, const api::ReturnCode& result) override;
+ void fail(MessageSender& sender, const api::ReturnCode& result);
/**
Returns the node the reply was from.
*/
- uint16_t receiveReply(MessageSender& sender, api::BucketInfoReply& reply) override;
- void updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node) override;
- std::shared_ptr<api::BucketInfoReply>& getReply() override { return _reply; }
+ uint16_t receiveReply(MessageSender& sender, api::BucketInfoReply& reply);
+ void updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node);
+ std::shared_ptr<api::BucketInfoReply>& getReply() { return _reply; }
using BucketNodePair = std::pair<document::Bucket, uint16_t>;
@@ -63,7 +45,9 @@ public:
have at most (messages.size() - initial redundancy) messages left in the
queue and have it's first message be done.
*/
- void queueMessageBatch(std::vector<MessageTracker::ToSend> messages) override;
+ void queueMessageBatch(std::vector<MessageTracker::ToSend> messages);
+
+ void add_trace_tree_to_reply(vespalib::Trace trace);
private:
using MessageBatch = std::vector<uint64_t>;
@@ -79,14 +63,25 @@ private:
std::vector<BucketNodePair> _revertNodes;
mbus::Trace _trace;
framework::MilliSecTimer _requestTimer;
- CancelScope _cancel_scope;
+ CancelScope& _cancel_scope;
uint32_t _n_persistence_replies_total;
uint32_t _n_successful_persistence_replies;
uint8_t _priority;
bool _success;
- static void prune_cancelled_nodes_if_present(BucketInfoMap& bucket_and_replicas,
- const CancelScope& cancel_scope);
+ enum class PostPruningStatus {
+ ReplicasStillPresent,
+ NoReplicasPresent
+ };
+
+ constexpr static bool still_has_replicas(PostPruningStatus status) {
+ return status == PostPruningStatus::ReplicasStillPresent;
+ }
+
+ // Returns ReplicasStillPresent iff `bucket_and_replicas` has at least 1 usable entry after pruning,
+ // otherwise returns NoReplicasPresent
+ [[nodiscard]] static PostPruningStatus prune_cancelled_nodes_if_present(BucketInfoMap& bucket_and_replicas,
+ const CancelScope& cancel_scope);
[[nodiscard]] bool canSendReplyEarly() const;
void addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply);
void logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const;
@@ -100,13 +95,6 @@ private:
void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node);
void handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node);
void transfer_trace_state_to_reply();
-
- void queueCommand(std::shared_ptr<api::BucketCommand> msg, uint16_t target) override {
- MessageTracker::queueCommand(std::move(msg), target);
- }
- void flushQueue(MessageSender& s) override { MessageTracker::flushQueue(s); }
- uint16_t handleReply(api::BucketReply& r) override { return MessageTracker::handleReply(r); }
- void add_trace_tree_to_reply(vespalib::Trace trace) override;
};
}
diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.cpp b/storage/src/vespa/storage/distributor/sentmessagemap.cpp
index 4b7292c1e81..2ae70f417d1 100644
--- a/storage/src/vespa/storage/distributor/sentmessagemap.cpp
+++ b/storage/src/vespa/storage/distributor/sentmessagemap.cpp
@@ -18,18 +18,30 @@ SentMessageMap::SentMessageMap()
SentMessageMap::~SentMessageMap() = default;
+Operation*
+SentMessageMap::find_by_id_or_nullptr(api::StorageMessage::Id id) const noexcept
+{
+ auto iter = _map.find(id);
+ return ((iter != _map.end()) ? iter->second.get() : nullptr);
+}
+
+std::shared_ptr<Operation>
+SentMessageMap::find_by_id_or_empty(api::StorageMessage::Id id) const noexcept
+{
+ auto iter = _map.find(id);
+ return ((iter != _map.end()) ? iter->second : std::shared_ptr<Operation>());
+}
std::shared_ptr<Operation>
SentMessageMap::pop()
{
auto found = _map.begin();
-
if (found != _map.end()) {
- std::shared_ptr<Operation> retVal = found->second;
+ std::shared_ptr<Operation> op = std::move(found->second);
_map.erase(found);
- return retVal;
+ return op;
} else {
- return std::shared_ptr<Operation>();
+ return {};
}
}
@@ -37,17 +49,15 @@ std::shared_ptr<Operation>
SentMessageMap::pop(api::StorageMessage::Id id)
{
auto found = _map.find(id);
-
if (found != _map.end()) {
LOG(spam, "Found Id %" PRIu64 " in callback map: %p", id, found->second.get());
- std::shared_ptr<Operation> retVal = found->second;
+ std::shared_ptr<Operation> op = std::move(found->second);
_map.erase(found);
- return retVal;
+ return op;
} else {
LOG(spam, "Did not find Id %" PRIu64 " in callback map", id);
-
- return std::shared_ptr<Operation>();
+ return {};
}
}
@@ -55,7 +65,6 @@ void
SentMessageMap::insert(api::StorageMessage::Id id, const std::shared_ptr<Operation> & callback)
{
LOG(spam, "Inserting callback %p for message %" PRIu64 "", callback.get(), id);
-
_map[id] = callback;
}
diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h
index 951ed6a6877..3ad80f4e55d 100644
--- a/storage/src/vespa/storage/distributor/sentmessagemap.h
+++ b/storage/src/vespa/storage/distributor/sentmessagemap.h
@@ -15,6 +15,11 @@ public:
SentMessageMap();
~SentMessageMap();
+ // Find by message ID, or nullptr if not found
+ [[nodiscard]] Operation* find_by_id_or_nullptr(api::StorageMessage::Id id) const noexcept;
+ // Find by message ID, or empty shared_ptr if not found
+ [[nodiscard]] std::shared_ptr<Operation> find_by_id_or_empty(api::StorageMessage::Id id) const noexcept;
+
[[nodiscard]] std::shared_ptr<Operation> pop(api::StorageMessage::Id id);
[[nodiscard]] std::shared_ptr<Operation> pop();
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
index fd747484ccf..ad8fae9cd74 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
@@ -130,6 +130,7 @@ StripeBucketDBUpdater::sendRequestBucketInfo(
const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
{
+ // TODO assert if cancellation enabled
if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) {
return;
}
@@ -342,14 +343,19 @@ StripeBucketDBUpdater::onMergeBucketReply(
const std::shared_ptr<api::MergeBucketReply>& reply)
{
auto replyGuard = std::make_shared<MergeReplyGuard>(_distributor_interface, reply);
+ auto merge_op = _distributor_interface.maintenance_op_from_message_id(reply->getMsgId());
// In case the merge was unsuccessful somehow, or some nodes weren't
// actually merged (source-only nodes?) we request the bucket info of the
// bucket again to make sure it's ok.
for (uint32_t i = 0; i < reply->getNodes().size(); i++) {
- sendRequestBucketInfo(reply->getNodes()[i].index,
- reply->getBucket(),
- replyGuard);
+ const uint16_t node_index = reply->getNodes()[i].index;
+ // We conditionally omit the node instead of conditionally send to it, as many tests do not
+ // wire their merges through the main distributor maintenance operation tracking locking.
+ if (merge_op && merge_op->cancel_scope().node_is_cancelled(node_index)) {
+ continue;
+ }
+ sendRequestBucketInfo(node_index, reply->getBucket(), replyGuard);
}
return true;
@@ -502,8 +508,9 @@ StripeBucketDBUpdater::processSingleBucketInfoReply(const std::shared_ptr<api::R
BucketRequest req = iter->second;
_sentMessages.erase(iter);
- if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) {
- // Ignore replies from nodes that are down.
+ // TODO remove explicit node check in favor of cancellation only
+ if (req.cancelled || !_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) {
+ // Ignore replies from nodes that are cancelled/down.
return true;
}
if (repl->getResult().getResult() != api::ReturnCode::OK) {
@@ -516,6 +523,17 @@ StripeBucketDBUpdater::processSingleBucketInfoReply(const std::shared_ptr<api::R
return true;
}
+bool
+StripeBucketDBUpdater::cancel_message_by_id(uint64_t msg_id)
+{
+ auto iter = _sentMessages.find(msg_id);
+ if (iter == _sentMessages.end()) {
+ return false;
+ }
+ iter->second.cancelled = true;
+ return true;
+}
+
void
StripeBucketDBUpdater::addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
BucketListMerger::BucketList& existing)
@@ -652,12 +670,10 @@ StripeBucketDBUpdater::MergingNodeRemover::MergingNodeRemover(
_nonOwnedBuckets(),
_removed_buckets(0),
_removed_documents(0),
- _localIndex(localIndex),
_distribution(distribution),
_upStates(upStates),
- _track_non_owned_entries(track_non_owned_entries),
- _cachedDecisionSuperbucket(UINT64_MAX),
- _cachedOwned(false)
+ _ownership_calc(_state, _distribution, localIndex),
+ _track_non_owned_entries(track_non_owned_entries)
{
const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE);
_available_nodes.resize(storage_count);
@@ -678,45 +694,15 @@ StripeBucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& b
LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg);
}
-namespace {
-
-uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept {
- // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest.
- return id.getRawId() & ~(UINT64_MAX << distribution_bits);
-}
-
-}
-
bool
StripeBucketDBUpdater::MergingNodeRemover::distributorOwnsBucket(
const document::BucketId& bucketId) const
{
- // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor.
- // TODO "too few bits used" case can be cheaply checked without needing exception
- try {
- const auto bits = _state.getDistributionBitCount();
- const auto this_superbucket = superbucket_from_id(bucketId, bits);
- if (_cachedDecisionSuperbucket == this_superbucket) {
- if (!_cachedOwned) {
- logRemove(bucketId, "bucket now owned by another distributor (cached)");
- }
- return _cachedOwned;
- }
-
- uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucketId, "uim");
- _cachedDecisionSuperbucket = this_superbucket;
- _cachedOwned = (distributor == _localIndex);
- if (!_cachedOwned) {
- logRemove(bucketId, "bucket now owned by another distributor");
- return false;
- }
- return true;
- } catch (lib::TooFewBucketBitsInUseException& exc) {
- logRemove(bucketId, "using too few distribution bits now");
- } catch (lib::NoDistributorsAvailableException& exc) {
- logRemove(bucketId, "no distributors are available");
+ const bool owns_bucket = _ownership_calc.this_distributor_owns_bucket(bucketId);
+ if (!owns_bucket) {
+ logRemove(bucketId, "bucket now owned by another distributor");
}
- return false;
+ return owns_bucket;
}
void
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
index 2e4ef2a7543..9536c84691d 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "bucket_ownership_calculator.h"
#include "bucketlistmerger.h"
#include "distributor_stripe_component.h"
#include "distributormessagesender.h"
@@ -49,6 +50,7 @@ public:
bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;
void resendDelayedMessages();
+ [[nodiscard]] bool cancel_message_by_id(uint64_t msg_id);
vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const;
vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
@@ -75,7 +77,8 @@ public:
private:
class MergeReplyGuard {
public:
- MergeReplyGuard(DistributorStripeInterface& distributor_interface, const std::shared_ptr<api::MergeBucketReply>& reply) noexcept
+ MergeReplyGuard(DistributorStripeInterface& distributor_interface,
+ const std::shared_ptr<api::MergeBucketReply>& reply) noexcept
: _distributor_interface(distributor_interface), _reply(reply) {}
~MergeReplyGuard();
@@ -89,22 +92,23 @@ private:
};
struct BucketRequest {
- BucketRequest()
- : targetNode(0), bucket(), timestamp(0) {};
+ BucketRequest() noexcept : targetNode(0), bucket(), timestamp(0), cancelled(false) {}
BucketRequest(uint16_t t, uint64_t currentTime, const document::Bucket& b,
- const std::shared_ptr<MergeReplyGuard>& guard)
+ const std::shared_ptr<MergeReplyGuard>& guard) noexcept
: targetNode(t),
bucket(b),
timestamp(currentTime),
- _mergeReplyGuard(guard) {};
+ _mergeReplyGuard(guard),
+ cancelled(false)
+ {}
void print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute &timestampAttribute) const;
uint16_t targetNode;
document::Bucket bucket;
uint64_t timestamp;
-
std::shared_ptr<MergeReplyGuard> _mergeReplyGuard;
+ bool cancelled;
};
struct EnqueuedBucketRecheck {
@@ -148,7 +152,7 @@ private:
static void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
uint16_t targetNode, BucketListMerger::BucketList& newList);
void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket,
- const std::shared_ptr<MergeReplyGuard>& mergeReplystatic );
+ const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard);
static void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
BucketListMerger::BucketList& existing);
void clearReadOnlyBucketRepoDatabases();
@@ -218,12 +222,10 @@ private:
std::vector<BucketDatabase::Entry> _nonOwnedBuckets;
size_t _removed_buckets;
size_t _removed_documents;
- uint16_t _localIndex;
const lib::Distribution& _distribution;
const char* _upStates;
+ BucketOwnershipCalculator _ownership_calc;
bool _track_non_owned_entries;
- mutable uint64_t _cachedDecisionSuperbucket;
- mutable bool _cachedOwned;
};
using DistributionContexts = std::unordered_map<document::BucketSpace,