summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-07-18 13:12:29 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2023-08-18 13:32:45 +0000
commit538d79ce480069322c04ca87436b80e0146e2dfa (patch)
tree4ef0e03edced933286650b7e4e7ed713617c80c6 /storage
parent5a2dd2d26e3c518e7c54ccd29c9be27e8accb7df (diff)
Implement edge-triggered distributor operation cancelling
Will be used for ensuring active operations do not mutate the bucket database upon completion with stale entries for buckets that may no longer be valid for that distributor to handle. Removes the need for today's "always-on" implicit checks for node state and bucket ownership upon every single DB update (which is potentially ABA-susceptible). Moving to edge-triggering is intentionally done to avoid ABA problems. Cancellation cases are: * Distributor ownership of bucket changed * Subset of target nodes has become unavailable Note: cancellation is not yet wired in; this code is cold.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/check_condition_test.cpp17
-rw-r--r--storage/src/tests/distributor/garbagecollectiontest.cpp60
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp155
-rw-r--r--storage/src/tests/distributor/removeoperationtest.cpp40
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp169
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp40
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketinfo.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp19
-rw-r--r--storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h17
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/operations/cancel_scope.cpp52
-rw-r--r--storage/src/vespa/storage/distributor/operations/cancel_scope.h59
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.cpp13
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp13
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.h1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp76
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h21
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.h1
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp30
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h12
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.h25
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp46
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.h33
-rw-r--r--storage/src/vespa/storage/distributor/sentmessagemap.h12
32 files changed, 862 insertions, 89 deletions
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp
index 757a9329ea6..92c3b120da2 100644
--- a/storage/src/tests/distributor/check_condition_test.cpp
+++ b/storage/src/tests/distributor/check_condition_test.cpp
@@ -5,6 +5,7 @@
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/documentapi/messagebus/messages/testandsetcondition.h>
#include <vespa/storage/distributor/node_supported_features.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storage/distributor/operations/external/check_condition.h>
#include <vespa/storage/distributor/persistence_operation_metric_set.h>
#include <vespa/storageapi/message/persistence.h>
@@ -227,6 +228,20 @@ TEST_F(CheckConditionTest, failed_gets_completes_check_with_error_outcome) {
});
}
+TEST_F(CheckConditionTest, check_fails_if_condition_explicitly_cancelled) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0));
+ cond.cancel(_sender, CancelScope::of_ownership_change());
+ cond.handle_reply(_sender, make_matched_reply(1));
+ }, [&](auto& outcome) {
+ EXPECT_FALSE(outcome.matched_condition());
+ EXPECT_FALSE(outcome.not_found());
+ EXPECT_TRUE(outcome.failed());
+ EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::ABORTED);
+ });
+}
+
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_completion) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
@@ -242,6 +257,7 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_
});
}
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_pending_transition_case) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
@@ -255,6 +271,7 @@ TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start
});
}
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_completed_transition_case) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 9b5056f2066..c3cec32b279 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -113,9 +113,8 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
ASSERT_EQ(entry->getNodeCount(), info.size());
EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time);
for (size_t i = 0; i < info.size(); ++i) {
- EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo())
- << "Mismatching info for node " << i << ": " << info[i] << " vs "
- << entry->getNode(i)->getBucketInfo();
+ auto& node = entry->getNodeRef(i);
+ EXPECT_EQ(info[i], node.getBucketInfo()) << "Mismatching DB bucket info for node " << node.getNode();
}
}
@@ -172,6 +171,51 @@ TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until
EXPECT_EQ(70u, gc_removed_documents_metric()); // Use max of received metrics
}
+TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_if_operation_fully_canceled) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_ownership_change());
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+ // DB state is unchanged. Note that in a real scenario, the DB entry will have been removed
+ // as part of the ownership change, but there are already non-cancellation behaviors that
+ // avoid creating buckets from scratch in the DB if they do not exist, so just checking to
+ // see if the bucket exists or not risks hiding missing cancellation edge handling.
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0));
+ // However, we still update our metrics if we _did_ remove documents on one or more nodes
+ EXPECT_EQ(70u, gc_removed_documents_metric());
+}
+
+TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_for_cancelled_node) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+ // DB state is unchanged for node 0, changed for node 1
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(4567, 90, 500)}, 34));
+}
+
+TEST_F(GarbageCollectionOperationTest, node_cancellation_is_cumulative) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+ // DB state is unchanged for both nodes
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0));
+}
+
TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) {
auto op = create_op();
op->start(_sender);
@@ -363,6 +407,16 @@ TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_in
receive_phase1_replies_and_assert_no_phase_2_started();
}
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_fully_cancelled_between_phases) {
+ _op->cancel(_sender, CancelScope::of_ownership_change());
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_partially_cancelled_between_phases) {
+ _op->cancel(_sender, CancelScope::of_node_subset({0}));
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) {
enable_two_phase_gc();
auto op = create_op();
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 76b6741442e..431b7595571 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -6,6 +6,7 @@
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributor_stripe.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storage/distributor/operations/external/putoperation.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
@@ -208,6 +209,43 @@ TEST_F(PutOperationTest, failed_CreateBucket_removes_replica_from_db_and_sends_R
_sender.getCommands(true, true, 4));
}
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_if_op_fully_canceled) {
+ setup_stripe(2, 2, "distributor:1 storage:2");
+
+ auto doc = createDummyDocument("test", "test");
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+ op->cancel(_sender, CancelScope::of_ownership_change());
+ sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+
+ // DB is not touched (note: normally node 1 would be removed at the cancel-edge).
+ ASSERT_EQ("BucketId(0x4000000000008f09) : "
+ "node(idx=1,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false), "
+ "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=false,ready=false)",
+ dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
+ // No new requests sent
+ ASSERT_EQ("", _sender.getCommands(true, true, 4));
+}
+
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_for_cancelled_nodes) {
+ setup_stripe(2, 2, "distributor:1 storage:2");
+
+ auto doc = createDummyDocument("test", "test");
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+ sendReply(1, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 0
+
+ // Bucket info recheck only sent to node 1, as it's not cancelled
+ ASSERT_EQ("RequestBucketInfoCommand(1 buckets, super bucket BucketId(0x4000000000008f09). ) => 1",
+ _sender.getCommands(true, true, 4));
+}
+
TEST_F(PutOperationTest, send_inline_split_before_put_if_bucket_too_large) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cfg = make_config();
@@ -272,6 +310,26 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck
_sender.getLastReply());
}
+TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_operation_cancelled) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ createAndSendSampleDocument(TIMEOUT);
+
+ ASSERT_EQ("Put(BucketId(0x4000000000001dd4), "
+ "id:test:testdoctype1::, timestamp 100, size 45) => 0,"
+ "Put(BucketId(0x4000000000001dd4), "
+ "id:test:testdoctype1::, timestamp 100, size 45) => 1",
+ _sender.getCommands(true, true));
+
+ op->cancel(_sender, CancelScope::of_ownership_change());
+
+ sendReply(0);
+ sendReply(1);
+
+ ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
+}
+
TEST_F(PutOperationTest, storage_failed) {
setup_stripe(2, 1, "storage:1 distributor:1");
@@ -491,7 +549,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
{
std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
- std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
ASSERT_TRUE(sreply);
sreply->remapBucketId(document::BucketId(17, 13));
@@ -511,6 +569,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
dumpBucket(document::BucketId(17, 13)));
}
+// TODO make this redundant through operation cancelling
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_state) {
setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
@@ -535,6 +594,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_
dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
}
+// TODO make this redundant through operation cancelling
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) {
setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
@@ -568,6 +628,8 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending
// TODO probably also do this for updates and removes
// TODO consider if we should use the pending state verbatim for computing targets if it exists
+// TODO make this redundant through operation cancelling
+// ... actually; FIXME shouldn't the ExternalOperationHandler already cover this??
TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) {
setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
auto doc = createDummyDocument("test", "test");
@@ -584,6 +646,65 @@ TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state
_sender.getLastReply(true));
}
+TEST_F(PutOperationTest, db_not_updated_if_operation_cancelled_by_ownership_change) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ auto doc = createDummyDocument("test", "uri");
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+ auto remap_bucket = BucketId(bucket.getUsedBits() + 1, bucket.getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 1, 2});
+ op->cancel(_sender, CancelScope::of_ownership_change());
+
+ // Normally DB updates triggered by replies don't _create_ buckets in the DB, unless
+ // they're remapped buckets. Use a remapping to ensure we hit a create-if-missing DB path.
+ {
+ std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
+ auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
+ ASSERT_TRUE(sreply);
+ sreply->remapBucketId(remap_bucket);
+ sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5));
+ op->receive(_sender, reply);
+ }
+
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(7, 8, 9));
+
+ EXPECT_EQ("NONEXISTING", dumpBucket(bucket));
+ EXPECT_EQ("NONEXISTING", dumpBucket(remap_bucket));
+}
+
+TEST_F(PutOperationTest, individually_cancelled_nodes_are_not_updated_in_db) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ auto doc = createDummyDocument("test", "uri");
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+ sendPut(createPut(doc));
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+ // Simulate nodes 0 and 2 going down
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 2});
+ // Cancelling shall be cumulative
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({2}));
+
+ sendReply(0, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(6, 7, 8));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(9, 8, 7));
+
+ EXPECT_EQ("BucketId(0x4000000000000593) : "
+ "node(idx=1,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)",
+ dumpBucket(bucket));
+}
+
TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) {
setup_stripe(Redundancy(2), NodeCount(2),
"distributor:1 storage:2 .0.s:r .1.s:r");
@@ -761,6 +882,38 @@ TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) {
_sender.getLastReply());
}
+TEST_F(PutOperationTest, ownership_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ op->cancel(_sender, CancelScope::of_ownership_change());
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, replica_subset_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ // 1 of 2 nodes; we still abort after the read phase since we cannot possibly fulfill
+ // the write phase for all replicas.
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
TEST_F(PutOperationTest, create_flag_in_parent_put_is_propagated_to_sent_puts) {
setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1");
auto doc = createDummyDocument("test", "test");
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index d169c80a95d..762f9857fce 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -68,6 +68,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
std::unique_ptr<api::StorageReply> reply(removec->makeReply());
auto* removeR = dynamic_cast<api::RemoveReply*>(reply.get());
removeR->setOldTimestamp(oldTimestamp);
+ removeR->setBucketInfo(api::BucketInfo(1,2,3,4,5));
callback.onReceive(_sender, std::shared_ptr<api::StorageReply>(reply.release()));
}
@@ -307,6 +308,45 @@ TEST_F(ExtRemoveOperationTest, failed_condition_probe_fails_op_with_returned_err
_sender.getLastReply());
}
+// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops
+// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests.
+
+TEST_F(ExtRemoveOperationTest, cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
+
+ reply_with(make_get_reply(0, 50, false, true));
+ op->cancel(_sender, CancelScope::of_ownership_change());
+ reply_with(make_get_reply(1, 50, false, true));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, "
+ "timestamp 100, not found) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
+TEST_F(ExtRemoveOperationTest, cancelled_nodes_are_not_updated_in_db) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::CONSISTENT));
+ ASSERT_EQ("Remove => 1,Remove => 0", _sender.getCommands(true));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucketId), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToMessage(*op, 0, 50);
+ replyToMessage(*op, 1, 50);
+
+ EXPECT_EQ("BucketId(0x4000000000000593) : "
+ "node(idx=0,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
+ dumpBucket(bucketId));
+ // Reply is still OK since the operation went through on the content nodes
+ ASSERT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, timestamp 100, removed doc from 50) ReturnCode(NONE)",
+ _sender.getLastReply());
+
+}
+
TEST_F(ExtRemoveOperationTest, trace_is_propagated_from_condition_probe_gets_ok_probe_case) {
ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index da32225cde3..1907335545a 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -4,16 +4,16 @@
#include <vespa/config/helper/configgetter.h>
#include <vespa/document/base/testdocrepo.h>
#include <vespa/document/fieldset/fieldsets.h>
-#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/fieldvalue/intfieldvalue.h>
+#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/update/arithmeticvalueupdate.h>
-#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h>
+#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storageapi/message/persistence.h>
-#include <vespa/vespalib/gtest/gtest.h>
+#include <gtest/gtest.h>
#include <gmock/gmock.h>
namespace storage::distributor {
@@ -30,8 +30,9 @@ using namespace ::testing;
struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
document::TestDocRepo _testRepo;
std::shared_ptr<const DocumentTypeRepo> _repo;
- const DocumentType* _doc_type;
+ const DocumentType* _doc_type{nullptr};
DistributorMessageSenderStub _sender;
+ BucketId _bucket_id{0x400000000000cac4};
TwoPhaseUpdateOperationTest();
~TwoPhaseUpdateOperationTest() override;
@@ -39,7 +40,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
void checkMessageSettingsPropagatedTo(
const api::StorageCommand::SP& msg) const;
- std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&);
+ static std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&);
void SetUp() override {
_repo = _testRepo.getTypeRepoSp();
@@ -57,20 +58,21 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
close();
}
- void replyToMessage(Operation& callback,
- DistributorMessageSenderStub& sender,
- uint32_t index,
- uint64_t oldTimestamp,
- api::ReturnCode::Result result = api::ReturnCode::OK);
+ static void replyToMessage(
+ Operation& callback,
+ DistributorMessageSenderStub& sender,
+ uint32_t index,
+ uint64_t oldTimestamp,
+ api::ReturnCode::Result result = api::ReturnCode::OK);
- void replyToPut(
+ static void replyToPut(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& traceMsg = "");
- void replyToCreateBucket(
+ static void replyToCreateBucket(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -85,7 +87,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& traceMsg = "");
- void reply_to_metadata_get(
+ static void reply_to_metadata_get(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -93,7 +95,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& trace_msg = "");
- void reply_to_get_with_tombstone(
+ static void reply_to_get_with_tombstone(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -148,11 +150,17 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
Timestamp highest_get_timestamp,
Timestamp expected_response_timestamp);
- std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) {
- setup_stripe(2, 2, "storage:2 distributor:1");
+ void do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas);
+
+ void enable_3phase_updates(bool enable = true) {
auto cfg = make_config();
- cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase);
+ cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable);
configure_stripe(cfg);
+ }
+
+ std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates(enable_3phase);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
cb->start(_sender);
return cb;
@@ -199,13 +207,13 @@ TwoPhaseUpdateOperationTest::replyToPut(
{
std::shared_ptr<api::StorageMessage> msg2 = sender.command(index);
auto& putc = dynamic_cast<PutCommand&>(*msg2);
- std::unique_ptr<api::StorageReply> reply(putc.makeReply());
+ std::shared_ptr<api::StorageReply> reply(putc.makeReply());
reply->setResult(api::ReturnCode(result, ""));
+ dynamic_cast<api::PutReply&>(*reply).setBucketInfo(api::BucketInfo(1,2,3,4,5));
if (!traceMsg.empty()) {
MBUS_TRACE(reply->getTrace(), 1, traceMsg);
}
- callback.receive(sender,
- std::shared_ptr<StorageReply>(reply.release()));
+ callback.receive(sender, reply);
}
void
@@ -217,10 +225,9 @@ TwoPhaseUpdateOperationTest::replyToCreateBucket(
{
std::shared_ptr<api::StorageMessage> msg2 = sender.command(index);
auto& putc = dynamic_cast<CreateBucketCommand&>(*msg2);
- std::unique_ptr<api::StorageReply> reply(putc.makeReply());
+ std::shared_ptr<api::StorageReply> reply(putc.makeReply());
reply->setResult(api::ReturnCode(result, ""));
- callback.receive(sender,
- std::shared_ptr<StorageReply>(reply.release()));
+ callback.receive(sender, reply);
}
void
@@ -312,6 +319,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
update->setCreateIfNonExistent(options._createIfNonExistent);
document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(update->getId());
+ assert(id == _bucket_id);
document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId());
if (bucketState.length()) {
@@ -554,6 +562,33 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsiste
_sender.getLastReply(true));
}
+TEST_F(TwoPhaseUpdateOperationTest, fast_path_cancellation_transitively_cancels_nested_update_operation) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates();
+ auto op = sendUpdate("0=1/2/3,1=1/2/3");
+ op->start(_sender);
+
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
+
+ replyToMessage(*op, _sender, 0, 110);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToMessage(*op, _sender, 1, 110);
+
+ // Client operation itself should return success since the update went through on all replica nodes
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 110) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply(true));
+
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=0,crc=0x123,docs=1/1,bytes=100/100,trusted=true,active=false,ready=false)",
+ dumpBucket(_bucket_id));
+}
+
void
TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo(
const api::StorageCommand::SP& msg) const
@@ -713,6 +748,38 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) {
EXPECT_EQ(metrics().updates.failures.storagefailure.getValue(), 1);
}
+void TwoPhaseUpdateOperationTest::do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates();
+ auto op = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true));
+ op->start(_sender);
+
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*op, _sender, 0, 70);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToGet(*op, _sender, 1, in_sync_replicas ? 70 : 80);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster "
+ "state change between executing the read and write phases of a write-repair update)",
+ _sender.getLastReply(true));
+
+ // TODO custom cancellation failure metric?
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_consistent_case) {
+ do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(true);
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_inconsistent_case) {
+ do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(false);
+}
+
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
setup_stripe(2, 2, "storage:2 distributor:1");
// Create update for wrong doctype which will fail the update.
@@ -1214,6 +1281,59 @@ TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) {
EXPECT_EQ(1, m.ok.getValue());
}
+TEST_F(ThreePhaseUpdateTest, update_fails_if_cancelled_between_metadata_gets_and_full_get) {
+ auto op = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*op, _sender, 0, 2000);
+ reply_to_metadata_get(*op, _sender, 1, 1000);
+ ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToGet(*op, _sender, 2, 2000U);
+ ASSERT_EQ("", _sender.getCommands(true, false, 3)); // No puts sent
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster "
+ "state change between executing the read and write phases of a write-repair update)",
+ _sender.getLastReply(true));
+
+ // TODO cancellation metrics?
+}
+
+TEST_F(ThreePhaseUpdateTest, fast_path_cancellation_transitively_cancels_nested_put_operation) {
+ auto op = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*op, _sender, 0, 2000);
+ reply_to_metadata_get(*op, _sender, 1, 1000);
+
+ ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
+ replyToGet(*op, _sender, 2, 2000U);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {0});
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3));
+ replyToPut(*op, _sender, 3);
+ replyToPut(*op, _sender, 4);
+
+ // Update itself is ACKed
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 2000) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply(true));
+
+ // But cancelled replicas are not reintroduced into the bucket DB
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
+ dumpBucket(_bucket_id));
+}
+
+
TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) {
auto cb = set_up_2_inconsistent_replicas_and_start_update();
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
@@ -1277,8 +1397,7 @@ TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more.
cb->start(_sender);
// Add new replica to deterministic test bucket after gets have been sent
- BucketId bucket(0x400000000000cac4); // Always the same in the test.
- addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
+ addNodesToBucketDB(_bucket_id, "0=1/2/3,1=2/3/4,2=3/3/3");
Timestamp old_timestamp = 500;
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index d0ae31b9524..fefc88a27c2 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -11,7 +11,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/vespalib/gtest/gtest.h>
+#include <gtest/gtest.h>
using config::ConfigGetter;
using config::FileSpec;
@@ -29,11 +29,14 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil {
std::shared_ptr<const DocumentTypeRepo> _repo;
const DocumentType* _html_type;
+ UpdateOperationTest()
+ : _repo(std::make_shared<DocumentTypeRepo>(*ConfigGetter<DocumenttypesConfig>::
+ getConfig("config-doctypes", FileSpec("../config-doctypes.cfg")))),
+ _html_type(_repo->getDocumentType("text/html"))
+ {
+ }
+
void SetUp() override {
- _repo.reset(
- new DocumentTypeRepo(*ConfigGetter<DocumenttypesConfig>::
- getConfig("config-doctypes", FileSpec("../config-doctypes.cfg"))));
- _html_type = _repo->getDocumentType("text/html");
createLinks();
}
@@ -241,4 +244,31 @@ TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest
EXPECT_EQ(2, m.diverging_timestamp_updates.getValue());
}
+// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops
+// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests.
+
+TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ std::shared_ptr<UpdateOperation> op = sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3");
+ DistributorMessageSenderStub sender;
+ op->start(sender);
+
+ ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true));
+
+ // Simulate nodes 0 and 2 going down
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bId), {0, 2});
+ // Cancelling shall be cumulative
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({2}));
+
+ replyToMessage(*op, sender, 0, 120);
+ replyToMessage(*op, sender, 1, 120);
+ replyToMessage(*op, sender, 2, 120);
+
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=1,crc=0x2,docs=4/4,bytes=6/6,trusted=true,active=false,ready=false)",
+ dumpBucket(_bId));
+}
+
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
index a8c21efa793..d2ff7b53403 100644
--- a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
@@ -142,7 +142,7 @@ BucketInfo::addNode(const BucketCopy& newCopy, const std::vector<uint16_t>& reco
bool
BucketInfo::removeNode(unsigned short node, TrustedUpdate update)
{
- for (auto iter = _nodes.begin(); iter != _nodes.end(); iter++) {
+ for (auto iter = _nodes.begin(); iter != _nodes.end(); ++iter) {
if (iter->getNode() == node) {
_nodes.erase(iter);
if (update == TrustedUpdate::UPDATE) {
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 184fee5d2c9..c889afcc77c 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -10,6 +10,7 @@ vespa_add_library(storage_distributor OBJECT
bucket_spaces_stats_provider.cpp
bucketgctimecalculator.cpp
bucketlistmerger.cpp
+ cancelled_replicas_pruner.cpp
clusterinformation.cpp
crypto_uuid_generator.cpp
distributor_bucket_space.cpp
diff --git a/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp
new file mode 100644
index 00000000000..5e1bc402fc4
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp
@@ -0,0 +1,19 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "cancelled_replicas_pruner.h"
+
+namespace storage::distributor {
+
+std::vector<BucketCopy> prune_cancelled_nodes(std::span<const BucketCopy> replicas, const CancelScope& cancel_scope) {
+ std::vector<BucketCopy> pruned_replicas;
+ // Expect that there will be an input entry for each cancelled node in the common case.
+ pruned_replicas.reserve((replicas.size() >= cancel_scope.cancelled_nodes().size())
+ ? replicas.size() - cancel_scope.cancelled_nodes().size() : 0);
+ for (auto& candidate : replicas) {
+ if (!cancel_scope.node_is_cancelled(candidate.getNode())) {
+ pruned_replicas.emplace_back(candidate);
+ }
+ }
+ return pruned_replicas;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h
new file mode 100644
index 00000000000..f12f78e569f
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h
@@ -0,0 +1,17 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/storage/bucketdb/bucketcopy.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
+#include <span>
+#include <vector>
+
+namespace storage::distributor {
+
+/**
+ * Returns a new vector that contains all entries of `replicas` whose nodes are _not_ tagged as
+ * cancelled in `cancel_scope`. Returned entry ordering is identical to input ordering.
+ */
+[[nodiscard]] std::vector<BucketCopy> prune_cancelled_nodes(std::span<const BucketCopy> replicas, const CancelScope& cancel_scope);
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 243b3c5ecb2..ad1cce46bea 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -281,6 +281,7 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state
enterRecoveryMode();
// Clear all active messages on nodes that are down.
+ // TODO this should also be done on nodes that are no longer part of the config!
const uint16_t old_node_count = oldState.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE);
for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) {
diff --git a/storage/src/vespa/storage/distributor/operationowner.cpp b/storage/src/vespa/storage/distributor/operationowner.cpp
index 7b7c9f431f7..c92544c8cb5 100644
--- a/storage/src/vespa/storage/distributor/operationowner.cpp
+++ b/storage/src/vespa/storage/distributor/operationowner.cpp
@@ -73,7 +73,7 @@ OperationOwner::onClose()
void
OperationOwner::erase(api::StorageMessage::Id msgId)
{
- _sentMessageMap.pop(msgId);
+ (void)_sentMessageMap.pop(msgId);
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/CMakeLists.txt
index 5c6a1f3d84c..8cf0470f674 100644
--- a/storage/src/vespa/storage/distributor/operations/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/operations/CMakeLists.txt
@@ -1,6 +1,7 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(storage_distributoroperation OBJECT
SOURCES
+ cancel_scope.cpp
operation.cpp
DEPENDS
)
diff --git a/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp b/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp
new file mode 100644
index 00000000000..ef4d99aa58b
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp
@@ -0,0 +1,52 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "cancel_scope.h"
+
+namespace storage::distributor {
+
+CancelScope::CancelScope()
+ : _cancelled_nodes(),
+ _ownership_lost(false)
+{
+}
+
+CancelScope::CancelScope(ownership_change_ctor_tag) noexcept
+ : _cancelled_nodes(),
+ _ownership_lost(true)
+{
+}
+
+CancelScope::CancelScope(CancelledNodeSet nodes) noexcept
+ : _cancelled_nodes(std::move(nodes)),
+ _ownership_lost(false)
+{
+}
+
+CancelScope::~CancelScope() = default;
+
+CancelScope::CancelScope(const CancelScope&) = default;
+CancelScope& CancelScope::operator=(const CancelScope&) = default;
+
+CancelScope::CancelScope(CancelScope&&) noexcept = default;
+CancelScope& CancelScope::operator=(CancelScope&&) noexcept = default;
+
+void CancelScope::add_cancelled_node(uint16_t node) {
+ _cancelled_nodes.insert(node);
+}
+
+void CancelScope::merge(const CancelScope& other) {
+ _ownership_lost |= other._ownership_lost;
+ // Not using iterator insert(first, last) since that explicitly resizes,
+ for (uint16_t node : other._cancelled_nodes) {
+ _cancelled_nodes.insert(node);
+ }
+}
+
+CancelScope CancelScope::of_ownership_change() noexcept {
+ return CancelScope(ownership_change_ctor_tag{});
+}
+
+CancelScope CancelScope::of_node_subset(CancelledNodeSet nodes) noexcept {
+ return CancelScope(std::move(nodes));
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/cancel_scope.h b/storage/src/vespa/storage/distributor/operations/cancel_scope.h
new file mode 100644
index 00000000000..e7da26c600a
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/cancel_scope.h
@@ -0,0 +1,59 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/hash_set.h>
+
+namespace storage::distributor {
+
+/**
+ * In the face of concurrent cluster state changes, cluster topology reconfigurations etc.,
+ * it's possible for there to be pending mutating operations to nodes that the distributor
+ * no longer should keep track of. Such operations must therefore be _cancelled_, either
+ * fully or partially. A CancelScope represents the granularity at which an operation should
+ * be cancelled.
+ *
+ * In the case of one or more nodes becoming unavailable, `fully_cancelled()` will be false
+ * and `node_is_cancelled(x)` will return whether node `x` is explicitly cancelled.
+ *
+ * In the case of ownership transfers, `fully_cancelled()` will be true since the distributor
+ * should no longer have any knowledge of the bucket. `node_is_cancelled(x)` is always
+ * implicitly true for all values of `x` for full cancellations.
+ */
+class CancelScope {
+public:
+ using CancelledNodeSet = vespalib::hash_set<uint16_t>;
+private:
+ CancelledNodeSet _cancelled_nodes;
+ bool _ownership_lost;
+
+ struct ownership_change_ctor_tag {};
+
+ explicit CancelScope(ownership_change_ctor_tag) noexcept;
+ explicit CancelScope(CancelledNodeSet nodes) noexcept;
+public:
+ CancelScope();
+ ~CancelScope();
+
+ CancelScope(const CancelScope&);
+ CancelScope& operator=(const CancelScope&);
+
+ CancelScope(CancelScope&&) noexcept;
+ CancelScope& operator=(CancelScope&&) noexcept;
+
+ void add_cancelled_node(uint16_t node);
+ void merge(const CancelScope& other);
+
+ [[nodiscard]] bool fully_cancelled() const noexcept { return _ownership_lost; }
+ [[nodiscard]] bool node_is_cancelled(uint16_t node) const noexcept {
+ return (fully_cancelled() || _cancelled_nodes.contains(node));
+ }
+
+ [[nodiscard]] const CancelledNodeSet& cancelled_nodes() const noexcept {
+ return _cancelled_nodes;
+ }
+
+ static CancelScope of_ownership_change() noexcept;
+ static CancelScope of_node_subset(CancelledNodeSet nodes) noexcept;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
index 0e12e3e3019..bd7f3709575 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
@@ -104,7 +104,12 @@ CheckCondition::handle_reply(DistributorStripeMessageSender& sender,
}
}
-void CheckCondition::cancel(DistributorStripeMessageSender& sender) {
+void CheckCondition::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender);
+ _cond_get_op->cancel(proxy_sender, cancel_scope);
+}
+
+void CheckCondition::close(DistributorStripeMessageSender& sender) {
IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender);
_cond_get_op->onClose(proxy_sender);
// We don't propagate any generated reply from the GetOperation, as its existence
@@ -163,6 +168,12 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St
reply->steal_trace());
return;
}
+ if (_cond_get_op->is_cancelled()) {
+ _outcome.emplace(api::ReturnCode(api::ReturnCode::ABORTED,
+ "Operation has been cancelled (likely due to a cluster state change)"),
+ reply->steal_trace());
+ return;
+ }
auto state_version_now = _bucket_space.getClusterState().getVersion();
if (_bucket_space.has_pending_cluster_state()) {
state_version_now = _bucket_space.get_pending_cluster_state().getVersion();
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
index 999b79adc3d..92a8bc62ae6 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.h
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
@@ -17,6 +17,7 @@ namespace storage::api { class StorageReply; }
namespace storage::distributor {
+class CancelScope;
class DistributorBucketSpace;
class DistributorNodeContext;
class DistributorStripeMessageSender;
@@ -122,7 +123,8 @@ public:
void start_and_send(DistributorStripeMessageSender& sender);
void handle_reply(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply>& reply);
- void cancel(DistributorStripeMessageSender& sender);
+ void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope);
+ void close(DistributorStripeMessageSender& sender);
[[nodiscard]] std::optional<Outcome>& maybe_outcome() noexcept {
return _outcome;
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 854e7d15f82..e7832fd19e5 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -109,6 +109,8 @@ PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const doc
bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const {
// TODO handle this explicitly as part of operation abort/cancel edge
+ // -> we have yet to send anything at this point
+ // -> shouldn't ExternalOperationHandler deal with this before starting the op?
auto* pending_state = _op_ctx.pending_cluster_state_or_null(_msg->getBucket().getBucketSpace());
if (!pending_state) {
return false;
@@ -245,6 +247,15 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen
_msg = std::shared_ptr<api::PutCommand>();
}
+void
+PutOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope)
+{
+ if (_check_condition) {
+ _check_condition->cancel(sender, cancel_scope);
+ }
+ _tracker.cancel(cancel_scope);
+}
+
bool
PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets) const
{
@@ -302,7 +313,7 @@ void
PutOperation::onClose(DistributorStripeMessageSender& sender)
{
if (_check_condition) {
- _check_condition->cancel(sender);
+ _check_condition->close(sender);
}
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 635accc1865..8b8e3e15375 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -60,6 +60,8 @@ private:
void sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId,
uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch);
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
+
[[nodiscard]] bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const;
[[nodiscard]] bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const;
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
index 42d8e318f47..be43aac3d9e 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
@@ -156,9 +156,21 @@ void RemoveOperation::on_completed_check_condition(CheckCondition::Outcome& outc
void
RemoveOperation::onClose(DistributorStripeMessageSender& sender)
{
+ if (_check_condition) {
+ _check_condition->close(sender);
+ }
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
+void
+RemoveOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope)
+{
+ if (_check_condition) {
+ _check_condition->cancel(sender, cancel_scope);
+ }
+ _tracker.cancel(cancel_scope);
+}
+
bool RemoveOperation::has_condition() const noexcept {
return _msg->hasTestAndSetCondition();
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
index 9f3a98294ea..221def81fdc 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
@@ -29,6 +29,7 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
void onClose(DistributorStripeMessageSender& sender) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
private:
PersistenceMessageTrackerImpl _tracker_instance;
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 73c65f54b21..2d1c469d072 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -13,6 +13,7 @@
#include <vespa/storage/distributor/distributor_bucket_space_repo.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <vespa/vespalib/stllike/hash_set.hpp>
#include <cinttypes>
#include <vespa/log/log.h>
@@ -68,10 +69,8 @@ TwoPhaseUpdateOperation::stateToString(SendState state) noexcept
case SendState::SINGLE_GET_SENT: return "SINGLE_GET_SENT";
case SendState::FULL_GETS_SENT: return "FULL_GETS_SENT";
case SendState::PUTS_SENT: return "PUTS_SENT";
- default:
- assert(!"Unknown state");
- return "";
}
+ abort();
}
void
@@ -130,7 +129,7 @@ TwoPhaseUpdateOperation::get_bucket_database_entries() const
}
bool
-TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const
+TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries)
{
// Fast path iff bucket exists AND is consistent (split and copies).
if (entries.size() != 1) {
@@ -245,6 +244,16 @@ TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(DistributorStripeM
}
void
+TwoPhaseUpdateOperation::send_operation_cancelled_reply(DistributorStripeMessageSender& sender)
+{
+ sendReplyWithResult(sender,
+ api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND,
+ "The update operation was cancelled due to a cluster state change "
+ "between executing the read and write phases of a write-repair "
+ "update"));
+}
+
+void
TwoPhaseUpdateOperation::send_feed_blocked_error_reply(DistributorStripeMessageSender& sender)
{
sendReplyWithResult(sender,
@@ -257,7 +266,8 @@ void
TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<document::Document> doc,
api::Timestamp putTimestamp, DistributorStripeMessageSender& sender)
{
- if (lostBucketOwnershipBetweenPhases()) {
+ assert(!is_cancelled());
+ if (lostBucketOwnershipBetweenPhases()) { // TODO deprecate with cancellation
sendLostOwnershipTransientErrorReply(sender);
return;
}
@@ -281,6 +291,8 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen
void
TwoPhaseUpdateOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg)
{
+ // In the case of cancellations, we let existing operations complete, but must not
+ // start new ones that are unaware of the cancellations.
if (_mode == Mode::FAST_PATH) {
handleFastPathReceive(sender, msg);
} else {
@@ -304,7 +316,10 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
sendReplyWithResult(sender, getReply.getResult());
return;
}
-
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
if (!getReply.getDocument().get()) {
// Weird, document is no longer there ... Just fail.
sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, ""));
@@ -316,7 +331,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId());
assert(callback.get());
- Operation & callbackOp = *callback;
+ Operation& callbackOp = *callback;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(callback), sender);
callbackOp.receive(intermediate, msg);
@@ -326,12 +341,12 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
addTraceFromReply(*intermediate._reply);
auto& cb = dynamic_cast<UpdateOperation&>(callbackOp);
- std::pair<document::BucketId, uint16_t> bestNode = cb.getNewestTimestampLocation();
+ auto [newest_bucket, newest_node] = cb.getNewestTimestampLocation();
auto intermediate_update_reply = std::dynamic_pointer_cast<api::UpdateReply>(intermediate._reply);
assert(intermediate_update_reply);
if (!intermediate_update_reply->getResult().success() ||
- bestNode.first == document::BucketId(0))
+ (newest_bucket == document::BucketId(0)))
{
if (intermediate_update_reply->getResult().success() &&
(intermediate_update_reply->getOldTimestamp() == 0))
@@ -343,9 +358,14 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
} else {
LOG(debug, "Update(%s) fast path: was inconsistent!", update_doc_id().c_str());
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
+
_updateReply = std::move(intermediate_update_reply);
- _fast_path_repair_source_node = bestNode.second;
- document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), bestNode.first);
+ _fast_path_repair_source_node = newest_node;
+ document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), newest_bucket);
auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), document::AllFields::NAME);
copyMessageSettings(*_updateCmd, *cmd);
@@ -383,7 +403,7 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorStripeMessageSender& s
callbackOp.receive(intermediate, msg);
if (!intermediate._reply.get()) {
- return; // Not enough replies received yet or we're draining callbacks.
+ return; // Not enough replies received yet, or we're draining callbacks.
}
addTraceFromReply(*intermediate._reply);
if (_sendState == SendState::METADATA_GETS_SENT) {
@@ -445,6 +465,13 @@ void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get(
"One or more metadata Get operations failed; aborting Update"));
return;
}
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
+    // Removal of replicas is handled by cancellation, but a concurrent state change may happen
+ // that _adds_ one or more available content nodes, which we cannot then blindly write to.
+ // So we have to explicitly check this edge case.
if (!replica_set_unchanged_after_get_operation()) {
// Use BUCKET_NOT_FOUND to trigger a silent retry.
LOG(debug, "Update(%s): replica set has changed after metadata get phase", update_doc_id().c_str());
@@ -490,6 +517,10 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorStripeMessageSende
sendReplyWithResult(sender, reply.getResult());
return;
}
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
// Single Get could technically be considered consistent with itself, so make
// sure we never treat that as sufficient for restarting in the fast path.
if ((_sendState != SendState::SINGLE_GET_SENT) && may_restart_with_fast_path(reply)) {
@@ -558,7 +589,8 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const
void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender) {
LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode",
update_doc_id().c_str());
- if (lostBucketOwnershipBetweenPhases()) {
+ assert(!is_cancelled());
+ if (lostBucketOwnershipBetweenPhases()) { // TODO remove once cancellation is wired
sendLostOwnershipTransientErrorReply(sender);
return;
}
@@ -579,7 +611,7 @@ TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorStripeMessageSen
std::unique_ptr<document::select::Node> selection;
try {
- selection = _parser.parse_selection(_updateCmd->getCondition().getSelection());
+ selection = _parser.parse_selection(_updateCmd->getCondition().getSelection());
} catch (const document::select::ParsingFailedException & e) {
sendReplyWithResult(sender, api::ReturnCode(
api::ReturnCode::ILLEGAL_PARAMETERS,
@@ -679,6 +711,22 @@ TwoPhaseUpdateOperation::onClose(DistributorStripeMessageSender& sender) {
}
}
+void
+TwoPhaseUpdateOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ // We have to explicitly cancel any and all pending Operation instances that have been
+ // launched by this operation. This is to ensure any DB updates they may transitively
+ // perform are aware of all cancellations that have occurred.
+ // There may be many messages pending for any given operation, so unique-ify them prior
+ // to avoid duplicate cancellation invocations.
+ vespalib::hash_set<Operation*> ops;
+ for (auto& msg_op : _sentMessageMap) {
+ ops.insert(msg_op.second.get());
+ }
+ for (auto* op : ops) {
+ op->cancel(sender, cancel_scope);
+ }
+}
+
vespalib::string TwoPhaseUpdateOperation::update_doc_id() const {
assert(_updateCmd.get() != nullptr);
return _updateCmd->getDocumentId().toString();
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index d2ad5359fa6..7f64bb8d56c 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -71,6 +71,8 @@ public:
void onClose(DistributorStripeMessageSender& sender) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
+
private:
enum class SendState {
NONE_SENT,
@@ -94,19 +96,20 @@ private:
void sendReplyWithResult(DistributorStripeMessageSender&, const api::ReturnCode&);
void ensureUpdateReplyCreated();
- std::vector<BucketDatabase::Entry> get_bucket_database_entries() const;
- bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const;
+ [[nodiscard]] std::vector<BucketDatabase::Entry> get_bucket_database_entries() const;
+ [[nodiscard]] static bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries);
void startFastPathUpdate(DistributorStripeMessageSender& sender, std::vector<BucketDatabase::Entry> entries);
void startSafePathUpdate(DistributorStripeMessageSender&);
- bool lostBucketOwnershipBetweenPhases() const;
+ [[nodiscard]] bool lostBucketOwnershipBetweenPhases() const;
void sendLostOwnershipTransientErrorReply(DistributorStripeMessageSender&);
+ void send_operation_cancelled_reply(DistributorStripeMessageSender& sender);
void send_feed_blocked_error_reply(DistributorStripeMessageSender& sender);
void schedulePutsWithUpdatedDocument(
std::shared_ptr<document::Document>,
api::Timestamp,
DistributorStripeMessageSender&);
void applyUpdateToDocument(document::Document&) const;
- std::shared_ptr<document::Document> createBlankDocument() const;
+ [[nodiscard]] std::shared_ptr<document::Document> createBlankDocument() const;
void setUpdatedForTimestamp(api::Timestamp);
void handleFastPathReceive(DistributorStripeMessageSender&,
const std::shared_ptr<api::StorageReply>&);
@@ -120,20 +123,20 @@ private:
void handle_safe_path_received_single_full_get(DistributorStripeMessageSender&, api::GetReply&);
void handleSafePathReceivedGet(DistributorStripeMessageSender&, api::GetReply&);
void handleSafePathReceivedPut(DistributorStripeMessageSender&, const api::PutReply&);
- bool shouldCreateIfNonExistent() const;
+ [[nodiscard]] bool shouldCreateIfNonExistent() const;
bool processAndMatchTasCondition(
DistributorStripeMessageSender& sender,
const document::Document& candidateDoc);
- bool satisfiesUpdateTimestampConstraint(api::Timestamp) const;
+ [[nodiscard]] bool satisfiesUpdateTimestampConstraint(api::Timestamp) const;
void addTraceFromReply(api::StorageReply& reply);
- bool hasTasCondition() const noexcept;
+ [[nodiscard]] bool hasTasCondition() const noexcept;
void replyWithTasFailure(DistributorStripeMessageSender& sender,
vespalib::stringref message);
bool may_restart_with_fast_path(const api::GetReply& reply);
- bool replica_set_unchanged_after_get_operation() const;
+ [[nodiscard]] bool replica_set_unchanged_after_get_operation() const;
void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender);
// Precondition: reply has not yet been sent.
- vespalib::string update_doc_id() const;
+ [[nodiscard]] vespalib::string update_doc_id() const;
using ReplicaState = std::vector<std::pair<document::BucketId, uint16_t>>;
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index f43a6092372..60bddebbb89 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -207,6 +207,13 @@ UpdateOperation::onClose(DistributorStripeMessageSender& sender)
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
+void
+UpdateOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope)
+{
+ _tracker.cancel(cancel_scope);
+}
+
+
// The backend behavior of "create-if-missing" updates is to return the timestamp of the
// _new_ update operation if the document was created from scratch. The two-phase update
// operation logic auto-detects unexpected inconsistencies and tries to reconcile
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
index 96fd878a324..7d2131d426d 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
@@ -31,6 +31,7 @@ public:
std::string getStatus() const override { return ""; };
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
void onClose(DistributorStripeMessageSender& sender) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
std::pair<document::BucketId, uint16_t> getNewestTimestampLocation() const {
return _newestTimestampLocation;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index 2e6d0e95ec9..5d47fb4c337 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "garbagecollectionoperation.h"
+#include <vespa/storage/distributor/cancelled_replicas_pruner.h>
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/distributor/idealstatemetricsset.h>
#include <vespa/storage/distributor/top_level_distributor.h>
@@ -22,6 +23,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu
_cluster_state_version_at_phase1_start_time(0),
_remove_candidates(),
_replica_info(),
+ _cancel_scope(),
_max_documents_removed(0),
_is_done(false)
{}
@@ -148,6 +150,14 @@ GarbageCollectionOperation::onReceive(DistributorStripeMessageSender& sender,
}
}
+void GarbageCollectionOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope) {
+ if (!_cancel_scope) {
+ _cancel_scope = cancel_scope;
+ } else {
+ _cancel_scope->merge(cancel_scope);
+ }
+}
+
void GarbageCollectionOperation::update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
_replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(),
from_node, reply.getBucketInfo());
@@ -186,6 +196,11 @@ bool GarbageCollectionOperation::may_start_write_phase() const {
if (!_ok) {
return false; // Already broken, no reason to proceed.
}
+ if (is_cancelled()) {
+ LOG(debug, "GC(%s): not sending write phase; operation has been explicitly cancelled",
+ getBucket().toString().c_str());
+ return false;
+ }
const auto state_version_now = _bucketSpace->getClusterState().getVersion();
if ((state_version_now != _cluster_state_version_at_phase1_start_time) ||
_bucketSpace->has_pending_cluster_state())
@@ -250,9 +265,18 @@ void GarbageCollectionOperation::update_last_gc_timestamp_in_db() {
}
void GarbageCollectionOperation::merge_received_bucket_info_into_db() {
- // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
- _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
- update_last_gc_timestamp_in_db();
+ if (_cancel_scope) {
+ if (_cancel_scope->fully_cancelled()) {
+ return;
+ } else {
+ _replica_info = prune_cancelled_nodes(_replica_info, *_cancel_scope);
+ }
+ }
+ if (!_replica_info.empty()) {
+ // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
+ _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
+ update_last_gc_timestamp_in_db();
+ } // else: effectively fully cancelled, no touching the DB.
}
void GarbageCollectionOperation::update_gc_metrics() {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index 27dc519dcc2..098ae50368b 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -3,11 +3,13 @@
#include "idealstateoperation.h"
#include <vespa/document/base/documentid.h>
+#include <vespa/persistence/spi/id_and_timestamp.h>
#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
#include <vespa/storage/distributor/operation_sequencer.h>
-#include <vespa/persistence/spi/id_and_timestamp.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/vespalib/stllike/hash_map.h>
+#include <optional>
#include <vector>
namespace storage::distributor {
@@ -22,13 +24,14 @@ public:
void onStart(DistributorStripeMessageSender& sender) override;
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
const char* getName() const noexcept override { return "garbagecollection"; };
Type getType() const noexcept override { return GARBAGE_COLLECTION; }
bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
- bool is_two_phase() const noexcept {
+ [[nodiscard]] bool is_two_phase() const noexcept {
return ((_phase == Phase::ReadMetadataPhase) || (_phase == Phase::WriteRemovesPhase));
}
- bool is_done() const noexcept { return _is_done; }
+ [[nodiscard]] bool is_done() const noexcept { return _is_done; }
protected:
MessageTracker _tracker;
@@ -54,13 +57,14 @@ private:
RemoveCandidates _remove_candidates;
std::vector<SequencingHandle> _gc_write_locks;
std::vector<BucketCopy> _replica_info;
+ std::optional<CancelScope> _cancel_scope;
uint32_t _max_documents_removed;
bool _is_done;
static RemoveCandidates steal_selection_matches_as_candidates(api::RemoveLocationReply& reply);
void send_current_phase_remove_locations(DistributorStripeMessageSender& sender);
- std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;
+ [[nodiscard]] std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;
void handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
void handle_ok_phase1_reply(api::RemoveLocationReply& reply);
diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp
index 4d82de170ae..9f944a94178 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/operation.cpp
@@ -12,7 +12,8 @@ LOG_SETUP(".distributor.callback");
namespace storage::distributor {
Operation::Operation()
- : _startTime()
+ : _startTime(),
+ _cancelled(false)
{
}
@@ -45,6 +46,11 @@ Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCo
target.setPriority(source.getPriority());
}
+void Operation::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ _cancelled = true;
+ on_cancel(sender, cancel_scope);
+}
+
void
Operation::on_blocked()
{
diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h
index bc7e510a5b6..1154cead61f 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.h
+++ b/storage/src/vespa/storage/distributor/operations/operation.h
@@ -16,6 +16,7 @@ class StorageComponent;
namespace distributor {
+class CancelScope;
class DistributorStripeOperationContext;
class PendingMessageTracker;
class OperationSequencer;
@@ -40,7 +41,7 @@ public:
on the owner of the message that was replied to.
*/
virtual void receive(DistributorStripeMessageSender& sender,
- const std::shared_ptr<api::StorageReply> & msg)
+ const std::shared_ptr<api::StorageReply> & msg)
{
onReceive(sender, msg);
}
@@ -60,6 +61,21 @@ public:
void start(DistributorStripeMessageSender& sender);
/**
+     * Explicitly cancel the operation. Cancelled operations may or may not (depending on
+     * the operation implementation) be immediately aborted, but they must, in either case,
+     * never insert any bucket information into the bucket DB after cancel() has been called.
+ */
+ void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope);
+
+ /**
+ * Whether cancel() has been invoked at least once on this instance. This does not
+ * distinguish between cancellations caused by ownership transfers and those caused
+     * by nodes becoming unavailable; Operation implementations that care about this need
+     * to override on_cancel() themselves and inspect the provided CancelScope.
+ */
+ [[nodiscard]] bool is_cancelled() const noexcept { return _cancelled; }
+
+ /**
* Returns true if we are blocked to start this operation given
* the pending messages.
*/
@@ -93,8 +109,15 @@ private:
const std::shared_ptr<api::StorageReply> & msg) = 0;
protected:
+ virtual void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ (void)sender;
+ (void)cancel_scope;
+ }
+
static constexpr vespalib::duration MAX_TIMEOUT = 3600s;
+
vespalib::system_time _startTime;
+ bool _cancelled;
};
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index a4295613fd2..bcb849ac840 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -1,10 +1,12 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "persistencemessagetracker.h"
+#include "cancelled_replicas_pruner.h"
#include "distributor_bucket_space_repo.h"
#include "distributor_bucket_space.h"
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/storageapi/message/persistence.h>
+#include <algorithm>
#include <vespa/log/log.h>
LOG_SETUP(".persistencemessagetracker");
@@ -18,12 +20,15 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
DistributorStripeOperationContext& op_ctx,
api::Timestamp revertTimestamp)
: MessageTracker(node_ctx),
+ _remapBucketInfo(),
+ _bucketInfo(),
_metric(metric),
_reply(std::move(reply)),
_op_ctx(op_ctx),
_revertTimestamp(revertTimestamp),
_trace(_reply->getTrace().getLevel()),
_requestTimer(node_ctx.clock()),
+ _cancel_scope(),
_n_persistence_replies_total(0),
_n_successful_persistence_replies(0),
_priority(_reply->getPriority()),
@@ -34,8 +39,37 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
PersistenceMessageTrackerImpl::~PersistenceMessageTrackerImpl() = default;
void
+PersistenceMessageTrackerImpl::cancel(const CancelScope& cancel_scope)
+{
+ if (!_cancel_scope) {
+ _cancel_scope = cancel_scope;
+ } else {
+ _cancel_scope->merge(cancel_scope);
+ }
+}
+
+void
+PersistenceMessageTrackerImpl::prune_cancelled_nodes_if_present(
+ BucketInfoMap& bucket_and_replicas,
+ const CancelScope& cancel_scope)
+{
+ for (auto& info : bucket_and_replicas) {
+ info.second = prune_cancelled_nodes(info.second, cancel_scope);
+ }
+}
+
+void
PersistenceMessageTrackerImpl::updateDB()
{
+ if (_cancel_scope) {
+ if (_cancel_scope->fully_cancelled()) {
+ return; // Fully cancelled ops cannot mutate the DB at all
+ } else {
+ prune_cancelled_nodes_if_present(_bucketInfo, *_cancel_scope);
+ prune_cancelled_nodes_if_present(_remapBucketInfo, *_cancel_scope);
+ }
+ }
+
for (const auto & entry : _bucketInfo) {
_op_ctx.update_bucket_database(entry.first, entry.second);
}
@@ -229,12 +263,22 @@ PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& r
_success = false;
}
+bool
+PersistenceMessageTrackerImpl::node_is_effectively_cancelled(uint16_t node) const noexcept
+{
+ if (!_cancel_scope) {
+ return false;
+ }
+ return _cancel_scope->node_is_cancelled(node); // Implicitly covers the fully cancelled case
+}
+
void
PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node)
{
LOG(spam, "Received CreateBucket reply for %s from node %u", reply.getBucketId().toString().c_str(), node);
if (!reply.getResult().success()
- && reply.getResult().getResult() != api::ReturnCode::EXISTS)
+ && (reply.getResult().getResult() != api::ReturnCode::EXISTS)
+ && !node_is_effectively_cancelled(node))
{
LOG(spam, "Create bucket reply failed, so deleting it from bucket db");
// We don't know if the bucket exists at this point, so we remove it from the DB.
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index 9b06547dd98..36f35ddb2b4 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -4,9 +4,11 @@
#include "distributor_stripe_component.h"
#include "distributormetricsset.h"
#include "messagetracker.h"
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/storageapi/messageapi/bucketinfocommand.h>
#include <vespa/storageapi/messageapi/bucketinforeply.h>
+#include <optional>
namespace storage::distributor {
@@ -14,6 +16,7 @@ struct PersistenceMessageTracker {
virtual ~PersistenceMessageTracker() = default;
using ToSend = MessageTracker::ToSend;
+ virtual void cancel(const CancelScope& cancel_scope) = 0;
virtual void fail(MessageSender&, const api::ReturnCode&) = 0;
virtual void queueMessageBatch(std::vector<ToSend> messages) = 0;
virtual uint16_t receiveReply(MessageSender&, api::BucketInfoReply&) = 0;
@@ -26,14 +29,9 @@ struct PersistenceMessageTracker {
};
class PersistenceMessageTrackerImpl final
- : public PersistenceMessageTracker,
- public MessageTracker
+ : public PersistenceMessageTracker,
+ public MessageTracker
{
-private:
- using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>;
- BucketInfoMap _remapBucketInfo;
- BucketInfoMap _bucketInfo;
-
public:
PersistenceMessageTrackerImpl(PersistenceOperationMetricSet& metric,
std::shared_ptr<api::BucketInfoReply> reply,
@@ -42,6 +40,8 @@ public:
api::Timestamp revertTimestamp = 0);
~PersistenceMessageTrackerImpl() override;
+ void cancel(const CancelScope& cancel_scope) override;
+
void updateDB();
void updateMetrics();
[[nodiscard]] bool success() const noexcept { return _success; }
@@ -67,8 +67,11 @@ public:
void queueMessageBatch(std::vector<MessageTracker::ToSend> messages) override;
private:
- using MessageBatch = std::vector<uint64_t>;
+ using MessageBatch = std::vector<uint64_t>;
+ using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>;
+ BucketInfoMap _remapBucketInfo;
+ BucketInfoMap _bucketInfo;
std::vector<MessageBatch> _messageBatches;
PersistenceOperationMetricSet& _metric;
std::shared_ptr<api::BucketInfoReply> _reply;
@@ -77,20 +80,24 @@ private:
std::vector<BucketNodePair> _revertNodes;
mbus::Trace _trace;
framework::MilliSecTimer _requestTimer;
+ std::optional<CancelScope> _cancel_scope;
uint32_t _n_persistence_replies_total;
uint32_t _n_successful_persistence_replies;
uint8_t _priority;
bool _success;
- bool canSendReplyEarly() const;
+ static void prune_cancelled_nodes_if_present(BucketInfoMap& bucket_and_replicas,
+ const CancelScope& cancel_scope);
+ [[nodiscard]] bool canSendReplyEarly() const;
void addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply);
void logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const;
- bool hasSentReply() const noexcept { return !_reply; }
- bool shouldRevert() const;
- bool has_majority_successful_replies() const noexcept;
- bool has_minority_test_and_set_failure() const noexcept;
+ [[nodiscard]] bool hasSentReply() const noexcept { return !_reply; }
+ [[nodiscard]] bool shouldRevert() const;
+ [[nodiscard]] bool has_majority_successful_replies() const noexcept;
+ [[nodiscard]] bool has_minority_test_and_set_failure() const noexcept;
void sendReply(MessageSender& sender);
void updateFailureResult(const api::BucketInfoReply& reply);
+ [[nodiscard]] bool node_is_effectively_cancelled(uint16_t node) const noexcept;
void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node);
void handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node);
void transfer_trace_state_to_reply();
diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h
index 70bee311f78..951ed6a6877 100644
--- a/storage/src/vespa/storage/distributor/sentmessagemap.h
+++ b/storage/src/vespa/storage/distributor/sentmessagemap.h
@@ -10,19 +10,23 @@ class Operation;
class SentMessageMap {
public:
+ using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>;
+
SentMessageMap();
~SentMessageMap();
- std::shared_ptr<Operation> pop(api::StorageMessage::Id id);
- std::shared_ptr<Operation> pop();
+ [[nodiscard]] std::shared_ptr<Operation> pop(api::StorageMessage::Id id);
+ [[nodiscard]] std::shared_ptr<Operation> pop();
void insert(api::StorageMessage::Id id, const std::shared_ptr<Operation> & msg);
void clear();
- uint32_t size() const { return _map.size(); }
+ [[nodiscard]] uint32_t size() const { return _map.size(); }
[[nodiscard]] bool empty() const noexcept { return _map.empty(); }
std::string toString() const;
+
+ Map::const_iterator begin() const noexcept { return _map.cbegin(); }
+ Map::const_iterator end() const noexcept { return _map.cend(); }
private:
- using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>;
Map _map;
};