aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-08-24 09:20:19 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2023-09-07 13:25:35 +0000
commit6f3baf1022f84a11d136e4f04d1a8b35eb5d79e4 (patch)
tree88ba6ba3d9023af407a90aeb941af308f1999e37
parentdaba552c567f1fcb9e300ae65825c1d97cedbb5e (diff)
Wire distributor operation cancelling to state change edges
This introduces cancellation of pending operations/messages to content nodes in the following scenarios:

 * One or more content nodes become unavailable in a newly received cluster state version (triggered when first received, i.e. at the pending state start edge).
 * One or more nodes are removed from the distribution config.
 * The set of available distributors changes, which in turn changes the ownership of a fraction of the set of super buckets. Pending operations to buckets that were owned by the current distributor in the previous state, but not in the new state, are all cancelled.

Introduce cancellation support for internal maintenance operations. As part of this, move `CancelScope` tracking out into the parent `Operation` class to unify cancellation tracking across both client and maintenance operations.

Remove interface vs. impl indirection for `PersistenceMessageTracker`, since it has only ever had a single implementation and likely never will have another.
-rw-r--r--storage/src/tests/distributor/bucketstateoperationtest.cpp23
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test.cpp298
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.cpp30
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.h3
-rw-r--r--storage/src/tests/distributor/joinbuckettest.cpp28
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp24
-rw-r--r--storage/src/tests/distributor/removebucketoperationtest.cpp51
-rw-r--r--storage/src/tests/distributor/removelocationtest.cpp2
-rw-r--r--storage/src/tests/distributor/splitbuckettest.cpp64
-rw-r--r--storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp91
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h7
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def5
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/bucket_ownership_calculator.cpp41
-rw-r--r--storage/src/vespa/storage/distributor/bucket_ownership_calculator.h44
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp115
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h13
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_interface.h7
-rw-r--r--storage/src/vespa/storage/distributor/messagetracker.h2
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.cpp21
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.h16
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h14
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.h21
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp13
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp11
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.h9
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp68
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.h62
-rw-r--r--storage/src/vespa/storage/distributor/sentmessagemap.cpp29
-rw-r--r--storage/src/vespa/storage/distributor/sentmessagemap.h5
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp72
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h22
48 files changed, 965 insertions, 327 deletions
diff --git a/storage/src/tests/distributor/bucketstateoperationtest.cpp b/storage/src/tests/distributor/bucketstateoperationtest.cpp
index c9fab0b37e5..44da88b4587 100644
--- a/storage/src/tests/distributor/bucketstateoperationtest.cpp
+++ b/storage/src/tests/distributor/bucketstateoperationtest.cpp
@@ -201,4 +201,27 @@ TEST_F(BucketStateOperationTest, bucket_db_not_updated_on_failure) {
EXPECT_FALSE(op.ok());
}
+TEST_F(BucketStateOperationTest, cancelled_node_does_not_update_bucket_db) {
+ document::BucketId bid(16, 1);
+ insertBucketInfo(bid, 0, 0xabc, 10, 1100, true, false);
+
+ BucketAndNodes bucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(0));
+ std::vector<uint16_t> active = {0};
+ SetBucketStateOperation op(dummy_cluster_context, bucketAndNodes, active);
+
+ op.setIdealStateManager(&getIdealStateManager());
+ op.start(_sender);
+ op.cancel(_sender, CancelScope::of_node_subset({0}));
+
+ ASSERT_EQ(_sender.commands().size(), 1);
+ std::shared_ptr<api::StorageCommand> msg = _sender.command(0);
+ std::shared_ptr<api::StorageReply> reply(msg->makeReply());
+ op.receive(_sender, reply);
+
+ BucketDatabase::Entry entry = getBucket(bid);
+ ASSERT_TRUE(entry.valid());
+ EXPECT_FALSE(entry->getNodeRef(0).active()); // Should not be updated
+ EXPECT_FALSE(op.ok());
+}
+
} // namespace storage::distributor
diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp
index 566fb704105..c87f5133997 100644
--- a/storage/src/tests/distributor/distributor_stripe_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test.cpp
@@ -14,6 +14,7 @@
#include <vespa/storageapi/message/visitor.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <vespa/vespalib/util/stringfmt.h>
#include <gmock/gmock.h>
using document::Bucket;
@@ -47,8 +48,8 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
// Simple type aliases to make interfacing with certain utility functions
// easier. Note that this is only for readability and does not provide any
// added type safety.
- using NodeCount = int;
- using Redundancy = int;
+ using NodeCount = uint16_t;
+ using Redundancy = uint16_t;
using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder;
@@ -137,82 +138,114 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
return _stripe->handleMessage(msg);
}
- void configure_stale_reads_enabled(bool enabled) {
+ template <typename Func>
+ void configure_stripe_with(Func f) {
ConfigBuilder builder;
- builder.allowStaleReadsDuringClusterStateTransitions = enabled;
+ f(builder);
configure_stripe(builder);
}
+ void configure_stale_reads_enabled(bool enabled) {
+ configure_stripe_with([&](auto& builder) {
+ builder.allowStaleReadsDuringClusterStateTransitions = enabled;
+ });
+ }
+
void configure_update_fast_path_restart_enabled(bool enabled) {
- ConfigBuilder builder;
- builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled;
+ });
}
void configure_merge_operations_disabled(bool disabled) {
- ConfigBuilder builder;
- builder.mergeOperationsDisabled = disabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.mergeOperationsDisabled = disabled;
+ });
}
void configure_use_weak_internal_read_consistency(bool use_weak) {
- ConfigBuilder builder;
- builder.useWeakInternalReadConsistencyForClientGets = use_weak;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.useWeakInternalReadConsistencyForClientGets = use_weak;
+ });
}
void configure_metadata_update_phase_enabled(bool enabled) {
- ConfigBuilder builder;
- builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled;
+ });
}
void configure_prioritize_global_bucket_merges(bool enabled) {
- ConfigBuilder builder;
- builder.prioritizeGlobalBucketMerges = enabled;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.prioritizeGlobalBucketMerges = enabled;
+ });
}
void configure_max_activation_inhibited_out_of_sync_groups(uint32_t n_groups) {
- ConfigBuilder builder;
- builder.maxActivationInhibitedOutOfSyncGroups = n_groups;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.maxActivationInhibitedOutOfSyncGroups = n_groups;
+ });
}
void configure_implicitly_clear_priority_on_schedule(bool implicitly_clear) {
- ConfigBuilder builder;
- builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.implicitlyClearBucketPriorityOnSchedule = implicitly_clear;
+ });
}
void configure_use_unordered_merge_chaining(bool use_unordered) {
- ConfigBuilder builder;
- builder.useUnorderedMergeChaining = use_unordered;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.useUnorderedMergeChaining = use_unordered;
+ });
}
void configure_enable_two_phase_garbage_collection(bool use_two_phase) {
- ConfigBuilder builder;
- builder.enableTwoPhaseGarbageCollection = use_two_phase;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.enableTwoPhaseGarbageCollection = use_two_phase;
+ });
}
void configure_enable_condition_probing(bool enable_probing) {
- ConfigBuilder builder;
- builder.enableConditionProbing = enable_probing;
- configure_stripe(builder);
+ configure_stripe_with([&](auto& builder) {
+ builder.enableConditionProbing = enable_probing;
+ });
+ }
+
+ void configure_enable_operation_cancellation(bool enable_cancellation) {
+ configure_stripe_with([&](auto& builder) {
+ builder.enableOperationCancellation = enable_cancellation;
+ });
}
- bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
+ [[nodiscard]] bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
return _stripe->_scheduler->implicitly_clear_priority_on_schedule();
}
+ [[nodiscard]] bool distributor_owns_bucket_in_current_and_pending_states(document::BucketId bucket_id) const {
+ return (getDistributorBucketSpace().get_bucket_ownership_flags(bucket_id).owned_in_pending_state() &&
+ getDistributorBucketSpace().check_ownership_in_pending_and_current_state(bucket_id).isOwned());
+ }
+
void configureMaxClusterClockSkew(int seconds);
void configure_mutation_sequencing(bool enabled);
void configure_merge_busy_inhibit_duration(int seconds);
void set_up_and_start_get_op_with_stale_reads_enabled(bool enabled);
+ void simulate_cluster_state_transition(const vespalib::string& state_str, bool clear_pending);
+ static std::shared_ptr<api::RemoveReply> make_remove_reply_with_bucket_remap(api::StorageCommand& originator_cmd);
+
+ // TODO dedupe
+ auto sent_get_command(size_t idx) { return sent_command<api::GetCommand>(idx); }
+
+ auto make_get_reply(size_t idx, api::Timestamp ts, bool is_tombstone, bool condition_matched) {
+ return std::make_shared<api::GetReply>(*sent_get_command(idx), std::shared_ptr<document::Document>(), ts,
+ false, is_tombstone, condition_matched);
+ }
+
+ void set_up_for_bucket_ownership_cancellation(uint32_t superbucket_idx);
+ void do_test_cancelled_pending_op_with_bucket_ownership_change(bool clear_pending_state);
+ void do_test_not_cancelled_pending_op_without_bucket_ownership_change(bool clear_pending_state);
};
DistributorStripeTest::DistributorStripeTest()
@@ -224,6 +257,34 @@ DistributorStripeTest::DistributorStripeTest()
DistributorStripeTest::~DistributorStripeTest() = default;
+void
+DistributorStripeTest::simulate_cluster_state_transition(const vespalib::string& state_str, bool clear_pending)
+{
+ simulate_set_pending_cluster_state(state_str);
+ if (clear_pending) {
+ enable_cluster_state(state_str);
+ clear_pending_cluster_state_bundle();
+ }
+}
+
+std::shared_ptr<api::RemoveReply>
+DistributorStripeTest::make_remove_reply_with_bucket_remap(api::StorageCommand& originator_cmd)
+{
+ auto& cmd_as_remove = dynamic_cast<api::RemoveCommand&>(originator_cmd);
+ auto reply = std::dynamic_pointer_cast<api::RemoveReply>(std::shared_ptr<api::StorageReply>(cmd_as_remove.makeReply()));
+ reply->setOldTimestamp(100);
+ // Including a bucket remapping as part of the response is a pragmatic way to avoid false
+ // negatives when testing whether cancelled operations may mutate the DB. This is because
+ // non-remapped buckets are not created in the DB if they are already removed (which will
+ // be the case after bucket pruning on a cluster state change), but remapped buckets _are_
+ // implicitly created upon insert.
+ // We expect the original bucket is 16 bits and fake a remap to a split bucket one level
+ // below the original bucket.
+ reply->remapBucketId(BucketId(17, (cmd_as_remove.getBucketId().getId() & 0xFFFF) | 0x10000));
+ reply->setBucketInfo(api::BucketInfo(0x1234, 2, 300));
+ return reply;
+}
+
TEST_F(DistributorStripeTest, operation_generation)
{
setup_stripe(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
@@ -455,8 +516,7 @@ TEST_F(DistributorStripeTest, no_db_resurrection_for_bucket_not_owned_in_pending
simulate_set_pending_cluster_state("storage:10 distributor:10");
document::BucketId nonOwnedBucket(16, 3);
- EXPECT_FALSE(getDistributorBucketSpace().get_bucket_ownership_flags(nonOwnedBucket).owned_in_pending_state());
- EXPECT_FALSE(getDistributorBucketSpace().check_ownership_in_pending_and_current_state(nonOwnedBucket).isOwned());
+ ASSERT_FALSE(distributor_owns_bucket_in_current_and_pending_states(nonOwnedBucket));
std::vector<BucketCopy> copies;
copies.emplace_back(1234, 0, api::BucketInfo(0x567, 1, 2));
@@ -1052,4 +1112,168 @@ TEST_F(DistributorStripeTest, enable_condition_probing_config_is_propagated_to_i
EXPECT_TRUE(getConfig().enable_condition_probing());
}
+TEST_F(DistributorStripeTest, enable_operation_cancellation_config_is_propagated_to_internal_config) {
+ setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ EXPECT_FALSE(getConfig().enable_operation_cancellation()); // TODO switch default once ready
+
+ configure_enable_operation_cancellation(false);
+ EXPECT_FALSE(getConfig().enable_operation_cancellation());
+
+ configure_enable_operation_cancellation(true);
+ EXPECT_TRUE(getConfig().enable_operation_cancellation());
+}
+
+TEST_F(DistributorStripeTest, cluster_state_node_down_edge_cancels_pending_operations_on_unavailable_nodes) {
+ setup_stripe(Redundancy(1), NodeCount(1), "version:1 distributor:1 storage:1");
+ configure_enable_operation_cancellation(true); // Test will fail without cancellation enabled
+ addNodesToBucketDB(BucketId(16, 1), "0=3/4/5/t");
+
+ stripe_handle_message(makeDummyRemoveCommand()); // Remove is for bucket {16, 1}
+ ASSERT_EQ(_sender.getCommands(true), "Remove => 0");
+
+ // Oh no, node 0 goes down while we have a pending operation!
+ simulate_cluster_state_transition("version:2 distributor:1 storage:1 .0.s:d", true);
+ EXPECT_EQ("NONEXISTING", dumpBucket(BucketId(16, 1))); // Implicitly cleared
+
+ auto reply = make_remove_reply_with_bucket_remap(*_sender.command(0));
+ // Before we receive the reply, node 0 is back online. Even though the node is available in the
+ // cluster state, we should not apply its bucket info to our DB, as it may represent stale
+ // information (it's possible that our node froze up and that another distributor took over and
+ // mutated the bucket in the meantime; a classic ABA scenario).
+ simulate_cluster_state_transition("version:5 distributor:1 storage:1", true);
+
+ _stripe->handleReply(std::move(reply));
+ EXPECT_EQ("NONEXISTING", dumpBucket(BucketId(17, 0x10001)));
+}
+
+TEST_F(DistributorStripeTest, distribution_config_change_edge_cancels_pending_operations_on_unavailable_nodes) {
+ setup_stripe(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
+ configure_enable_operation_cancellation(true); // Test will fail without cancellation enabled
+ addNodesToBucketDB(BucketId(16, 1), "0=3/4/5/t,1=3/4/5/t");
+
+ stripe_handle_message(makeDummyRemoveCommand()); // Remove is for bucket {16, 1}
+ ASSERT_EQ(_sender.getCommands(true), "Remove => 0,Remove => 1");
+
+ // Node 1 is configured away; only node 0 remains. This is expected to be closely followed by
+ // (or--depending on the timing of operations in the cluster--preceded by) a cluster state with
+ // the node marked as down, but the ordering is not guaranteed.
+ auto new_config = make_default_distribution_config(1, 1);
+ simulate_distribution_config_change(std::move(new_config));
+
+ auto node_0_reply = make_remove_reply_with_bucket_remap(*_sender.command(0));
+ auto node_1_reply = make_remove_reply_with_bucket_remap(*_sender.command(1));
+
+ _stripe->handleReply(std::move(node_0_reply));
+ _stripe->handleReply(std::move(node_1_reply));
+
+ // Only node 0 should be present in the DB
+ EXPECT_EQ("BucketId(0x4000000000000001) : " // Original bucket
+ "node(idx=0,crc=0x3,docs=4/4,bytes=5/5,trusted=true,active=false,ready=false)",
+ dumpBucket(BucketId(16, 1)));
+ EXPECT_EQ("BucketId(0x4400000000010001) : " // Remapped bucket
+ "node(idx=0,crc=0x1234,docs=2/2,bytes=300/300,trusted=true,active=false,ready=false)",
+ dumpBucket(BucketId(17, 0x10001)));
+}
+
+void DistributorStripeTest::set_up_for_bucket_ownership_cancellation(uint32_t superbucket_idx) {
+ setup_stripe(Redundancy(1), NodeCount(10), "version:1 distributor:2 storage:2");
+ configure_stripe_with([](auto& builder) {
+ builder.enableConditionProbing = true;
+ builder.enableOperationCancellation = true;
+ });
+
+ NodeSupportedFeatures features;
+ features.document_condition_probe = true;
+ set_node_supported_features(0, features);
+ set_node_supported_features(1, features);
+
+ // Note: replicas are intentionally out of sync to trigger a write-repair.
+ addNodesToBucketDB(BucketId(16, superbucket_idx), "0=3/4/5,1=4/5/6");
+}
+
+namespace {
+
+std::shared_ptr<api::RemoveCommand> make_conditional_remove_request(uint32_t superbucket_idx) {
+ auto client_remove = std::make_shared<api::RemoveCommand>(
+ makeDocumentBucket(document::BucketId(0)),
+ document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%u:foo", superbucket_idx)),
+ api::Timestamp(0));
+ client_remove->setCondition(documentapi::TestAndSetCondition("foo.bar==baz"));
+ return client_remove;
+}
+
+}
+
+void DistributorStripeTest::do_test_cancelled_pending_op_with_bucket_ownership_change(bool clear_pending_state) {
+ constexpr uint32_t superbucket_idx = 3;
+ const BucketId bucket_id(16, superbucket_idx);
+ set_up_for_bucket_ownership_cancellation(superbucket_idx);
+ // To actually check if cancellation is happening, we need to trigger a code path that
+ // is only covered by cancellation and not the legacy "check buckets at DB insert time"
+ // logic. The latter would give a false negative.
+ stripe_handle_message(make_conditional_remove_request(superbucket_idx));
+ ASSERT_EQ(_sender.getCommands(true), "Get => 0,Get => 1"); // Condition probes, thunder cats go!
+
+ simulate_cluster_state_transition("version:2 distributor:10 storage:10", clear_pending_state);
+ ASSERT_FALSE(distributor_owns_bucket_in_current_and_pending_states(bucket_id));
+ EXPECT_EQ("NONEXISTING", dumpBucket(bucket_id)); // Should have been pruned
+
+ _stripe->handleReply(make_get_reply(0, 100, false, true));
+ _stripe->handleReply(make_get_reply(1, 100, false, true));
+
+ // Condition probe was successful, but operation is cancelled and shall not continue.
+ ASSERT_EQ(_sender.getCommands(true, false, 2), "");
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:foo:testdoctype1:n=3:foo, timestamp 1, not found) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+ EXPECT_EQ("NONEXISTING", dumpBucket(bucket_id)); // And definitely no resurrection
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_cancels_pending_operations_for_non_owned_buckets_pending_case) {
+ do_test_cancelled_pending_op_with_bucket_ownership_change(false);
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_cancels_pending_operations_for_non_owned_buckets_not_pending_case) {
+ do_test_cancelled_pending_op_with_bucket_ownership_change(true);
+}
+
+void DistributorStripeTest::do_test_not_cancelled_pending_op_without_bucket_ownership_change(bool clear_pending_state) {
+ constexpr uint32_t superbucket_idx = 14;
+ const BucketId bucket_id(16, superbucket_idx);
+ set_up_for_bucket_ownership_cancellation(superbucket_idx);
+
+ stripe_handle_message(make_conditional_remove_request(superbucket_idx));
+ ASSERT_EQ(_sender.getCommands(true), "Get => 0,Get => 1");
+
+ simulate_cluster_state_transition("version:2 distributor:10 storage:10", clear_pending_state);
+ ASSERT_TRUE(distributor_owns_bucket_in_current_and_pending_states(bucket_id));
+ EXPECT_EQ("BucketId(0x400000000000000e) : "
+ "node(idx=0,crc=0x3,docs=4/4,bytes=5/5,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x4,docs=5/5,bytes=6/6,trusted=false,active=false,ready=false)",
+ dumpBucket(bucket_id)); // Should _not_ have been pruned
+
+ _stripe->handleReply(make_get_reply(0, 100, false, true));
+ _stripe->handleReply(make_get_reply(1, 100, false, true));
+
+ // Operation can proceed as planned as it has not been cancelled
+ ASSERT_EQ(_sender.getCommands(true, false, 2), "Remove => 0,Remove => 1");
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_does_not_cancel_pending_operations_for_owned_buckets_pending_case) {
+ do_test_not_cancelled_pending_op_without_bucket_ownership_change(false);
+}
+
+TEST_F(DistributorStripeTest, bucket_ownership_change_does_not_cancel_pending_operations_for_owned_buckets_not_pending_case) {
+ do_test_not_cancelled_pending_op_without_bucket_ownership_change(true);
+}
+
+// TODO we do not have good handling of bucket ownership changes combined with
+// distribution config changes... Hard to remove all such edge cases unless we
+// make state+config change an atomic operation initiated by the cluster controller
+// (hint: we should do this).
+
+
}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
index 5babde49380..ba6c4cb4ac4 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
@@ -72,7 +72,7 @@ DistributorStripeTestUtil::setup_stripe(int redundancy, int node_count, const li
// explicitly (which is what happens in "real life"), that is what would
// take place.
// The inverse case of this can be explicitly accomplished by calling
- // triggerDistributionChange().
+ // trigger_distribution_change().
// This isn't pretty, folks, but it avoids breaking the world for now,
// as many tests have implicit assumptions about this being the behavior.
auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution));
@@ -93,10 +93,31 @@ void
DistributorStripeTestUtil::trigger_distribution_change(lib::Distribution::SP distr)
{
_node->getComponentRegister().setDistribution(distr);
- auto new_config = BucketSpaceDistributionConfigs::from_default_distribution(distr);
+ auto new_config = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distr));
_stripe->update_distribution_config(new_config);
}
+void
+DistributorStripeTestUtil::simulate_distribution_config_change(std::shared_ptr<lib::Distribution> new_config)
+{
+ trigger_distribution_change(std::move(new_config));
+ _stripe->notifyDistributionChangeEnabled();
+ for (auto& space : _stripe->getBucketSpaceRepo()) {
+ auto cur_state = current_cluster_state_bundle().getDerivedClusterState(space.first); // no change in state itself
+ _stripe->remove_superfluous_buckets(space.first, *cur_state, true);
+ }
+}
+
+std::shared_ptr<lib::Distribution>
+DistributorStripeTestUtil::make_default_distribution_config(uint16_t redundancy, uint16_t node_count)
+{
+ lib::Distribution::DistributionConfigBuilder config(lib::Distribution::getDefaultDistributionConfig(redundancy, node_count).get());
+ config.redundancy = redundancy;
+ config.initialRedundancy = redundancy;
+ config.ensurePrimaryPersisted = true;
+ return std::make_shared<lib::Distribution>(config);
+}
+
std::shared_ptr<DistributorConfiguration>
DistributorStripeTestUtil::make_config() const
{
@@ -416,6 +437,11 @@ DistributorStripeTestUtil::getDistributorBucketSpace() {
return getBucketSpaceRepo().get(makeBucketSpace());
}
+const DistributorBucketSpace&
+DistributorStripeTestUtil::getDistributorBucketSpace() const {
+ return getBucketSpaceRepo().get(makeBucketSpace());
+}
+
BucketDatabase&
DistributorStripeTestUtil::getBucketDatabase() {
return getDistributorBucketSpace().getBucketDatabase();
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index 272301bf4a6..2892cec6fcf 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -138,6 +138,7 @@ public:
// TODO explicit notion of bucket spaces for tests
DistributorBucketSpace& getDistributorBucketSpace();
+ const DistributorBucketSpace& getDistributorBucketSpace() const;
BucketDatabase& getBucketDatabase(); // Implicit default space only
BucketDatabase& getBucketDatabase(document::BucketSpace space);
const BucketDatabase& getBucketDatabase() const; // Implicit default space only
@@ -175,6 +176,8 @@ public:
void set_redundancy(uint32_t redundancy);
void trigger_distribution_change(std::shared_ptr<lib::Distribution> distr);
+ void simulate_distribution_config_change(std::shared_ptr<lib::Distribution> new_config);
+ static std::shared_ptr<lib::Distribution> make_default_distribution_config(uint16_t redundancy, uint16_t node_count);
using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder;
diff --git a/storage/src/tests/distributor/joinbuckettest.cpp b/storage/src/tests/distributor/joinbuckettest.cpp
index 570fe24679e..ae389ecd6c2 100644
--- a/storage/src/tests/distributor/joinbuckettest.cpp
+++ b/storage/src/tests/distributor/joinbuckettest.cpp
@@ -109,4 +109,32 @@ TEST_F(JoinOperationTest, send_sparse_joins_to_nodes_without_both_source_buckets
ASSERT_NO_FATAL_FAILURE(checkSourceBucketsAndSendReply(op, 1, {{33, 1}, {33, 1}}));
}
+TEST_F(JoinOperationTest, cancelled_node_does_not_update_bucket_db) {
+ auto cfg = make_config();
+ cfg->setJoinCount(100);
+ cfg->setJoinSize(1000);
+ configure_stripe(cfg);
+
+ addNodesToBucketDB(document::BucketId(33, 1), "0=250/50/300");
+ addNodesToBucketDB(document::BucketId(33, 0x100000001), "0=300/40/200");
+ enable_cluster_state("distributor:1 storage:1");
+
+ JoinOperation op(dummy_cluster_context,
+ BucketAndNodes(makeDocumentBucket(document::BucketId(32, 0)), toVector<uint16_t>(0)),
+ {document::BucketId(33, 1), document::BucketId(33, 0x100000001)});
+
+ op.setIdealStateManager(&getIdealStateManager());
+ op.start(_sender);
+
+ op.cancel(_sender, CancelScope::of_node_subset({0}));
+
+ checkSourceBucketsAndSendReply(op, 0, {{33, 1}, {33, 0x100000001}});
+
+ // DB is not touched, so source buckets remain unchanged and target buckets is not created
+ EXPECT_TRUE(getBucket(document::BucketId(33, 0x100000001)).valid());
+ EXPECT_TRUE(getBucket(document::BucketId(33, 1)).valid());
+ EXPECT_FALSE(getBucket(document::BucketId(32, 0)).valid());
+ EXPECT_FALSE(op.ok());
+}
+
}
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 512c092d8ae..12280980998 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -548,8 +548,7 @@ TEST_F(MergeOperationTest, merge_operation_is_not_blocked_by_request_bucket_info
EXPECT_FALSE(op.isBlocked(operation_context(), _operation_sequencer));
}
-TEST_F(MergeOperationTest, on_blocked_updates_metrics)
-{
+TEST_F(MergeOperationTest, on_blocked_updates_metrics) {
auto op = setup_minimal_merge_op();
auto metrics = getIdealStateManager().getMetrics().operations[IdealStateOperation::MERGE_BUCKET];
EXPECT_EQ(0, metrics->blocked.getValue());
@@ -557,8 +556,7 @@ TEST_F(MergeOperationTest, on_blocked_updates_metrics)
EXPECT_EQ(1, metrics->blocked.getValue());
}
-TEST_F(MergeOperationTest, on_throttled_updates_metrics)
-{
+TEST_F(MergeOperationTest, on_throttled_updates_metrics) {
auto op = setup_minimal_merge_op();
auto metrics = getIdealStateManager().getMetrics().operations[IdealStateOperation::MERGE_BUCKET];
EXPECT_EQ(0, metrics->throttled.getValue());
@@ -628,4 +626,22 @@ TEST_F(MergeOperationTest, delete_bucket_priority_is_capped_to_feed_pri_120) {
EXPECT_EQ(int(del_cmd->getPriority()), 120);
}
+TEST_F(MergeOperationTest, no_delete_bucket_ops_sent_if_fully_cancelled) {
+ auto op = setup_simple_merge_op();
+ ASSERT_NO_FATAL_FAILURE(assert_simple_merge_bucket_command());
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ sendReply(*op);
+ EXPECT_EQ(_sender.getCommands(true, false, 1), ""); // nothing more
+ EXPECT_FALSE(op->ok());
+}
+
+TEST_F(MergeOperationTest, no_delete_bucket_ops_sent_if_node_subset_cancelled) {
+ auto op = setup_simple_merge_op(); // to nodes 0, 2, 1 (node 1 is source-only)
+ ASSERT_NO_FATAL_FAILURE(assert_simple_merge_bucket_command());
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+ sendReply(*op);
+ EXPECT_EQ(_sender.getCommands(true, false, 1), ""); // nothing more
+ EXPECT_FALSE(op->ok());
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp
index 68d86884036..b9d1d804761 100644
--- a/storage/src/tests/distributor/removebucketoperationtest.cpp
+++ b/storage/src/tests/distributor/removebucketoperationtest.cpp
@@ -24,6 +24,14 @@ struct RemoveBucketOperationTest : Test, DistributorStripeTestUtil {
void TearDown() override {
close();
}
+
+ void reject_with_bucket_info(RemoveBucketOperation& op, size_t msg_index) {
+ std::shared_ptr<api::StorageCommand> msg2 = _sender.command(msg_index);
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
+ dynamic_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(api::BucketInfo(10, 200, 1));
+ reply->setResult(api::ReturnCode::REJECTED);
+ op.receive(_sender, reply);
+ }
};
TEST_F(RemoveBucketOperationTest, simple) {
@@ -36,11 +44,10 @@ TEST_F(RemoveBucketOperationTest, simple) {
RemoveBucketOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
- toVector<uint16_t>(1,2)));
+ toVector<uint16_t>(1, 2)));
op.setIdealStateManager(&getIdealStateManager());
op.start(_sender);
-
ASSERT_EQ("Delete bucket => 1,"
"Delete bucket => 2",
_sender.getCommands(true));
@@ -75,16 +82,11 @@ TEST_F(RemoveBucketOperationTest, bucket_info_mismatch_failure) {
ASSERT_EQ("Delete bucket => 1", _sender.getCommands(true));
ASSERT_EQ(1, _sender.commands().size());
- std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
- std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release());
- dynamic_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(
- api::BucketInfo(10, 100, 1));
- reply->setResult(api::ReturnCode::REJECTED);
- op.receive(_sender, reply);
+ reject_with_bucket_info(op, 0);
// RemoveBucketOperation should reinsert bucketinfo into database
ASSERT_EQ("BucketId(0x4000000000000001) : "
- "node(idx=1,crc=0xa,docs=100/100,bytes=1/1,trusted=true,active=false,ready=false)",
+ "node(idx=1,crc=0xa,docs=200/200,bytes=1/1,trusted=true,active=false,ready=false)",
dumpBucket(document::BucketId(16, 1)));
}
@@ -130,4 +132,35 @@ TEST_F(RemoveBucketOperationTest, operation_blocked_when_pending_message_to_targ
EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 2, 120));
}
+TEST_F(RemoveBucketOperationTest, cancelled_node_does_not_update_bucket_db_upon_rejection) {
+ addNodesToBucketDB(document::BucketId(16, 1),
+ "0=10/100/1/t,"
+ "1=10/100/1/t,"
+ "2=10/100/1/t");
+ set_redundancy(1);
+ enable_cluster_state("distributor:1 storage:3");
+
+ RemoveBucketOperation op(dummy_cluster_context,
+ BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
+ toVector<uint16_t>(1,2)));
+ op.setIdealStateManager(&getIdealStateManager());
+ op.start(_sender);
+
+ ASSERT_EQ("Delete bucket => 1,"
+ "Delete bucket => 2",
+ _sender.getCommands(true));
+
+ op.cancel(_sender, CancelScope::of_node_subset({1}));
+
+ // Rejections will by default reinsert the bucket into the DB with the bucket info contained
+ // in the reply, but here the node is cancelled and should therefore not be reinserted.
+ reject_with_bucket_info(op, 0);
+ sendReply(op, 1);
+ // Node 1 not reinserted
+ ASSERT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0xa,docs=100/100,bytes=1/1,trusted=true,active=false,ready=false)",
+ dumpBucket(document::BucketId(16, 1)));
+ EXPECT_FALSE(op.ok());
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp
index b19a448199b..be688e64a8b 100644
--- a/storage/src/tests/distributor/removelocationtest.cpp
+++ b/storage/src/tests/distributor/removelocationtest.cpp
@@ -67,4 +67,6 @@ TEST_F(RemoveLocationOperationTest, simple) {
_sender.getLastReply());
}
+// TODO test cancellation (implicitly covered via operation PersistenceMessageTracker)
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/splitbuckettest.cpp b/storage/src/tests/distributor/splitbuckettest.cpp
index edb392d9532..05a13f67bc9 100644
--- a/storage/src/tests/distributor/splitbuckettest.cpp
+++ b/storage/src/tests/distributor/splitbuckettest.cpp
@@ -48,14 +48,14 @@ SplitOperationTest::SplitOperationTest()
}
namespace {
- api::StorageMessageAddress _Storage0Address(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 0);
+ api::StorageMessageAddress _storage0Address(dummy_cluster_context.cluster_name_ptr(), lib::NodeType::STORAGE, 0);
}
TEST_F(SplitOperationTest, simple) {
enable_cluster_state("distributor:1 storage:1");
insertBucketInfo(document::BucketId(16, 1), 0, 0xabc, 1000,
- tooLargeBucketSize, 250);
+ tooLargeBucketSize, true);
SplitOperation op(dummy_cluster_context,
BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
@@ -72,10 +72,10 @@ TEST_F(SplitOperationTest, simple) {
std::shared_ptr<api::StorageCommand> msg = _sender.command(0);
ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET);
- EXPECT_EQ(_Storage0Address.toString(),
+ EXPECT_EQ(_storage0Address.toString(),
msg->getAddress()->toString());
- std::shared_ptr<api::StorageReply> reply(msg->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(msg->makeReply());
auto* sreply = static_cast<api::SplitBucketReply*>(reply.get());
sreply->getSplitInfo().emplace_back(document::BucketId(17, 1),
@@ -142,19 +142,15 @@ TEST_F(SplitOperationTest, multi_node_failure) {
{
std::shared_ptr<api::StorageCommand> msg = _sender.command(0);
ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET);
- EXPECT_EQ(_Storage0Address.toString(),
- msg->getAddress()->toString());
+ EXPECT_EQ(_storage0Address.toString(), msg->getAddress()->toString());
+ std::shared_ptr<api::StorageReply> reply(msg->makeReply());
+ auto& sreply = dynamic_cast<api::SplitBucketReply&>(*reply);
- auto* sreply = static_cast<api::SplitBucketReply*>(msg->makeReply().release());
- sreply->setResult(api::ReturnCode::OK);
+ sreply.setResult(api::ReturnCode::OK);
+ sreply.getSplitInfo().emplace_back(document::BucketId(17, 1), api::BucketInfo(100, 600, 5000000));
+ sreply.getSplitInfo().emplace_back(document::BucketId(17, 0x10001), api::BucketInfo(110, 400, 6000000));
- sreply->getSplitInfo().emplace_back(document::BucketId(17, 1),
- api::BucketInfo(100, 600, 5000000));
-
- sreply->getSplitInfo().emplace_back(document::BucketId(17, 0x10001),
- api::BucketInfo(110, 400, 6000000));
-
- op.receive(_sender, std::shared_ptr<api::StorageReply>(sreply));
+ op.receive(_sender, reply);
}
sendReply(op, 1, api::ReturnCode::NOT_CONNECTED);
@@ -230,7 +226,7 @@ TEST_F(SplitOperationTest, copy_trusted_status_not_carried_over_after_split) {
for (int i = 0; i < 2; ++i) {
std::shared_ptr<api::StorageCommand> msg = _sender.command(i);
ASSERT_EQ(msg->getType(), api::MessageType::SPLITBUCKET);
- std::shared_ptr<api::StorageReply> reply(msg->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(msg->makeReply());
auto* sreply = static_cast<api::SplitBucketReply*>(reply.get());
// Make sure copies differ so they cannot become implicitly trusted.
@@ -271,7 +267,7 @@ TEST_F(SplitOperationTest, operation_blocked_by_pending_join) {
};
auto joinCmd = std::make_shared<api::JoinBucketsCommand>(makeDocumentBucket(joinTarget));
joinCmd->getSourceBuckets() = joinSources;
- joinCmd->setAddress(_Storage0Address);
+ joinCmd->setAddress(_storage0Address);
pending_message_tracker().insert(joinCmd);
@@ -307,7 +303,7 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
enable_cluster_state("distributor:1 storage:2");
document::BucketId source_bucket(16, 1);
- insertBucketInfo(source_bucket, 0, 0xabc, 1000, tooLargeBucketSize, 250);
+ insertBucketInfo(source_bucket, 0, 0xabc, 1000, tooLargeBucketSize, true);
SplitOperation op(dummy_cluster_context, BucketAndNodes(makeDocumentBucket(source_bucket), toVector<uint16_t>(0)),
maxSplitBits, splitCount, splitByteSize);
@@ -318,4 +314,36 @@ TEST_F(SplitOperationTest, split_is_blocked_by_locked_bucket) {
EXPECT_TRUE(op.isBlocked(operation_context(), op_seq));
}
+TEST_F(SplitOperationTest, cancelled_node_does_not_update_bucket_db) {
+ enable_cluster_state("distributor:1 storage:1");
+ insertBucketInfo(document::BucketId(16, 1), 0, 0xabc, 1000, tooLargeBucketSize, true);
+
+ SplitOperation op(dummy_cluster_context,
+ BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0)),
+ maxSplitBits, splitCount, splitByteSize);
+
+ op.setIdealStateManager(&getIdealStateManager());
+ op.start(_sender);
+
+ op.cancel(_sender, CancelScope::of_node_subset({0}));
+
+ {
+ ASSERT_EQ(_sender.commands().size(), 1);
+ std::shared_ptr<api::StorageCommand> msg = _sender.command(0);
+ std::shared_ptr<api::StorageReply> reply(msg->makeReply());
+ auto& sreply = dynamic_cast<api::SplitBucketReply&>(*reply);
+
+ sreply.getSplitInfo().emplace_back(document::BucketId(17, 1), api::BucketInfo(100, 600, 5000000));
+ sreply.getSplitInfo().emplace_back(document::BucketId(17, 0x10001), api::BucketInfo(110, 400, 6000000));
+ op.receive(_sender, reply);
+ }
+
+ // DB is not touched, so source bucket remains (will be removed during actual operation)
+ // while target buckets are not created
+ EXPECT_TRUE(getBucket(document::BucketId(16, 1)).valid());
+ EXPECT_FALSE(getBucket(document::BucketId(17, 0x00001)).valid());
+ EXPECT_FALSE(getBucket(document::BucketId(17, 0x10001)).valid());
+ EXPECT_FALSE(op.ok());
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index d3af4cd564a..f3aa9c2eb92 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -1056,7 +1056,32 @@ TEST_F(TopLevelBucketDBUpdaterTest, recheck_node) {
const BucketCopy* copy = entry->getNode(1);
ASSERT_TRUE(copy != nullptr);
- EXPECT_EQ(api::BucketInfo(20,10,12, 50, 60, true, true), copy->getBucketInfo());
+ EXPECT_EQ(api::BucketInfo(20, 10, 12, 50, 60, true, true), copy->getBucketInfo());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, cancelled_pending_recheck_command_does_not_update_db) {
+ ASSERT_NO_FATAL_FAILURE(initialize_nodes_and_buckets(3, 5));
+ _sender.clear();
+
+ auto bucket = makeDocumentBucket(document::BucketId(16, 3));
+ auto& stripe_bucket_db_updater = stripe_of_bucket(bucket.getBucketId()).bucket_db_updater();
+ stripe_bucket_db_updater.recheckBucketInfo(0, bucket);
+
+ ASSERT_EQ(_sender.getCommands(true), "Request bucket info => 0");
+ auto& req = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0));
+
+ ASSERT_TRUE(stripe_bucket_db_updater.cancel_message_by_id(req.getMsgId()));
+
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(req);
+ reply->getBucketInfo().emplace_back(document::BucketId(16, 3), api::BucketInfo(20, 10, 12, 50, 60, true, true));
+ stripe_bucket_db_updater.onRequestBucketInfoReply(reply);
+
+ BucketDatabase::Entry entry = get_bucket(bucket);
+ ASSERT_TRUE(entry.valid());
+ const BucketCopy* copy = entry->getNode(0);
+ ASSERT_TRUE(copy != nullptr);
+ // Existing bucket info not modified by reply (0xa ... is the initialized test state).
+ EXPECT_EQ(api::BucketInfo(0xa, 1, 1, 1, 1, false, false), copy->getBucketInfo());
}
TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) {
@@ -1217,11 +1242,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply) {
document::BucketId bucket_id(16, 1234);
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
- std::vector<api::MergeBucketCommand::Node> nodes;
- nodes.emplace_back(0);
- nodes.emplace_back(1);
- nodes.emplace_back(2);
-
+ std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}};
api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
auto reply = std::make_shared<api::MergeBucketReply>(cmd);
@@ -1248,19 +1269,15 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply) {
"node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), "
"node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)",
dump_bucket(bucket_id));
-};
+}
TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down) {
enable_distributor_cluster_state("distributor:1 storage:3");
- std::vector<api::MergeBucketCommand::Node> nodes;
document::BucketId bucket_id(16, 1234);
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
- for (uint32_t i = 0; i < 3; ++i) {
- nodes.emplace_back(i);
- }
-
+ std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}};
api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
auto reply = std::make_shared<api::MergeBucketReply>(cmd);
@@ -1287,19 +1304,15 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down) {
"node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
"node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)",
dump_bucket(bucket_id));
-};
+}
TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
enable_distributor_cluster_state("distributor:1 storage:3");
- std::vector<api::MergeBucketCommand::Node> nodes;
document::BucketId bucket_id(16, 1234);
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
- for (uint32_t i = 0; i < 3; ++i) {
- nodes.emplace_back(i);
- }
-
+ std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}};
api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
auto reply = std::make_shared<api::MergeBucketReply>(cmd);
@@ -1326,7 +1339,41 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
"node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
"node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)",
dump_bucket(bucket_id));
-};
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_triggered_bucket_info_request_not_sent_to_cancelled_nodes) {
+ enable_distributor_cluster_state("distributor:1 storage:3");
+ document::BucketId bucket_id(16, 1234);
+ // DB has one bucket with 3 mutually out of sync replicas. Node 0 is explicitly tagged as Active;
+ // this is done to prevent the distributor from scheduling a bucket activation as its first task.
+ add_nodes_to_stripe_bucket_db(bucket_id, "0=10/1/1/t/a,1=20/1/1/t,2=30/1/1/t");
+
+ // Poke at the business end of the stripe until it has scheduled a merge for the inconsistent bucket
+ ASSERT_EQ(_sender.commands().size(), 0u);
+ const int max_tick_tries = 20;
+ for (int i = 0; i <= max_tick_tries; ++i) {
+ stripe_of_bucket(bucket_id).tick();
+ if (!_sender.commands().empty()) {
+ continue;
+ }
+ if (i == max_tick_tries) {
+ FAIL() << "no merge sent after ticking " << max_tick_tries << " times";
+ }
+ }
+ ASSERT_EQ(_sender.getCommands(true), "Merge bucket => 0");
+
+ auto cmd = std::dynamic_pointer_cast<api::MergeBucketCommand>(_sender.commands()[0]);
+ _sender.commands().clear();
+ auto reply = std::make_shared<api::MergeBucketReply>(*cmd);
+
+ auto op = stripe_of_bucket(bucket_id).maintenance_op_from_message_id(cmd->getMsgId());
+ ASSERT_TRUE(op);
+
+ op->cancel(_sender, CancelScope::of_node_subset({0, 2}));
+ stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply);
+ // RequestBucketInfo only sent to node 1
+ ASSERT_EQ(_sender.getCommands(true), "Request bucket info => 1");
+}
TEST_F(TopLevelBucketDBUpdaterTest, flush) {
enable_distributor_cluster_state("distributor:1 storage:3");
@@ -1335,11 +1382,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, flush) {
document::BucketId bucket_id(16, 1234);
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
- std::vector<api::MergeBucketCommand::Node> nodes;
- for (uint32_t i = 0; i < 3; ++i) {
- nodes.emplace_back(i);
- }
-
+ std::vector<api::MergeBucketCommand::Node> nodes{{0}, {1}, {2}};
api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
auto reply = std::make_shared<api::MergeBucketReply>(cmd);
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index d6d21c89b68..959fdc612cb 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -54,6 +54,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_inhibit_default_merges_when_global_merges_pending(false),
_enable_two_phase_garbage_collection(false),
_enable_condition_probing(false),
+ _enable_operation_cancellation(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
@@ -179,6 +180,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_inhibit_default_merges_when_global_merges_pending = config.inhibitDefaultMergesWhenGlobalMergesPending;
_enable_two_phase_garbage_collection = config.enableTwoPhaseGarbageCollection;
_enable_condition_probing = config.enableConditionProbing;
+ _enable_operation_cancellation = config.enableOperationCancellation;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index e3664276518..6056f0a1d5c 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -293,6 +293,12 @@ public:
[[nodiscard]] bool enable_condition_probing() const noexcept {
return _enable_condition_probing;
}
+ void set_enable_operation_cancellation(bool enable) noexcept {
+ _enable_operation_cancellation = enable;
+ }
+ [[nodiscard]] bool enable_operation_cancellation() const noexcept {
+ return _enable_operation_cancellation;
+ }
uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; }
@@ -354,6 +360,7 @@ private:
bool _inhibit_default_merges_when_global_merges_pending;
bool _enable_two_phase_garbage_collection;
bool _enable_condition_probing;
+ bool _enable_operation_cancellation;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index 95461eb5dc2..debbe443b31 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -313,3 +313,8 @@ enable_two_phase_garbage_collection bool default=true
## replicas will trigger an implicit distributed condition probe to resolve the outcome of
## the condition across all divergent replicas.
enable_condition_probing bool default=true
+
+## If true, changes in the cluster where a subset of the nodes become unavailable or buckets
+## change ownership between distributors will trigger an explicit cancellation of all pending
+## requests partially or fully "invalidated" by such a change.
+enable_operation_cancellation bool default=false
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index c889afcc77c..16a4fb5691f 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -4,6 +4,7 @@ vespa_add_library(storage_distributor OBJECT
activecopy.cpp
blockingoperationstarter.cpp
bucket_db_prune_elision.cpp
+ bucket_ownership_calculator.cpp
bucket_space_distribution_configs.cpp
bucket_space_distribution_context.cpp
bucket_space_state_map.cpp
diff --git a/storage/src/vespa/storage/distributor/bucket_ownership_calculator.cpp b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.cpp
new file mode 100644
index 00000000000..6f94235e548
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.cpp
@@ -0,0 +1,41 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "bucket_ownership_calculator.h"
+#include <vespa/document/bucket/bucket.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/clusterstate.h>
+
+namespace storage::distributor {
+
+namespace {
+
+uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept {
+ // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest.
+ return id.getRawId() & ~(UINT64_MAX << distribution_bits);
+}
+
+}
+
+bool
+BucketOwnershipCalculator::this_distributor_owns_bucket(const document::BucketId& bucket_id) const
+{
+ // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor.
+ // TODO "too few bits used" case can be cheaply checked without needing exception
+ try {
+ const auto bits = _state.getDistributionBitCount();
+ const auto this_superbucket = superbucket_from_id(bucket_id, bits);
+ if (_cached_decision_superbucket == this_superbucket) {
+ return _cached_owned;
+ }
+ uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucket_id, "uim");
+ _cached_decision_superbucket = this_superbucket;
+ _cached_owned = (distributor == _this_node_index);
+ return _cached_owned;
+ } catch (lib::TooFewBucketBitsInUseException&) {
+ // Ignore; implicitly not owned
+ } catch (lib::NoDistributorsAvailableException&) {
+ // Ignore; implicitly not owned
+ }
+ return false;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/bucket_ownership_calculator.h b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.h
new file mode 100644
index 00000000000..b67bb41e85d
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/bucket_ownership_calculator.h
@@ -0,0 +1,44 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <cstdint>
+
+namespace document { class BucketId; }
+
+namespace storage::lib {
+class ClusterState;
+class Distribution;
+}
+
+namespace storage::distributor {
+
+/**
+ * Calculator for determining if a bucket is owned by the current distributor.
+ * Ideal state calculations are cached and reused for all consecutive sub-buckets
+ * under the same super bucket. The cache is invalidated when a new super bucket
+ * is encountered, so it only provides a benefit when invoked in bucket ID order.
+ *
+ * Not thread safe due to internal caching.
+ */
+class BucketOwnershipCalculator {
+ const lib::ClusterState& _state;
+ const lib::Distribution& _distribution;
+ mutable uint64_t _cached_decision_superbucket;
+ const uint16_t _this_node_index;
+ mutable bool _cached_owned;
+public:
+ BucketOwnershipCalculator(const lib::ClusterState& state,
+ const lib::Distribution& distribution,
+ uint16_t this_node_index) noexcept
+ : _state(state),
+ _distribution(distribution),
+ _cached_decision_superbucket(UINT64_MAX),
+ _this_node_index(this_node_index),
+ _cached_owned(false)
+ {
+ }
+
+ [[nodiscard]] bool this_distributor_owns_bucket(const document::BucketId& bucket_id) const;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index ad1cce46bea..ac5cb740361 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -16,6 +16,8 @@
#include <vespa/storage/common/node_identity.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
+#include <vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h>
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/memoryusage.h>
@@ -177,6 +179,12 @@ DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageM
return true;
}
+std::shared_ptr<Operation>
+DistributorStripe::maintenance_op_from_message_id(uint64_t msg_id) const noexcept
+{
+ return _maintenanceOperationOwner.find_by_id(msg_id);
+}
+
void
DistributorStripe::handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>& reply)
{
@@ -210,6 +218,8 @@ DistributorStripe::handleReply(const std::shared_ptr<api::StorageReply>& reply)
bucket.getBucketId() != document::BucketId(0) &&
reply->getAddress())
{
+ // Won't be triggered for replies of cancelled ops since they will be missing
+ // from `_pendingMessageTracker` and thus `bucket` will be zero.
recheckBucketInfo(reply->getAddress()->getIndex(), bucket);
}
@@ -271,18 +281,82 @@ DistributorStripe::getClusterStateBundle() const
}
void
-DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state)
+DistributorStripe::cancel_single_message_by_id_if_found(uint64_t msg_id, const CancelScope& cancel_scope)
{
- lib::ClusterStateBundle oldState = _clusterStateBundle;
- _clusterStateBundle = state;
- propagateClusterStates();
+ // In descending order of likelihood:
+ if (_operationOwner.try_cancel_by_id(msg_id, cancel_scope)) {
+ return;
+ }
+ if (_maintenanceOperationOwner.try_cancel_by_id(msg_id, cancel_scope)) {
+ return;
+ }
+ (void)_bucketDBUpdater.cancel_message_by_id(msg_id);
+}
- const auto& baseline_state = *state.getBaselineClusterState();
- enterRecoveryMode();
+void
+DistributorStripe::handle_node_down_edge_with_cancellations(uint16_t node_index, std::span<const uint64_t> msg_ids)
+{
+ auto cancel_scope = CancelScope::of_node_subset({node_index});
+ for (const auto msg_id : msg_ids) {
+ cancel_single_message_by_id_if_found(msg_id, cancel_scope);
+ }
+}
+
+void
+DistributorStripe::cancel_ops_for_buckets_no_longer_owned(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state)
+{
+ // Note: we explicitly do not simply reuse the set of buckets removed from the bucket database
+ // when deciding which operations to cancel. This is because that would depend on every candidate
+ // bucket to cancel already being present in the DB, which is hard to guarantee always holds.
+ const auto& distribution = _bucketSpaceRepo->get(bucket_space).getDistribution();
+ BucketOwnershipCalculator ownership_calc(new_state, distribution, getDistributorIndex());
+
+ auto bucket_not_owned_in_new_state = [&](const document::Bucket& bucket) {
+ return !ownership_calc.this_distributor_owns_bucket(bucket.getBucketId());
+ };
+ auto cancel_op_by_msg_id = [&](uint64_t msg_id) {
+ cancel_single_message_by_id_if_found(msg_id, CancelScope::of_fully_cancelled());
+ };
+ _pendingMessageTracker.enumerate_matching_pending_bucket_ops(bucket_not_owned_in_new_state, cancel_op_by_msg_id);
+}
+
+void
+DistributorStripe::cancel_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle,
+ const lib::ClusterStateBundle& new_state_bundle)
+{
+ // TODO we should probably only consider a node as unavailable if it is unavailable in
+ // _all_ bucket spaces. Consider: implicit maintenance mode for global merges (although
+ // that _should_ only be triggered by the CC when the node was already down...).
+ const auto& baseline_state = *new_state_bundle.getBaselineClusterState();
+ const uint16_t old_node_count = old_state_bundle.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
+ const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE);
+ const auto& distribution = _bucketSpaceRepo->get(document::FixedBucketSpaces::default_space()).getDistribution();
+ for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) {
+ // Handle both the case where a node may be gone from the cluster state and from the config.
+ // These are not atomic, so one may happen before the other.
+ const auto& node_state = baseline_state.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState();
+ const auto* node_group = distribution.getNodeGraph().getGroupForNode(i);
+ if (!node_state.oneOf(storage_node_up_states()) || !node_group) {
+ // Note: this also clears _non-maintenance_ operations from the pending message tracker, but
+ // the client operation mapping (_operationOwner) is _not_ cleared, so replies from the
+ // unavailable node(s) will still be processed as expected.
+ std::vector<uint64_t> msg_ids = _pendingMessageTracker.clearMessagesForNode(i);
+ LOG(debug, "Node %u is unavailable, cancelling %zu pending operations", i, msg_ids.size());
+ handle_node_down_edge_with_cancellations(i, msg_ids);
+ }
+ }
+}
+
+// TODO remove once cancellation support has proven itself worthy of prime time
+void
+DistributorStripe::legacy_erase_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle,
+ const lib::ClusterStateBundle& new_state_bundle)
+{
+ const auto& baseline_state = *new_state_bundle.getBaselineClusterState();
// Clear all active messages on nodes that are down.
- // TODO this should also be done on nodes that are no longer part of the config!
- const uint16_t old_node_count = oldState.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
+ const uint16_t old_node_count = old_state_bundle.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE);
for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) {
const auto& node_state = baseline_state.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState();
@@ -291,12 +365,28 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state
LOG(debug, "Node %u is down, clearing %zu pending maintenance operations", i, msgIds.size());
for (const auto & msgId : msgIds) {
- _maintenanceOperationOwner.erase(msgId);
+ (void)_maintenanceOperationOwner.erase(msgId);
}
}
}
}
+void
+DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state)
+{
+ lib::ClusterStateBundle old_state = _clusterStateBundle;
+ _clusterStateBundle = state;
+
+ propagateClusterStates();
+ enterRecoveryMode();
+
+ if (_total_config->enable_operation_cancellation()) {
+ cancel_ops_for_unavailable_nodes(old_state, state);
+ } else {
+ legacy_erase_ops_for_unavailable_nodes(old_state, state);
+ }
+}
+
OperationRoutingSnapshot DistributorStripe::read_snapshot_for_bucket(const document::Bucket& bucket) const {
return _bucketDBUpdater.read_snapshot_for_bucket(bucket);
}
@@ -308,6 +398,10 @@ DistributorStripe::notifyDistributionChangeEnabled()
// Trigger a re-scan of bucket database, just like we do when a new cluster
// state has been enabled.
enterRecoveryMode();
+
+ if (_total_config->enable_operation_cancellation()) {
+ cancel_ops_for_unavailable_nodes(_clusterStateBundle, _clusterStateBundle);
+ }
}
void
@@ -850,6 +944,9 @@ DistributorStripe::remove_superfluous_buckets(document::BucketSpace bucket_space
const lib::ClusterState& new_state,
bool is_distribution_change)
{
+ if (_total_config->enable_operation_cancellation()) {
+ cancel_ops_for_buckets_no_longer_owned(bucket_space, new_state);
+ }
return bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 338a6c72125..566e6ed454a 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -26,6 +26,7 @@
#include <atomic>
#include <mutex>
#include <queue>
+#include <span>
#include <unordered_map>
namespace storage {
@@ -135,6 +136,8 @@ public:
const lib::ClusterStateBundle& getClusterStateBundle() const override;
+ std::shared_ptr<Operation> maintenance_op_from_message_id(uint64_t msg_id) const noexcept override;
+
/**
* Called by bucket db updater after a merge has finished, and all the
* request bucket info operations have been performed as well. Passes the
@@ -251,6 +254,16 @@ private:
void enterRecoveryMode();
void leaveRecoveryMode();
+ void cancel_single_message_by_id_if_found(uint64_t msg_id, const CancelScope& cancel_scope);
+ void handle_node_down_edge_with_cancellations(uint16_t node_index, std::span<const uint64_t> msg_ids);
+ void cancel_ops_for_buckets_no_longer_owned(document::BucketSpace bucket_space, const lib::ClusterState& new_state);
+ // Note: old and new state bundles may be the same if this is called for distribution config changes
+ void cancel_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle,
+ const lib::ClusterStateBundle& new_state_bundle);
+
+ void legacy_erase_ops_for_unavailable_nodes(const lib::ClusterStateBundle& old_state_bundle,
+ const lib::ClusterStateBundle& new_state_bundle);
+
// Tries to generate an operation from the given message. Returns true
// if we either returned an operation, or the message was otherwise handled
// (for instance, wrong distribution).
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
index dfed59499c6..14888de961e 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h
@@ -18,6 +18,7 @@ namespace storage::distributor {
class DistributorMetricSet;
class NodeSupportedFeaturesRepo;
class PendingMessageTracker;
+class Operation;
/**
* TODO STRIPE add class comment.
@@ -27,7 +28,7 @@ class DistributorStripeInterface : public DistributorStripeMessageSender
public:
virtual DistributorMetricSet& getMetrics() = 0;
virtual void enableClusterStateBundle(const lib::ClusterStateBundle& state) = 0;
- virtual const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const = 0;
+ [[nodiscard]] virtual const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const = 0;
virtual void notifyDistributionChangeEnabled() = 0;
/**
@@ -57,7 +58,9 @@ public:
/**
* Returns true if the node is currently initializing.
*/
- virtual bool initializing() const = 0;
+ [[nodiscard]] virtual bool initializing() const = 0;
+
+ [[nodiscard]] virtual std::shared_ptr<Operation> maintenance_op_from_message_id(uint64_t msg_id) const noexcept = 0;
virtual void handleCompletedMerge(const std::shared_ptr<api::MergeBucketReply>&) = 0;
virtual const DistributorConfiguration& getConfig() const = 0;
virtual ChainedMessageSender& getMessageSender() = 0;
diff --git a/storage/src/vespa/storage/distributor/messagetracker.h b/storage/src/vespa/storage/distributor/messagetracker.h
index a0234f425a0..92cc921d91c 100644
--- a/storage/src/vespa/storage/distributor/messagetracker.h
+++ b/storage/src/vespa/storage/distributor/messagetracker.h
@@ -25,7 +25,7 @@ public:
uint16_t _target;
};
- MessageTracker(const ClusterContext& cluster_context);
+ explicit MessageTracker(const ClusterContext& cluster_context);
MessageTracker(MessageTracker&&) noexcept = default;
MessageTracker& operator=(MessageTracker&&) noexcept = delete;
MessageTracker(const MessageTracker &) = delete;
diff --git a/storage/src/vespa/storage/distributor/operationowner.cpp b/storage/src/vespa/storage/distributor/operationowner.cpp
index c92544c8cb5..16bbc36e4bc 100644
--- a/storage/src/vespa/storage/distributor/operationowner.cpp
+++ b/storage/src/vespa/storage/distributor/operationowner.cpp
@@ -70,10 +70,27 @@ OperationOwner::onClose()
}
}
-void
+std::shared_ptr<Operation>
+OperationOwner::find_by_id(api::StorageMessage::Id msg_id) const noexcept
+{
+ return _sentMessageMap.find_by_id_or_empty(msg_id);
+}
+
+bool
+OperationOwner::try_cancel_by_id(api::StorageMessage::Id id, const CancelScope& cancel_scope)
+{
+ auto* op = _sentMessageMap.find_by_id_or_nullptr(id);
+ if (!op) {
+ return false;
+ }
+ op->cancel(_sender, cancel_scope);
+ return true;
+}
+
+std::shared_ptr<Operation>
OperationOwner::erase(api::StorageMessage::Id msgId)
{
- (void)_sentMessageMap.pop(msgId);
+ return _sentMessageMap.pop(msgId);
}
}
diff --git a/storage/src/vespa/storage/distributor/operationowner.h b/storage/src/vespa/storage/distributor/operationowner.h
index 009bb5b80aa..828d776f1a6 100644
--- a/storage/src/vespa/storage/distributor/operationowner.h
+++ b/storage/src/vespa/storage/distributor/operationowner.h
@@ -9,6 +9,7 @@ namespace storage::framework { struct Clock; }
namespace storage::distributor {
+class CancelScope;
class Operation;
/**
@@ -87,10 +88,19 @@ public:
bool start(const std::shared_ptr<Operation>& operation, Priority priority) override;
/**
- If the given message exists, create a reply and pass it to the
- appropriate callback.
+ * If the given message exists, remove it from the internal operation mapping.
+ *
+ * Returns the operation the message belonged to, if any.
*/
- void erase(api::StorageMessage::Id msgId);
+ [[nodiscard]] std::shared_ptr<Operation> erase(api::StorageMessage::Id msgId);
+
+ /**
+ * Returns a strong ref to the pending operation with the given msg_id if it exists.
+ * Otherwise returns an empty shared_ptr.
+ */
+ [[nodiscard]] std::shared_ptr<Operation> find_by_id(api::StorageMessage::Id msg_id) const noexcept;
+
+ [[nodiscard]] bool try_cancel_by_id(api::StorageMessage::Id msg_id, const CancelScope& cancel_scope);
[[nodiscard]] DistributorStripeMessageSender& sender() noexcept { return _sender; }
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
index bd7f3709575..03be507f467 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
@@ -178,6 +178,8 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St
if (_bucket_space.has_pending_cluster_state()) {
state_version_now = _bucket_space.get_pending_cluster_state().getVersion();
}
+ // TODO disable these explicit (and possibly costly) checks when cancellation is enabled,
+ // as cancellation shall cover a superset of the cases that can be detected here.
if ((state_version_now != _cluster_state_version_at_creation_time)
&& (replica_set_changed_after_get_operation()
|| distributor_no_longer_owns_bucket()))
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index c7f858de608..5735e47ec87 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -28,8 +28,8 @@ PutOperation::PutOperation(const DistributorNodeContext& node_ctx,
PersistenceOperationMetricSet& condition_probe_metrics,
SequencingHandle sequencing_handle)
: SequencedOperation(std::move(sequencing_handle)),
- _tracker_instance(metric, std::make_shared<api::PutReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()),
- _tracker(_tracker_instance),
+ _tracker(metric, std::make_shared<api::PutReply>(*msg), node_ctx,
+ op_ctx, _cancel_scope, msg->getTimestamp()),
_msg(std::move(msg)),
_doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())),
_node_ctx(node_ctx),
@@ -253,7 +253,6 @@ PutOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScop
if (_check_condition) {
_check_condition->cancel(sender, cancel_scope);
}
- _tracker.cancel(cancel_scope);
}
bool
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 8b8e3e15375..4d26ffda61e 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -39,8 +39,7 @@ public:
void onClose(DistributorStripeMessageSender& sender) override;
private:
- PersistenceMessageTrackerImpl _tracker_instance;
- PersistenceMessageTracker& _tracker;
+ PersistenceMessageTracker _tracker;
std::shared_ptr<api::PutCommand> _msg;
document::BucketId _doc_id_bucket_id;
const DistributorNodeContext& _node_ctx;
diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
index 5f52a8208fc..ab3855b2687 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
@@ -23,12 +23,7 @@ RemoveLocationOperation::RemoveLocationOperation(
std::shared_ptr<api::RemoveLocationCommand> msg,
PersistenceOperationMetricSet& metric)
: Operation(),
- _trackerInstance(metric,
- std::make_shared<api::RemoveLocationReply>(*msg),
- node_ctx,
- op_ctx,
- 0),
- _tracker(_trackerInstance),
+ _tracker(metric, std::make_shared<api::RemoveLocationReply>(*msg), node_ctx, op_ctx, _cancel_scope, 0),
_msg(std::move(msg)),
_node_ctx(node_ctx),
_parser(parser),
diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
index d177676ff03..1ac4af0997a 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
@@ -10,8 +10,7 @@ namespace storage::distributor {
class DistributorBucketSpace;
-class RemoveLocationOperation : public Operation
-{
+class RemoveLocationOperation : public Operation {
public:
RemoveLocationOperation(const DistributorNodeContext& node_ctx,
DistributorStripeOperationContext& op_ctx,
@@ -32,14 +31,11 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>&) override;
void onClose(DistributorStripeMessageSender& sender) override;
private:
- PersistenceMessageTrackerImpl _trackerInstance;
- PersistenceMessageTracker& _tracker;
-
+ PersistenceMessageTracker _tracker;
std::shared_ptr<api::RemoveLocationCommand> _msg;
-
- const DistributorNodeContext& _node_ctx;
- const DocumentSelectionParser& _parser;
- DistributorBucketSpace &_bucketSpace;
+ const DistributorNodeContext& _node_ctx;
+ const DocumentSelectionParser& _parser;
+ DistributorBucketSpace& _bucketSpace;
};
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
index be43aac3d9e..c7cfea017ac 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
@@ -19,10 +19,8 @@ RemoveOperation::RemoveOperation(const DistributorNodeContext& node_ctx,
PersistenceOperationMetricSet& condition_probe_metrics,
SequencingHandle sequencingHandle)
: SequencedOperation(std::move(sequencingHandle)),
- _tracker_instance(metric,
- std::make_shared<api::RemoveReply>(*msg),
- node_ctx, op_ctx, msg->getTimestamp()),
- _tracker(_tracker_instance),
+ _tracker(metric, std::make_shared<api::RemoveReply>(*msg), node_ctx,
+ op_ctx, _cancel_scope, msg->getTimestamp()),
_msg(std::move(msg)),
_doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())),
_node_ctx(node_ctx),
@@ -168,7 +166,6 @@ RemoveOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelS
if (_check_condition) {
_check_condition->cancel(sender, cancel_scope);
}
- _tracker.cancel(cancel_scope);
}
bool RemoveOperation::has_condition() const noexcept {
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
index 221def81fdc..772047b96ca 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
@@ -32,8 +32,7 @@ public:
void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
private:
- PersistenceMessageTrackerImpl _tracker_instance;
- PersistenceMessageTracker& _tracker;
+ PersistenceMessageTracker _tracker;
std::shared_ptr<api::RemoveCommand> _msg;
document::BucketId _doc_id_bucket_id;
const DistributorNodeContext& _node_ctx;
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index 60bddebbb89..90f3f8c9c43 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -25,9 +25,8 @@ UpdateOperation::UpdateOperation(const DistributorNodeContext& node_ctx,
std::vector<BucketDatabase::Entry> entries,
UpdateMetricSet& metric)
: Operation(),
- _trackerInstance(metric, std::make_shared<api::UpdateReply>(*msg),
- node_ctx, op_ctx, msg->getTimestamp()),
- _tracker(_trackerInstance),
+ _tracker(metric, std::make_shared<api::UpdateReply>(*msg), node_ctx,
+ op_ctx, _cancel_scope, msg->getTimestamp()),
_msg(msg),
_entries(std::move(entries)),
_new_timestamp(_msg->getTimestamp()),
@@ -207,13 +206,6 @@ UpdateOperation::onClose(DistributorStripeMessageSender& sender)
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
-void
-UpdateOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope)
-{
- _tracker.cancel(cancel_scope);
-}
-
-
// The backend behavior of "create-if-missing" updates is to return the timestamp of the
// _new_ update operation if the document was created from scratch. The two-phase update
// operation logic auto-detects unexpected inconsistencies and tries to reconcile
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
index 7d2131d426d..750e50aeae5 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
@@ -31,25 +31,22 @@ public:
std::string getStatus() const override { return ""; };
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
void onClose(DistributorStripeMessageSender& sender) override;
- void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
std::pair<document::BucketId, uint16_t> getNewestTimestampLocation() const {
return _newestTimestampLocation;
}
private:
- PersistenceMessageTrackerImpl _trackerInstance;
- PersistenceMessageTracker& _tracker;
- std::shared_ptr<api::UpdateCommand> _msg;
- std::vector<BucketDatabase::Entry> _entries;
- const api::Timestamp _new_timestamp;
- const bool _is_auto_create_update;
-
- const DistributorNodeContext& _node_ctx;
- DistributorStripeOperationContext& _op_ctx;
- DistributorBucketSpace &_bucketSpace;
+ PersistenceMessageTracker _tracker;
+ std::shared_ptr<api::UpdateCommand> _msg;
+ std::vector<BucketDatabase::Entry> _entries;
+ const api::Timestamp _new_timestamp;
+ const bool _is_auto_create_update;
+ const DistributorNodeContext& _node_ctx;
+ DistributorStripeOperationContext& _op_ctx;
+ DistributorBucketSpace& _bucketSpace;
std::pair<document::BucketId, uint16_t> _newestTimestampLocation;
- api::BucketInfo _infoAtSendTime; // Should be same across all replicas
+ api::BucketInfo _infoAtSendTime; // Should be same across all replicas
bool anyStorageNodesAvailable() const;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index bf64fa2eb82..e384163f421 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -23,7 +23,6 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu
_cluster_state_version_at_phase1_start_time(0),
_remove_candidates(),
_replica_info(),
- _cancel_scope(),
_max_documents_removed(0),
_is_done(false)
{}
@@ -150,10 +149,6 @@ GarbageCollectionOperation::onReceive(DistributorStripeMessageSender& sender,
}
}
-void GarbageCollectionOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope) {
- _cancel_scope.merge(cancel_scope);
-}
-
void GarbageCollectionOperation::update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
_replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(),
from_node, reply.getBucketInfo());
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index 97efbe694de..d5c6d655857 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -7,7 +7,6 @@
#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
#include <vespa/storage/distributor/operation_sequencer.h>
-#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <vector>
@@ -23,7 +22,6 @@ public:
void onStart(DistributorStripeMessageSender& sender) override;
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
- void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
const char* getName() const noexcept override { return "garbagecollection"; };
Type getType() const noexcept override { return GARBAGE_COLLECTION; }
bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
@@ -56,7 +54,6 @@ private:
RemoveCandidates _remove_candidates;
std::vector<SequencingHandle> _gc_write_locks;
std::vector<BucketCopy> _replica_info;
- CancelScope _cancel_scope;
uint32_t _max_documents_removed;
bool _is_done;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index a69b7739e07..09082e718e0 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -28,6 +28,7 @@ IdealStateOperation::IdealStateOperation(const BucketAndNodes& bucketAndNodes)
: _manager(nullptr),
_bucketSpace(nullptr),
_bucketAndNodes(bucketAndNodes),
+ _detailedReason(),
_priority(255),
_ok(true)
{
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index 6c52bdb738d..ba4a2f95686 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -2,6 +2,7 @@
#pragma once
#include <vespa/storage/distributor/maintenance/maintenanceoperation.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/storageapi/messageapi/storagereply.h>
#include <vespa/storageapi/messageapi/maintenancecommand.h>
@@ -110,7 +111,7 @@ public:
using Vector = std::vector<SP>;
using Map = std::map<document::BucketId, SP>;
- IdealStateOperation(const BucketAndNodes& bucketAndNodes);
+ explicit IdealStateOperation(const BucketAndNodes& bucketAndNodes);
~IdealStateOperation() override;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index 616c4962dca..6153306861c 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -93,14 +93,18 @@ JoinOperation::enqueueJoinMessagePerTargetNode(
void
JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg)
{
- auto& rep = static_cast<api::JoinBucketsReply&>(*msg);
+ auto& rep = dynamic_cast<api::JoinBucketsReply&>(*msg);
uint16_t node = _tracker.handleReply(rep);
if (node == 0xffff) {
LOG(debug, "Ignored reply since node was max uint16_t for unknown reasons");
return;
}
- if (rep.getResult().success()) {
+ _ok = rep.getResult().success();
+ if (_cancel_scope.node_is_cancelled(node)) {
+ LOG(debug, "Join operation for %s has been cancelled", getBucketId().toString().c_str());
+ _ok = false;
+ } else if (rep.getResult().success()) {
const std::vector<document::BucketId>& sourceBuckets(
rep.getSourceBuckets());
for (auto bucket : sourceBuckets) {
@@ -133,7 +137,6 @@ JoinOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRepl
LOG(debug, "Join failed for %s with non-critical failure: %s",
getBucketId().toString().c_str(), rep.getResult().toString().c_str());
}
- _ok = rep.getResult().success();
LOG(debug, "Bucket %s join finished", getBucketId().toString().c_str());
if (_tracker.finished()) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 9469403daae..0a11a8233aa 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -177,7 +177,7 @@ MergeOperation::sourceOnlyCopyChangedDuringMerge(
const BucketDatabase::Entry& currentState) const
{
assert(currentState.valid());
- for (const auto & mnode : _mnodes) {
+ for (const auto& mnode : _mnodes) {
const BucketCopy* copyBefore(_infoBefore.getNode(mnode.index));
if (!copyBefore) {
continue;
@@ -205,7 +205,7 @@ MergeOperation::deleteSourceOnlyNodes(
{
assert(currentState.valid());
std::vector<uint16_t> sourceOnlyNodes;
- for (const auto & mnode : _mnodes) {
+ for (const auto& mnode : _mnodes) {
const uint16_t nodeIndex = mnode.index;
const BucketCopy* copy = currentState->getNode(nodeIndex);
if (!copy) {
@@ -272,7 +272,14 @@ MergeOperation::onReceive(DistributorStripeMessageSender& sender, const std::sha
api::ReturnCode result = reply.getResult();
_ok = result.success();
- if (_ok) {
+ // We avoid replica deletion entirely if _any_ aspect of the merge has been cancelled.
+ // It is, for instance, possible that a node that was previously considered source-only
+ // now is part of the #redundancy ideal copies because another node became unavailable.
+ // Leave it up to the maintenance state checkers to figure this out.
+ if (_cancel_scope.is_cancelled()) {
+ LOG(debug, "Merge operation for %s has been cancelled", getBucketId().toString().c_str());
+ _ok = false;
+ } else if (_ok) {
BucketDatabase::Entry entry(_bucketSpace->getBucketDatabase().get(getBucketId()));
if (!entry.valid()) {
LOG(debug, "Bucket %s no longer exists after merge", getBucketId().toString().c_str());
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
index 41767f0e3af..2184739f82c 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
@@ -59,13 +59,14 @@ RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply
{
auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get());
- uint16_t node = _tracker.handleReply(*rep);
+ const uint16_t node = _tracker.handleReply(*rep);
- LOG(debug, "Got DeleteBucket reply for %s from node %u",
- getBucketId().toString().c_str(),
- node);
+ LOG(debug, "Got DeleteBucket reply for %s from node %u", getBucketId().toString().c_str(), node);
- if (rep->getResult().failed()) {
+ if (_cancel_scope.node_is_cancelled(node)) {
+ LOG(debug, "DeleteBucket operation for %s has been cancelled", getBucketId().toString().c_str());
+ _ok = false;
+ } else if (rep->getResult().failed()) {
if (rep->getResult().getResult() == api::ReturnCode::REJECTED
&& rep->getBucketInfo().valid())
{
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
index 531f7f64b68..5ddf082a544 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
@@ -69,13 +69,17 @@ void
SetBucketStateOperation::onReceive(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply>& reply)
{
- auto& rep = static_cast<api::SetBucketStateReply&>(*reply);
+ auto& rep = dynamic_cast<api::SetBucketStateReply&>(*reply);
const uint16_t node = _tracker.handleReply(rep);
LOG(debug, "Got %s from node %u", reply->toString(true).c_str(), node);
bool deactivate = false;
- if (reply->getResult().success()) {
+
+ if (_cancel_scope.node_is_cancelled(node)) {
+ LOG(debug, "SetBucketState for %s has been cancelled", rep.getBucketId().toString().c_str());
+ _ok = false;
+ } else if (reply->getResult().success()) {
BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(rep.getBucketId());
if (entry.valid()) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index d704a42e96b..c894deeecd8 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -50,7 +50,7 @@ SplitOperation::onStart(DistributorStripeMessageSender& sender)
void
SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg)
{
- auto & rep = static_cast<api::SplitBucketReply&>(*msg);
+ auto& rep = dynamic_cast<api::SplitBucketReply&>(*msg);
uint16_t node = _tracker.handleReply(rep);
@@ -61,7 +61,9 @@ SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageRep
std::ostringstream ost;
- if (rep.getResult().success()) {
+ if (_cancel_scope.node_is_cancelled(node)) {
+ _ok = false;
+ } else if (rep.getResult().success()) {
BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(rep.getBucketId());
if (entry.valid()) {
diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp
index 9f944a94178..f60dc8eecff 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/operation.cpp
@@ -13,7 +13,7 @@ namespace storage::distributor {
Operation::Operation()
: _startTime(),
- _cancelled(false)
+ _cancel_scope()
{
}
@@ -47,7 +47,7 @@ Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCo
}
void Operation::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
- _cancelled = true;
+ _cancel_scope.merge(cancel_scope);
on_cancel(sender, cancel_scope);
}
diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h
index 64caacfc642..c742f918c30 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.h
+++ b/storage/src/vespa/storage/distributor/operations/operation.h
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "cancel_scope.h"
#include <vespa/vdslib/state/nodetype.h>
#include <vespa/storage/distributor/distributormessagesender.h>
#include <vespa/vespalib/util/time.h>
@@ -68,13 +69,15 @@ public:
*/
void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope);
+ [[nodiscard]] const CancelScope& cancel_scope() const noexcept { return _cancel_scope; }
+
/**
* Whether cancel() has been invoked at least once on this instance. This does not
* distinguish between cancellations caused by ownership transfers and those caused
* by nodes becoming unavailable; Operation implementations that care about this need
- * to implement cancel() themselves and inspect the provided CancelScope.
+ * to inspect cancel_scope() themselves.
*/
- [[nodiscard]] bool is_cancelled() const noexcept { return _cancelled; }
+ [[nodiscard]] bool is_cancelled() const noexcept { return _cancel_scope.is_cancelled(); }
/**
* Returns true if we are blocked to start this operation given
@@ -118,7 +121,7 @@ protected:
static constexpr vespalib::duration MAX_TIMEOUT = 3600s;
vespalib::system_time _startTime;
- bool _cancelled;
+ CancelScope _cancel_scope;
};
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index 498f3a5feab..638f44eb5a7 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -13,11 +13,12 @@ LOG_SETUP(".persistencemessagetracker");
namespace storage::distributor {
-PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
+PersistenceMessageTracker::PersistenceMessageTracker(
PersistenceOperationMetricSet& metric,
std::shared_ptr<api::BucketInfoReply> reply,
const DistributorNodeContext& node_ctx,
DistributorStripeOperationContext& op_ctx,
+ CancelScope& cancel_scope,
api::Timestamp revertTimestamp)
: MessageTracker(node_ctx),
_remapBucketInfo(),
@@ -28,7 +29,7 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
_revertTimestamp(revertTimestamp),
_trace(_reply->getTrace().getLevel()),
_requestTimer(node_ctx.clock()),
- _cancel_scope(),
+ _cancel_scope(cancel_scope),
_n_persistence_replies_total(0),
_n_successful_persistence_replies(0),
_priority(_reply->getPriority()),
@@ -36,33 +37,34 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
{
}
-PersistenceMessageTrackerImpl::~PersistenceMessageTrackerImpl() = default;
+PersistenceMessageTracker::~PersistenceMessageTracker() = default;
-void
-PersistenceMessageTrackerImpl::cancel(const CancelScope& cancel_scope)
-{
- _cancel_scope.merge(cancel_scope);
-}
-
-void
-PersistenceMessageTrackerImpl::prune_cancelled_nodes_if_present(
+bool
+PersistenceMessageTracker::prune_cancelled_nodes_if_present(
BucketInfoMap& bucket_and_replicas,
const CancelScope& cancel_scope)
{
+ bool any_replicas = false;
for (auto& info : bucket_and_replicas) {
info.second = prune_cancelled_nodes(info.second, cancel_scope);
+ any_replicas |= !info.second.empty();
}
+ return any_replicas;
}
void
-PersistenceMessageTrackerImpl::updateDB()
+PersistenceMessageTracker::updateDB()
{
if (_cancel_scope.is_cancelled()) {
if (_cancel_scope.fully_cancelled()) {
return; // Fully cancelled ops cannot mutate the DB at all
}
- prune_cancelled_nodes_if_present(_bucketInfo, _cancel_scope);
- prune_cancelled_nodes_if_present(_remapBucketInfo, _cancel_scope);
+ const bool any_infos = prune_cancelled_nodes_if_present(_bucketInfo, _cancel_scope);
+ const bool any_remapped = prune_cancelled_nodes_if_present(_remapBucketInfo, _cancel_scope);
+ if (!(any_infos || any_remapped)) {
+ LOG(spam, "No usable bucket info left after pruning; returning without updating DB");
+ return;
+ }
}
for (const auto & entry : _bucketInfo) {
@@ -75,7 +77,7 @@ PersistenceMessageTrackerImpl::updateDB()
}
void
-PersistenceMessageTrackerImpl::updateMetrics()
+PersistenceMessageTracker::updateMetrics()
{
const api::ReturnCode& result(_reply->getResult());
_metric.updateFromResult(result);
@@ -83,7 +85,7 @@ PersistenceMessageTrackerImpl::updateMetrics()
}
void
-PersistenceMessageTrackerImpl::fail(MessageSender& sender, const api::ReturnCode& result) {
+PersistenceMessageTracker::fail(MessageSender& sender, const api::ReturnCode& result) {
if (_reply.get()) {
_reply->setResult(result);
updateMetrics();
@@ -94,7 +96,7 @@ PersistenceMessageTrackerImpl::fail(MessageSender& sender, const api::ReturnCode
}
uint16_t
-PersistenceMessageTrackerImpl::receiveReply(MessageSender& sender, api::BucketInfoReply& reply)
+PersistenceMessageTracker::receiveReply(MessageSender& sender, api::BucketInfoReply& reply)
{
uint16_t node = handleReply(reply);
@@ -106,7 +108,7 @@ PersistenceMessageTrackerImpl::receiveReply(MessageSender& sender, api::BucketIn
}
void
-PersistenceMessageTrackerImpl::revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes)
+PersistenceMessageTracker::revert(MessageSender& sender, const std::vector<BucketNodePair>& revertNodes)
{
if (_revertTimestamp != 0) {
// Since we're reverting, all received bucket info is voided.
@@ -126,7 +128,7 @@ PersistenceMessageTrackerImpl::revert(MessageSender& sender, const std::vector<B
}
void
-PersistenceMessageTrackerImpl::queueMessageBatch(std::vector<MessageTracker::ToSend> messages) {
+PersistenceMessageTracker::queueMessageBatch(std::vector<MessageTracker::ToSend> messages) {
_messageBatches.emplace_back();
auto & batch = _messageBatches.back();
batch.reserve(messages.size());
@@ -142,7 +144,7 @@ PersistenceMessageTrackerImpl::queueMessageBatch(std::vector<MessageTracker::ToS
}
bool
-PersistenceMessageTrackerImpl::canSendReplyEarly() const
+PersistenceMessageTracker::canSendReplyEarly() const
{
if (!_reply.get() || !_reply->getResult().success()) {
LOG(spam, "Can't return early because we have already replied or failed");
@@ -181,7 +183,7 @@ PersistenceMessageTrackerImpl::canSendReplyEarly() const
}
void
-PersistenceMessageTrackerImpl::addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply)
+PersistenceMessageTracker::addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply)
{
document::Bucket bucket(reply.getBucket());
const api::BucketInfo& bucketInfo(reply.getBucketInfo());
@@ -198,7 +200,7 @@ PersistenceMessageTrackerImpl::addBucketInfoFromReply(uint16_t node, const api::
}
void
-PersistenceMessageTrackerImpl::logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const
+PersistenceMessageTracker::logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const
{
LOG(spam, "Bucket %s: Received successful reply %s",
reply.getBucketId().toString().c_str(), reply.toString().c_str());
@@ -211,26 +213,26 @@ PersistenceMessageTrackerImpl::logSuccessfulReply(uint16_t node, const api::Buck
}
bool
-PersistenceMessageTrackerImpl::shouldRevert() const
+PersistenceMessageTracker::shouldRevert() const
{
return _op_ctx.distributor_config().enable_revert()
&& !_revertNodes.empty() && !_success && _reply;
}
-bool PersistenceMessageTrackerImpl::has_majority_successful_replies() const noexcept {
+bool PersistenceMessageTracker::has_majority_successful_replies() const noexcept {
// FIXME this has questionable interaction with early client ACK since we only count
// the number of observed replies rather than the number of total requests sent.
// ... but the early ACK-feature dearly needs a redesign anyway.
return (_n_successful_persistence_replies >= (_n_persistence_replies_total / 2 + 1));
}
-bool PersistenceMessageTrackerImpl::has_minority_test_and_set_failure() const noexcept {
+bool PersistenceMessageTracker::has_minority_test_and_set_failure() const noexcept {
return ((_reply->getResult().getResult() == api::ReturnCode::TEST_AND_SET_CONDITION_FAILED)
&& has_majority_successful_replies());
}
void
-PersistenceMessageTrackerImpl::sendReply(MessageSender& sender)
+PersistenceMessageTracker::sendReply(MessageSender& sender)
{
// If we've observed _partial_ TaS failures but have had a majority of good ACKs,
// treat the reply as successful. This is because the ACKed write(s) will eventually
@@ -247,7 +249,7 @@ PersistenceMessageTrackerImpl::sendReply(MessageSender& sender)
}
void
-PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& reply)
+PersistenceMessageTracker::updateFailureResult(const api::BucketInfoReply& reply)
{
LOG(debug, "Bucket %s: Received failed reply %s with result %s",
reply.getBucketId().toString().c_str(), reply.toString().c_str(), reply.getResult().toString().c_str());
@@ -259,13 +261,13 @@ PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& r
}
bool
-PersistenceMessageTrackerImpl::node_is_effectively_cancelled(uint16_t node) const noexcept
+PersistenceMessageTracker::node_is_effectively_cancelled(uint16_t node) const noexcept
{
return _cancel_scope.node_is_cancelled(node); // Implicitly covers the fully cancelled case
}
void
-PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node)
+PersistenceMessageTracker::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node)
{
LOG(spam, "Received CreateBucket reply for %s from node %u", reply.getBucketId().toString().c_str(), node);
if (!reply.getResult().success()
@@ -285,7 +287,7 @@ PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& rep
}
void
-PersistenceMessageTrackerImpl::handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node)
+PersistenceMessageTracker::handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node)
{
++_n_persistence_replies_total;
if (reply.getBucketInfo().valid()) {
@@ -301,7 +303,7 @@ PersistenceMessageTrackerImpl::handlePersistenceReply(api::BucketInfoReply& repl
}
void
-PersistenceMessageTrackerImpl::transfer_trace_state_to_reply()
+PersistenceMessageTracker::transfer_trace_state_to_reply()
{
if (!_trace.isEmpty()) {
_trace.setStrict(false);
@@ -310,7 +312,7 @@ PersistenceMessageTrackerImpl::transfer_trace_state_to_reply()
}
void
-PersistenceMessageTrackerImpl::updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node)
+PersistenceMessageTracker::updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node)
{
_trace.addChild(reply.steal_trace());
@@ -338,7 +340,7 @@ PersistenceMessageTrackerImpl::updateFromReply(MessageSender& sender, api::Bucke
}
void
-PersistenceMessageTrackerImpl::add_trace_tree_to_reply(vespalib::Trace trace)
+PersistenceMessageTracker::add_trace_tree_to_reply(vespalib::Trace trace)
{
_trace.addChild(std::move(trace));
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index 8c44d70062c..a20bcb2c905 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -11,47 +11,29 @@
namespace storage::distributor {
-struct PersistenceMessageTracker {
- virtual ~PersistenceMessageTracker() = default;
- using ToSend = MessageTracker::ToSend;
-
- virtual void cancel(const CancelScope& cancel_scope) = 0;
- virtual void fail(MessageSender&, const api::ReturnCode&) = 0;
- virtual void queueMessageBatch(std::vector<ToSend> messages) = 0;
- virtual uint16_t receiveReply(MessageSender&, api::BucketInfoReply&) = 0;
- virtual std::shared_ptr<api::BucketInfoReply>& getReply() = 0;
- virtual void updateFromReply(MessageSender&, api::BucketInfoReply&, uint16_t node) = 0;
- virtual void queueCommand(api::BucketCommand::SP, uint16_t target) = 0;
- virtual void flushQueue(MessageSender&) = 0;
- virtual uint16_t handleReply(api::BucketReply& reply) = 0;
- virtual void add_trace_tree_to_reply(vespalib::Trace trace) = 0;
-};
-
-class PersistenceMessageTrackerImpl final
- : public PersistenceMessageTracker,
- public MessageTracker
-{
+class PersistenceMessageTracker final : public MessageTracker {
public:
- PersistenceMessageTrackerImpl(PersistenceOperationMetricSet& metric,
- std::shared_ptr<api::BucketInfoReply> reply,
- const DistributorNodeContext& node_ctx,
- DistributorStripeOperationContext& op_ctx,
- api::Timestamp revertTimestamp = 0);
- ~PersistenceMessageTrackerImpl() override;
+ using ToSend = MessageTracker::ToSend;
- void cancel(const CancelScope& cancel_scope) override;
+ PersistenceMessageTracker(PersistenceOperationMetricSet& metric,
+ std::shared_ptr<api::BucketInfoReply> reply,
+ const DistributorNodeContext& node_ctx,
+ DistributorStripeOperationContext& op_ctx,
+ CancelScope& cancel_scope,
+ api::Timestamp revertTimestamp = 0);
+ ~PersistenceMessageTracker();
void updateDB();
void updateMetrics();
[[nodiscard]] bool success() const noexcept { return _success; }
- void fail(MessageSender& sender, const api::ReturnCode& result) override;
+ void fail(MessageSender& sender, const api::ReturnCode& result);
/**
Returns the node the reply was from.
*/
- uint16_t receiveReply(MessageSender& sender, api::BucketInfoReply& reply) override;
- void updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node) override;
- std::shared_ptr<api::BucketInfoReply>& getReply() override { return _reply; }
+ uint16_t receiveReply(MessageSender& sender, api::BucketInfoReply& reply);
+ void updateFromReply(MessageSender& sender, api::BucketInfoReply& reply, uint16_t node);
+ std::shared_ptr<api::BucketInfoReply>& getReply() { return _reply; }
using BucketNodePair = std::pair<document::Bucket, uint16_t>;
@@ -63,7 +45,9 @@ public:
have at most (messages.size() - initial redundancy) messages left in the
queue and have it's first message be done.
*/
- void queueMessageBatch(std::vector<MessageTracker::ToSend> messages) override;
+ void queueMessageBatch(std::vector<MessageTracker::ToSend> messages);
+
+ void add_trace_tree_to_reply(vespalib::Trace trace);
private:
using MessageBatch = std::vector<uint64_t>;
@@ -79,14 +63,15 @@ private:
std::vector<BucketNodePair> _revertNodes;
mbus::Trace _trace;
framework::MilliSecTimer _requestTimer;
- CancelScope _cancel_scope;
+ CancelScope& _cancel_scope;
uint32_t _n_persistence_replies_total;
uint32_t _n_successful_persistence_replies;
uint8_t _priority;
bool _success;
- static void prune_cancelled_nodes_if_present(BucketInfoMap& bucket_and_replicas,
- const CancelScope& cancel_scope);
+    // Returns true iff `bucket_and_replicas` has at least 1 usable entry after pruning
+    [[nodiscard]] static bool prune_cancelled_nodes_if_present(BucketInfoMap& bucket_and_replicas,
+                                                               const CancelScope& cancel_scope);
[[nodiscard]] bool canSendReplyEarly() const;
void addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply);
void logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const;
@@ -100,13 +85,6 @@ private:
void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node);
void handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node);
void transfer_trace_state_to_reply();
-
- void queueCommand(std::shared_ptr<api::BucketCommand> msg, uint16_t target) override {
- MessageTracker::queueCommand(std::move(msg), target);
- }
- void flushQueue(MessageSender& s) override { MessageTracker::flushQueue(s); }
- uint16_t handleReply(api::BucketReply& r) override { return MessageTracker::handleReply(r); }
- void add_trace_tree_to_reply(vespalib::Trace trace) override;
};
}
diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.cpp b/storage/src/vespa/storage/distributor/sentmessagemap.cpp
index 4b7292c1e81..2ae70f417d1 100644
--- a/storage/src/vespa/storage/distributor/sentmessagemap.cpp
+++ b/storage/src/vespa/storage/distributor/sentmessagemap.cpp
@@ -18,18 +18,30 @@ SentMessageMap::SentMessageMap()
SentMessageMap::~SentMessageMap() = default;
+Operation*
+SentMessageMap::find_by_id_or_nullptr(api::StorageMessage::Id id) const noexcept
+{
+ auto iter = _map.find(id);
+ return ((iter != _map.end()) ? iter->second.get() : nullptr);
+}
+
+std::shared_ptr<Operation>
+SentMessageMap::find_by_id_or_empty(api::StorageMessage::Id id) const noexcept
+{
+ auto iter = _map.find(id);
+ return ((iter != _map.end()) ? iter->second : std::shared_ptr<Operation>());
+}
std::shared_ptr<Operation>
SentMessageMap::pop()
{
auto found = _map.begin();
-
if (found != _map.end()) {
- std::shared_ptr<Operation> retVal = found->second;
+ std::shared_ptr<Operation> op = std::move(found->second);
_map.erase(found);
- return retVal;
+ return op;
} else {
- return std::shared_ptr<Operation>();
+ return {};
}
}
@@ -37,17 +49,15 @@ std::shared_ptr<Operation>
SentMessageMap::pop(api::StorageMessage::Id id)
{
auto found = _map.find(id);
-
if (found != _map.end()) {
LOG(spam, "Found Id %" PRIu64 " in callback map: %p", id, found->second.get());
- std::shared_ptr<Operation> retVal = found->second;
+ std::shared_ptr<Operation> op = std::move(found->second);
_map.erase(found);
- return retVal;
+ return op;
} else {
LOG(spam, "Did not find Id %" PRIu64 " in callback map", id);
-
- return std::shared_ptr<Operation>();
+ return {};
}
}
@@ -55,7 +65,6 @@ void
SentMessageMap::insert(api::StorageMessage::Id id, const std::shared_ptr<Operation> & callback)
{
LOG(spam, "Inserting callback %p for message %" PRIu64 "", callback.get(), id);
-
_map[id] = callback;
}
diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h
index 951ed6a6877..3ad80f4e55d 100644
--- a/storage/src/vespa/storage/distributor/sentmessagemap.h
+++ b/storage/src/vespa/storage/distributor/sentmessagemap.h
@@ -15,6 +15,11 @@ public:
SentMessageMap();
~SentMessageMap();
+ // Find by message ID, or nullptr if not found
+ [[nodiscard]] Operation* find_by_id_or_nullptr(api::StorageMessage::Id id) const noexcept;
+ // Find by message ID, or empty shared_ptr if not found
+ [[nodiscard]] std::shared_ptr<Operation> find_by_id_or_empty(api::StorageMessage::Id id) const noexcept;
+
[[nodiscard]] std::shared_ptr<Operation> pop(api::StorageMessage::Id id);
[[nodiscard]] std::shared_ptr<Operation> pop();
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
index fd747484ccf..ad8fae9cd74 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp
@@ -130,6 +130,7 @@ StripeBucketDBUpdater::sendRequestBucketInfo(
const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
{
+ // TODO assert if cancellation enabled
if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) {
return;
}
@@ -342,14 +343,19 @@ StripeBucketDBUpdater::onMergeBucketReply(
const std::shared_ptr<api::MergeBucketReply>& reply)
{
auto replyGuard = std::make_shared<MergeReplyGuard>(_distributor_interface, reply);
+ auto merge_op = _distributor_interface.maintenance_op_from_message_id(reply->getMsgId());
// In case the merge was unsuccessful somehow, or some nodes weren't
// actually merged (source-only nodes?) we request the bucket info of the
// bucket again to make sure it's ok.
for (uint32_t i = 0; i < reply->getNodes().size(); i++) {
- sendRequestBucketInfo(reply->getNodes()[i].index,
- reply->getBucket(),
- replyGuard);
+ const uint16_t node_index = reply->getNodes()[i].index;
+ // We conditionally omit the node instead of conditionally send to it, as many tests do not
+        // wire their merges through the main distributor maintenance operation tracking logic.
+ if (merge_op && merge_op->cancel_scope().node_is_cancelled(node_index)) {
+ continue;
+ }
+ sendRequestBucketInfo(node_index, reply->getBucket(), replyGuard);
}
return true;
@@ -502,8 +508,9 @@ StripeBucketDBUpdater::processSingleBucketInfoReply(const std::shared_ptr<api::R
BucketRequest req = iter->second;
_sentMessages.erase(iter);
- if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) {
- // Ignore replies from nodes that are down.
+ // TODO remove explicit node check in favor of cancellation only
+ if (req.cancelled || !_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) {
+ // Ignore replies from nodes that are cancelled/down.
return true;
}
if (repl->getResult().getResult() != api::ReturnCode::OK) {
@@ -516,6 +523,17 @@ StripeBucketDBUpdater::processSingleBucketInfoReply(const std::shared_ptr<api::R
return true;
}
+bool
+StripeBucketDBUpdater::cancel_message_by_id(uint64_t msg_id)
+{
+ auto iter = _sentMessages.find(msg_id);
+ if (iter == _sentMessages.end()) {
+ return false;
+ }
+ iter->second.cancelled = true;
+ return true;
+}
+
void
StripeBucketDBUpdater::addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
BucketListMerger::BucketList& existing)
@@ -652,12 +670,10 @@ StripeBucketDBUpdater::MergingNodeRemover::MergingNodeRemover(
_nonOwnedBuckets(),
_removed_buckets(0),
_removed_documents(0),
- _localIndex(localIndex),
_distribution(distribution),
_upStates(upStates),
- _track_non_owned_entries(track_non_owned_entries),
- _cachedDecisionSuperbucket(UINT64_MAX),
- _cachedOwned(false)
+ _ownership_calc(_state, _distribution, localIndex),
+ _track_non_owned_entries(track_non_owned_entries)
{
const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE);
_available_nodes.resize(storage_count);
@@ -678,45 +694,15 @@ StripeBucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& b
LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg);
}
-namespace {
-
-uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution_bits) noexcept {
- // The n LSBs of the bucket ID contain the superbucket number. Mask off the rest.
- return id.getRawId() & ~(UINT64_MAX << distribution_bits);
-}
-
-}
-
bool
StripeBucketDBUpdater::MergingNodeRemover::distributorOwnsBucket(
const document::BucketId& bucketId) const
{
- // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor.
- // TODO "too few bits used" case can be cheaply checked without needing exception
- try {
- const auto bits = _state.getDistributionBitCount();
- const auto this_superbucket = superbucket_from_id(bucketId, bits);
- if (_cachedDecisionSuperbucket == this_superbucket) {
- if (!_cachedOwned) {
- logRemove(bucketId, "bucket now owned by another distributor (cached)");
- }
- return _cachedOwned;
- }
-
- uint16_t distributor = _distribution.getIdealDistributorNode(_state, bucketId, "uim");
- _cachedDecisionSuperbucket = this_superbucket;
- _cachedOwned = (distributor == _localIndex);
- if (!_cachedOwned) {
- logRemove(bucketId, "bucket now owned by another distributor");
- return false;
- }
- return true;
- } catch (lib::TooFewBucketBitsInUseException& exc) {
- logRemove(bucketId, "using too few distribution bits now");
- } catch (lib::NoDistributorsAvailableException& exc) {
- logRemove(bucketId, "no distributors are available");
+ const bool owns_bucket = _ownership_calc.this_distributor_owns_bucket(bucketId);
+ if (!owns_bucket) {
+ logRemove(bucketId, "bucket now owned by another distributor");
}
- return false;
+ return owns_bucket;
}
void
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
index 2e4ef2a7543..9536c84691d 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "bucket_ownership_calculator.h"
#include "bucketlistmerger.h"
#include "distributor_stripe_component.h"
#include "distributormessagesender.h"
@@ -49,6 +50,7 @@ public:
bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;
void resendDelayedMessages();
+ [[nodiscard]] bool cancel_message_by_id(uint64_t msg_id);
vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const;
vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
@@ -75,7 +77,8 @@ public:
private:
class MergeReplyGuard {
public:
- MergeReplyGuard(DistributorStripeInterface& distributor_interface, const std::shared_ptr<api::MergeBucketReply>& reply) noexcept
+ MergeReplyGuard(DistributorStripeInterface& distributor_interface,
+ const std::shared_ptr<api::MergeBucketReply>& reply) noexcept
: _distributor_interface(distributor_interface), _reply(reply) {}
~MergeReplyGuard();
@@ -89,22 +92,23 @@ private:
};
struct BucketRequest {
- BucketRequest()
- : targetNode(0), bucket(), timestamp(0) {};
+ BucketRequest() noexcept : targetNode(0), bucket(), timestamp(0), cancelled(false) {}
BucketRequest(uint16_t t, uint64_t currentTime, const document::Bucket& b,
- const std::shared_ptr<MergeReplyGuard>& guard)
+ const std::shared_ptr<MergeReplyGuard>& guard) noexcept
: targetNode(t),
bucket(b),
timestamp(currentTime),
- _mergeReplyGuard(guard) {};
+ _mergeReplyGuard(guard),
+ cancelled(false)
+ {}
void print_xml_tag(vespalib::xml::XmlOutputStream &xos, const vespalib::xml::XmlAttribute &timestampAttribute) const;
uint16_t targetNode;
document::Bucket bucket;
uint64_t timestamp;
-
std::shared_ptr<MergeReplyGuard> _mergeReplyGuard;
+ bool cancelled;
};
struct EnqueuedBucketRecheck {
@@ -148,7 +152,7 @@ private:
static void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
uint16_t targetNode, BucketListMerger::BucketList& newList);
void sendRequestBucketInfo(uint16_t node, const document::Bucket& bucket,
-                           const std::shared_ptr<MergeReplyGuard>& mergeReply );
+ const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard);
static void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
BucketListMerger::BucketList& existing);
void clearReadOnlyBucketRepoDatabases();
@@ -218,12 +222,10 @@ private:
std::vector<BucketDatabase::Entry> _nonOwnedBuckets;
size_t _removed_buckets;
size_t _removed_documents;
- uint16_t _localIndex;
const lib::Distribution& _distribution;
const char* _upStates;
+ BucketOwnershipCalculator _ownership_calc;
bool _track_non_owned_entries;
- mutable uint64_t _cachedDecisionSuperbucket;
- mutable bool _cachedOwned;
};
using DistributionContexts = std::unordered_map<document::BucketSpace,