aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-08-22 10:40:58 +0200
committerGitHub <noreply@github.com>2023-08-22 10:40:58 +0200
commitf816fdb94db2a8cd2cdbcb4f639caa5fad769a39 (patch)
tree358ca77f9869cb634f9cf2b03b4c55318b6beb9f /storage
parent49fc314d4366d0ef011aeff82d721679080f2d9f (diff)
parent15e28e09b7ef78530f3821a82a3743b9dcd3c284 (diff)
Merge pull request #28086 from vespa-engine/vekterli/distributor-operation-cancelling
Implement edge-triggered distributor operation cancelling
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/check_condition_test.cpp17
-rw-r--r--storage/src/tests/distributor/garbagecollectiontest.cpp60
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp155
-rw-r--r--storage/src/tests/distributor/removeoperationtest.cpp40
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp169
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp40
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketinfo.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp22
-rw-r--r--storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h17
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/operationowner.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/operations/cancel_scope.cpp52
-rw-r--r--storage/src/vespa/storage/distributor/operations/cancel_scope.h62
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.cpp13
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp13
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.h1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp76
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h21
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.h1
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp25
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h11
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/operations/operation.h26
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp38
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.h32
-rw-r--r--storage/src/vespa/storage/distributor/sentmessagemap.h12
32 files changed, 854 insertions, 89 deletions
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp
index 757a9329ea6..617401dd271 100644
--- a/storage/src/tests/distributor/check_condition_test.cpp
+++ b/storage/src/tests/distributor/check_condition_test.cpp
@@ -5,6 +5,7 @@
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/documentapi/messagebus/messages/testandsetcondition.h>
#include <vespa/storage/distributor/node_supported_features.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storage/distributor/operations/external/check_condition.h>
#include <vespa/storage/distributor/persistence_operation_metric_set.h>
#include <vespa/storageapi/message/persistence.h>
@@ -227,6 +228,20 @@ TEST_F(CheckConditionTest, failed_gets_completes_check_with_error_outcome) {
});
}
+TEST_F(CheckConditionTest, check_fails_if_condition_explicitly_cancelled) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0));
+ cond.cancel(_sender, CancelScope::of_fully_cancelled());
+ cond.handle_reply(_sender, make_matched_reply(1));
+ }, [&](auto& outcome) {
+ EXPECT_FALSE(outcome.matched_condition());
+ EXPECT_FALSE(outcome.not_found());
+ EXPECT_TRUE(outcome.failed());
+ EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::ABORTED);
+ });
+}
+
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_completion) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
@@ -242,6 +257,7 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_
});
}
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_pending_transition_case) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
@@ -255,6 +271,7 @@ TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start
});
}
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_completed_transition_case) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 9b5056f2066..b1cf1cbc636 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -113,9 +113,8 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
ASSERT_EQ(entry->getNodeCount(), info.size());
EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time);
for (size_t i = 0; i < info.size(); ++i) {
- EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo())
- << "Mismatching info for node " << i << ": " << info[i] << " vs "
- << entry->getNode(i)->getBucketInfo();
+ auto& node = entry->getNodeRef(i);
+ EXPECT_EQ(info[i], node.getBucketInfo()) << "Mismatching DB bucket info for node " << node.getNode();
}
}
@@ -172,6 +171,51 @@ TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until
EXPECT_EQ(70u, gc_removed_documents_metric()); // Use max of received metrics
}
+TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_if_operation_fully_canceled) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+ // DB state is unchanged. Note that in a real scenario, the DB entry will have been removed
+ // as part of the ownership change, but there are already non-cancellation behaviors that
+ // avoid creating buckets from scratch in the DB if they do not exist, so just checking to
+ // see if the bucket exists or not risks hiding missing cancellation edge handling.
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0));
+ // However, we still update our metrics if we _did_ remove documents on one or more nodes
+ EXPECT_EQ(70u, gc_removed_documents_metric());
+}
+
+TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_for_cancelled_node) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+ // DB state is unchanged for node 0, changed for node 1
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(4567, 90, 500)}, 34));
+}
+
+TEST_F(GarbageCollectionOperationTest, node_cancellation_is_cumulative) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+ // DB state is unchanged for both nodes
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0));
+}
+
TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) {
auto op = create_op();
op->start(_sender);
@@ -363,6 +407,16 @@ TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_in
receive_phase1_replies_and_assert_no_phase_2_started();
}
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_fully_cancelled_between_phases) {
+ _op->cancel(_sender, CancelScope::of_fully_cancelled());
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_partially_cancelled_between_phases) {
+ _op->cancel(_sender, CancelScope::of_node_subset({0}));
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) {
enable_two_phase_gc();
auto op = create_op();
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 76b6741442e..ee87fe84df6 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -6,6 +6,7 @@
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributor_stripe.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storage/distributor/operations/external/putoperation.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
@@ -208,6 +209,43 @@ TEST_F(PutOperationTest, failed_CreateBucket_removes_replica_from_db_and_sends_R
_sender.getCommands(true, true, 4));
}
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_if_op_fully_canceled) {
+ setup_stripe(2, 2, "distributor:1 storage:2");
+
+ auto doc = createDummyDocument("test", "test");
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+
+ // DB is not touched (note: normally node 1 would be removed at the cancel-edge).
+ ASSERT_EQ("BucketId(0x4000000000008f09) : "
+ "node(idx=1,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false), "
+ "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=false,ready=false)",
+ dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
+ // No new requests sent
+ ASSERT_EQ("", _sender.getCommands(true, true, 4));
+}
+
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_for_cancelled_nodes) {
+ setup_stripe(2, 2, "distributor:1 storage:2");
+
+ auto doc = createDummyDocument("test", "test");
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+ sendReply(1, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 0
+
+ // Bucket info recheck only sent to node 1, as it's not cancelled
+ ASSERT_EQ("RequestBucketInfoCommand(1 buckets, super bucket BucketId(0x4000000000008f09). ) => 1",
+ _sender.getCommands(true, true, 4));
+}
+
TEST_F(PutOperationTest, send_inline_split_before_put_if_bucket_too_large) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cfg = make_config();
@@ -272,6 +310,26 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck
_sender.getLastReply());
}
+TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_operation_cancelled) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ createAndSendSampleDocument(TIMEOUT);
+
+ ASSERT_EQ("Put(BucketId(0x4000000000001dd4), "
+ "id:test:testdoctype1::, timestamp 100, size 45) => 0,"
+ "Put(BucketId(0x4000000000001dd4), "
+ "id:test:testdoctype1::, timestamp 100, size 45) => 1",
+ _sender.getCommands(true, true));
+
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+
+ sendReply(0);
+ sendReply(1);
+
+ ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
+}
+
TEST_F(PutOperationTest, storage_failed) {
setup_stripe(2, 1, "storage:1 distributor:1");
@@ -491,7 +549,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
{
std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
- std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
ASSERT_TRUE(sreply);
sreply->remapBucketId(document::BucketId(17, 13));
@@ -511,6 +569,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
dumpBucket(document::BucketId(17, 13)));
}
+// TODO make this redundant through operation cancelling
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_state) {
setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
@@ -535,6 +594,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_
dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
}
+// TODO make this redundant through operation cancelling
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) {
setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
@@ -568,6 +628,8 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending
// TODO probably also do this for updates and removes
// TODO consider if we should use the pending state verbatim for computing targets if it exists
+// TODO make this redundant through operation cancelling
+// ... actually; FIXME shouldn't the ExternalOperationHandler already cover this??
TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) {
setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
auto doc = createDummyDocument("test", "test");
@@ -584,6 +646,65 @@ TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state
_sender.getLastReply(true));
}
+TEST_F(PutOperationTest, db_not_updated_if_operation_cancelled_by_ownership_change) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ auto doc = createDummyDocument("test", "uri");
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+ auto remap_bucket = BucketId(bucket.getUsedBits() + 1, bucket.getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 1, 2});
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+
+ // Normally DB updates triggered by replies don't _create_ buckets in the DB, unless
+ // they're remapped buckets. Use a remapping to ensure we hit a create-if-missing DB path.
+ {
+ std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
+ auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
+ ASSERT_TRUE(sreply);
+ sreply->remapBucketId(remap_bucket);
+ sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5));
+ op->receive(_sender, reply);
+ }
+
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(7, 8, 9));
+
+ EXPECT_EQ("NONEXISTING", dumpBucket(bucket));
+ EXPECT_EQ("NONEXISTING", dumpBucket(remap_bucket));
+}
+
+TEST_F(PutOperationTest, individually_cancelled_nodes_are_not_updated_in_db) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ auto doc = createDummyDocument("test", "uri");
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+ sendPut(createPut(doc));
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+ // Simulate nodes 0 and 2 going down
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 2});
+ // Cancelling shall be cumulative
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({2}));
+
+ sendReply(0, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(6, 7, 8));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(9, 8, 7));
+
+ EXPECT_EQ("BucketId(0x4000000000000593) : "
+ "node(idx=1,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)",
+ dumpBucket(bucket));
+}
+
TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) {
setup_stripe(Redundancy(2), NodeCount(2),
"distributor:1 storage:2 .0.s:r .1.s:r");
@@ -761,6 +882,38 @@ TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) {
_sender.getLastReply());
}
+TEST_F(PutOperationTest, ownership_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, replica_subset_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ // 1 of 2 nodes; we still abort after the read phase since we cannot possibly fulfill
+ // the write phase for all replicas.
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
TEST_F(PutOperationTest, create_flag_in_parent_put_is_propagated_to_sent_puts) {
setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1");
auto doc = createDummyDocument("test", "test");
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index d169c80a95d..3fad2c194a2 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -68,6 +68,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
std::unique_ptr<api::StorageReply> reply(removec->makeReply());
auto* removeR = dynamic_cast<api::RemoveReply*>(reply.get());
removeR->setOldTimestamp(oldTimestamp);
+ removeR->setBucketInfo(api::BucketInfo(1,2,3,4,5));
callback.onReceive(_sender, std::shared_ptr<api::StorageReply>(reply.release()));
}
@@ -307,6 +308,45 @@ TEST_F(ExtRemoveOperationTest, failed_condition_probe_fails_op_with_returned_err
_sender.getLastReply());
}
+// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops
+// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests.
+
+TEST_F(ExtRemoveOperationTest, cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
+
+ reply_with(make_get_reply(0, 50, false, true));
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ reply_with(make_get_reply(1, 50, false, true));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, "
+ "timestamp 100, not found) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
+TEST_F(ExtRemoveOperationTest, cancelled_nodes_are_not_updated_in_db) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::CONSISTENT));
+ ASSERT_EQ("Remove => 1,Remove => 0", _sender.getCommands(true));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucketId), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToMessage(*op, 0, 50);
+ replyToMessage(*op, 1, 50);
+
+ EXPECT_EQ("BucketId(0x4000000000000593) : "
+ "node(idx=0,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
+ dumpBucket(bucketId));
+ // Reply is still OK since the operation went through on the content nodes
+ ASSERT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, timestamp 100, removed doc from 50) ReturnCode(NONE)",
+ _sender.getLastReply());
+
+}
+
TEST_F(ExtRemoveOperationTest, trace_is_propagated_from_condition_probe_gets_ok_probe_case) {
ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index da32225cde3..1907335545a 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -4,16 +4,16 @@
#include <vespa/config/helper/configgetter.h>
#include <vespa/document/base/testdocrepo.h>
#include <vespa/document/fieldset/fieldsets.h>
-#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/fieldvalue/intfieldvalue.h>
+#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/update/arithmeticvalueupdate.h>
-#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h>
+#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storageapi/message/persistence.h>
-#include <vespa/vespalib/gtest/gtest.h>
+#include <gtest/gtest.h>
#include <gmock/gmock.h>
namespace storage::distributor {
@@ -30,8 +30,9 @@ using namespace ::testing;
struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
document::TestDocRepo _testRepo;
std::shared_ptr<const DocumentTypeRepo> _repo;
- const DocumentType* _doc_type;
+ const DocumentType* _doc_type{nullptr};
DistributorMessageSenderStub _sender;
+ BucketId _bucket_id{0x400000000000cac4};
TwoPhaseUpdateOperationTest();
~TwoPhaseUpdateOperationTest() override;
@@ -39,7 +40,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
void checkMessageSettingsPropagatedTo(
const api::StorageCommand::SP& msg) const;
- std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&);
+ static std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&);
void SetUp() override {
_repo = _testRepo.getTypeRepoSp();
@@ -57,20 +58,21 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
close();
}
- void replyToMessage(Operation& callback,
- DistributorMessageSenderStub& sender,
- uint32_t index,
- uint64_t oldTimestamp,
- api::ReturnCode::Result result = api::ReturnCode::OK);
+ static void replyToMessage(
+ Operation& callback,
+ DistributorMessageSenderStub& sender,
+ uint32_t index,
+ uint64_t oldTimestamp,
+ api::ReturnCode::Result result = api::ReturnCode::OK);
- void replyToPut(
+ static void replyToPut(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& traceMsg = "");
- void replyToCreateBucket(
+ static void replyToCreateBucket(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -85,7 +87,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& traceMsg = "");
- void reply_to_metadata_get(
+ static void reply_to_metadata_get(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -93,7 +95,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& trace_msg = "");
- void reply_to_get_with_tombstone(
+ static void reply_to_get_with_tombstone(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -148,11 +150,17 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
Timestamp highest_get_timestamp,
Timestamp expected_response_timestamp);
- std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) {
- setup_stripe(2, 2, "storage:2 distributor:1");
+ void do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas);
+
+ void enable_3phase_updates(bool enable = true) {
auto cfg = make_config();
- cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase);
+ cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable);
configure_stripe(cfg);
+ }
+
+ std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates(enable_3phase);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
cb->start(_sender);
return cb;
@@ -199,13 +207,13 @@ TwoPhaseUpdateOperationTest::replyToPut(
{
std::shared_ptr<api::StorageMessage> msg2 = sender.command(index);
auto& putc = dynamic_cast<PutCommand&>(*msg2);
- std::unique_ptr<api::StorageReply> reply(putc.makeReply());
+ std::shared_ptr<api::StorageReply> reply(putc.makeReply());
reply->setResult(api::ReturnCode(result, ""));
+ dynamic_cast<api::PutReply&>(*reply).setBucketInfo(api::BucketInfo(1,2,3,4,5));
if (!traceMsg.empty()) {
MBUS_TRACE(reply->getTrace(), 1, traceMsg);
}
- callback.receive(sender,
- std::shared_ptr<StorageReply>(reply.release()));
+ callback.receive(sender, reply);
}
void
@@ -217,10 +225,9 @@ TwoPhaseUpdateOperationTest::replyToCreateBucket(
{
std::shared_ptr<api::StorageMessage> msg2 = sender.command(index);
auto& putc = dynamic_cast<CreateBucketCommand&>(*msg2);
- std::unique_ptr<api::StorageReply> reply(putc.makeReply());
+ std::shared_ptr<api::StorageReply> reply(putc.makeReply());
reply->setResult(api::ReturnCode(result, ""));
- callback.receive(sender,
- std::shared_ptr<StorageReply>(reply.release()));
+ callback.receive(sender, reply);
}
void
@@ -312,6 +319,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
update->setCreateIfNonExistent(options._createIfNonExistent);
document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(update->getId());
+ assert(id == _bucket_id);
document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId());
if (bucketState.length()) {
@@ -554,6 +562,33 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsiste
_sender.getLastReply(true));
}
+TEST_F(TwoPhaseUpdateOperationTest, fast_path_cancellation_transitively_cancels_nested_update_operation) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates();
+ auto op = sendUpdate("0=1/2/3,1=1/2/3");
+ op->start(_sender);
+
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
+
+ replyToMessage(*op, _sender, 0, 110);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToMessage(*op, _sender, 1, 110);
+
+ // Client operation itself should return success since the update went through on all replica nodes
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 110) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply(true));
+
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=0,crc=0x123,docs=1/1,bytes=100/100,trusted=true,active=false,ready=false)",
+ dumpBucket(_bucket_id));
+}
+
void
TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo(
const api::StorageCommand::SP& msg) const
@@ -713,6 +748,38 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) {
EXPECT_EQ(metrics().updates.failures.storagefailure.getValue(), 1);
}
+void TwoPhaseUpdateOperationTest::do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates();
+ auto op = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true));
+ op->start(_sender);
+
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*op, _sender, 0, 70);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToGet(*op, _sender, 1, in_sync_replicas ? 70 : 80);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster "
+ "state change between executing the read and write phases of a write-repair update)",
+ _sender.getLastReply(true));
+
+ // TODO custom cancellation failure metric?
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_consistent_case) {
+ do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(true);
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_inconsistent_case) {
+ do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(false);
+}
+
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
setup_stripe(2, 2, "storage:2 distributor:1");
// Create update for wrong doctype which will fail the update.
@@ -1214,6 +1281,59 @@ TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) {
EXPECT_EQ(1, m.ok.getValue());
}
+TEST_F(ThreePhaseUpdateTest, update_fails_if_cancelled_between_metadata_gets_and_full_get) {
+ auto op = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*op, _sender, 0, 2000);
+ reply_to_metadata_get(*op, _sender, 1, 1000);
+ ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToGet(*op, _sender, 2, 2000U);
+ ASSERT_EQ("", _sender.getCommands(true, false, 3)); // No puts sent
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster "
+ "state change between executing the read and write phases of a write-repair update)",
+ _sender.getLastReply(true));
+
+ // TODO cancellation metrics?
+}
+
+TEST_F(ThreePhaseUpdateTest, fast_path_cancellation_transitively_cancels_nested_put_operation) {
+ auto op = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*op, _sender, 0, 2000);
+ reply_to_metadata_get(*op, _sender, 1, 1000);
+
+ ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
+ replyToGet(*op, _sender, 2, 2000U);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {0});
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3));
+ replyToPut(*op, _sender, 3);
+ replyToPut(*op, _sender, 4);
+
+ // Update itself is ACKed
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 2000) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply(true));
+
+ // But cancelled replicas are not reintroduced into the bucket DB
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
+ dumpBucket(_bucket_id));
+}
+
+
TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) {
auto cb = set_up_2_inconsistent_replicas_and_start_update();
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
@@ -1277,8 +1397,7 @@ TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more.
cb->start(_sender);
// Add new replica to deterministic test bucket after gets have been sent
- BucketId bucket(0x400000000000cac4); // Always the same in the test.
- addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
+ addNodesToBucketDB(_bucket_id, "0=1/2/3,1=2/3/4,2=3/3/3");
Timestamp old_timestamp = 500;
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index d0ae31b9524..fefc88a27c2 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -11,7 +11,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/vespalib/gtest/gtest.h>
+#include <gtest/gtest.h>
using config::ConfigGetter;
using config::FileSpec;
@@ -29,11 +29,14 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil {
std::shared_ptr<const DocumentTypeRepo> _repo;
const DocumentType* _html_type;
+ UpdateOperationTest()
+ : _repo(std::make_shared<DocumentTypeRepo>(*ConfigGetter<DocumenttypesConfig>::
+ getConfig("config-doctypes", FileSpec("../config-doctypes.cfg")))),
+ _html_type(_repo->getDocumentType("text/html"))
+ {
+ }
+
void SetUp() override {
- _repo.reset(
- new DocumentTypeRepo(*ConfigGetter<DocumenttypesConfig>::
- getConfig("config-doctypes", FileSpec("../config-doctypes.cfg"))));
- _html_type = _repo->getDocumentType("text/html");
createLinks();
}
@@ -241,4 +244,31 @@ TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest
EXPECT_EQ(2, m.diverging_timestamp_updates.getValue());
}
+// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops
+// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests.
+
+TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ std::shared_ptr<UpdateOperation> op = sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3");
+ DistributorMessageSenderStub sender;
+ op->start(sender);
+
+ ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true));
+
+ // Simulate nodes 0 and 2 going down
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bId), {0, 2});
+ // Cancelling shall be cumulative
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({2}));
+
+ replyToMessage(*op, sender, 0, 120);
+ replyToMessage(*op, sender, 1, 120);
+ replyToMessage(*op, sender, 2, 120);
+
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=1,crc=0x2,docs=4/4,bytes=6/6,trusted=true,active=false,ready=false)",
+ dumpBucket(_bId));
+}
+
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
index a8c21efa793..d2ff7b53403 100644
--- a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
@@ -142,7 +142,7 @@ BucketInfo::addNode(const BucketCopy& newCopy, const std::vector<uint16_t>& reco
bool
BucketInfo::removeNode(unsigned short node, TrustedUpdate update)
{
- for (auto iter = _nodes.begin(); iter != _nodes.end(); iter++) {
+ for (auto iter = _nodes.begin(); iter != _nodes.end(); ++iter) {
if (iter->getNode() == node) {
_nodes.erase(iter);
if (update == TrustedUpdate::UPDATE) {
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 184fee5d2c9..c889afcc77c 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -10,6 +10,7 @@ vespa_add_library(storage_distributor OBJECT
bucket_spaces_stats_provider.cpp
bucketgctimecalculator.cpp
bucketlistmerger.cpp
+ cancelled_replicas_pruner.cpp
clusterinformation.cpp
crypto_uuid_generator.cpp
distributor_bucket_space.cpp
diff --git a/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp
new file mode 100644
index 00000000000..f453a722d2c
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.cpp
@@ -0,0 +1,22 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "cancelled_replicas_pruner.h"
+
+namespace storage::distributor {
+
+std::vector<BucketCopy> prune_cancelled_nodes(std::span<const BucketCopy> replicas, const CancelScope& cancel_scope) {
+ if (cancel_scope.fully_cancelled()) {
+ return {};
+ }
+ std::vector<BucketCopy> pruned_replicas;
+ // Expect that there will be an input entry for each cancelled node in the common case.
+ pruned_replicas.reserve((replicas.size() >= cancel_scope.cancelled_nodes().size())
+ ? replicas.size() - cancel_scope.cancelled_nodes().size() : 0);
+ for (auto& candidate : replicas) {
+ if (!cancel_scope.node_is_cancelled(candidate.getNode())) {
+ pruned_replicas.emplace_back(candidate);
+ }
+ }
+ return pruned_replicas;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h
new file mode 100644
index 00000000000..f12f78e569f
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/cancelled_replicas_pruner.h
@@ -0,0 +1,17 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/storage/bucketdb/bucketcopy.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
+#include <span>
+#include <vector>
+
+namespace storage::distributor {
+
+/**
+ * Returns a new vector that contains all entries of `replicas` whose nodes are _not_ tagged as
+ * cancelled in `cancel_scope`. Returned entry ordering is identical to input ordering.
+ */
+[[nodiscard]] std::vector<BucketCopy> prune_cancelled_nodes(std::span<const BucketCopy> replicas, const CancelScope& cancel_scope);
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 243b3c5ecb2..ad1cce46bea 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -281,6 +281,7 @@ DistributorStripe::enableClusterStateBundle(const lib::ClusterStateBundle& state
enterRecoveryMode();
// Clear all active messages on nodes that are down.
+ // TODO this should also be done on nodes that are no longer part of the config!
const uint16_t old_node_count = oldState.getBaselineClusterState()->getNodeCount(lib::NodeType::STORAGE);
const uint16_t new_node_count = baseline_state.getNodeCount(lib::NodeType::STORAGE);
for (uint16_t i = 0; i < std::max(old_node_count, new_node_count); ++i) {
diff --git a/storage/src/vespa/storage/distributor/operationowner.cpp b/storage/src/vespa/storage/distributor/operationowner.cpp
index 7b7c9f431f7..c92544c8cb5 100644
--- a/storage/src/vespa/storage/distributor/operationowner.cpp
+++ b/storage/src/vespa/storage/distributor/operationowner.cpp
@@ -73,7 +73,7 @@ OperationOwner::onClose()
void
OperationOwner::erase(api::StorageMessage::Id msgId)
{
- _sentMessageMap.pop(msgId);
+ (void)_sentMessageMap.pop(msgId);
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/CMakeLists.txt
index 5c6a1f3d84c..8cf0470f674 100644
--- a/storage/src/vespa/storage/distributor/operations/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/operations/CMakeLists.txt
@@ -1,6 +1,7 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(storage_distributoroperation OBJECT
SOURCES
+ cancel_scope.cpp
operation.cpp
DEPENDS
)
diff --git a/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp b/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp
new file mode 100644
index 00000000000..af62b369517
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/cancel_scope.cpp
@@ -0,0 +1,52 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "cancel_scope.h"
+
+namespace storage::distributor {
+
+CancelScope::CancelScope()
+ : _cancelled_nodes(),
+ _fully_cancelled(false)
+{
+}
+
+CancelScope::CancelScope(fully_cancelled_ctor_tag) noexcept
+ : _cancelled_nodes(),
+ _fully_cancelled(true)
+{
+}
+
+CancelScope::CancelScope(CancelledNodeSet nodes) noexcept
+ : _cancelled_nodes(std::move(nodes)),
+ _fully_cancelled(false)
+{
+}
+
+CancelScope::~CancelScope() = default;
+
+CancelScope::CancelScope(const CancelScope&) = default;
+CancelScope& CancelScope::operator=(const CancelScope&) = default;
+
+CancelScope::CancelScope(CancelScope&&) noexcept = default;
+CancelScope& CancelScope::operator=(CancelScope&&) noexcept = default;
+
+void CancelScope::add_cancelled_node(uint16_t node) {
+ _cancelled_nodes.insert(node);
+}
+
+void CancelScope::merge(const CancelScope& other) {
+ _fully_cancelled |= other._fully_cancelled;
+    // Not using iterator insert(first, last) since that explicitly resizes the set.
+ for (uint16_t node : other._cancelled_nodes) {
+ _cancelled_nodes.insert(node);
+ }
+}
+
+CancelScope CancelScope::of_fully_cancelled() noexcept {
+ return CancelScope(fully_cancelled_ctor_tag{});
+}
+
+CancelScope CancelScope::of_node_subset(CancelledNodeSet nodes) noexcept {
+ return CancelScope(std::move(nodes));
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/cancel_scope.h b/storage/src/vespa/storage/distributor/operations/cancel_scope.h
new file mode 100644
index 00000000000..7619a64d39f
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/cancel_scope.h
@@ -0,0 +1,62 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/hash_set.h>
+
+namespace storage::distributor {
+
+/**
+ * In the face of concurrent cluster state changes, cluster topology reconfigurations etc.,
+ * it's possible for there to be pending mutating operations to nodes that the distributor
+ * no longer should keep track of. Such operations must therefore be _cancelled_, either
+ * fully or partially. A CancelScope represents the granularity at which an operation should
+ * be cancelled.
+ *
+ * In the case of one or more nodes becoming unavailable, `fully_cancelled()` will be false
+ * and `node_is_cancelled(x)` will return whether node `x` is explicitly cancelled.
+ *
+ * In the case of ownership transfers, `fully_cancelled()` will be true since the distributor
+ * should no longer have any knowledge of the bucket. `node_is_cancelled(x)` is always
+ * implicitly true for all values of `x` for full cancellations.
+ */
+class CancelScope {
+public:
+ using CancelledNodeSet = vespalib::hash_set<uint16_t>;
+private:
+ CancelledNodeSet _cancelled_nodes;
+ bool _fully_cancelled;
+
+ struct fully_cancelled_ctor_tag {};
+
+ explicit CancelScope(fully_cancelled_ctor_tag) noexcept;
+ explicit CancelScope(CancelledNodeSet nodes) noexcept;
+public:
+ CancelScope();
+ ~CancelScope();
+
+ CancelScope(const CancelScope&);
+ CancelScope& operator=(const CancelScope&);
+
+ CancelScope(CancelScope&&) noexcept;
+ CancelScope& operator=(CancelScope&&) noexcept;
+
+ void add_cancelled_node(uint16_t node);
+ void merge(const CancelScope& other);
+
+ [[nodiscard]] bool fully_cancelled() const noexcept { return _fully_cancelled; }
+ [[nodiscard]] bool is_cancelled() const noexcept {
+ return (_fully_cancelled || !_cancelled_nodes.empty());
+ }
+ [[nodiscard]] bool node_is_cancelled(uint16_t node) const noexcept {
+ return (fully_cancelled() || _cancelled_nodes.contains(node));
+ }
+
+ [[nodiscard]] const CancelledNodeSet& cancelled_nodes() const noexcept {
+ return _cancelled_nodes;
+ }
+
+ static CancelScope of_fully_cancelled() noexcept;
+ static CancelScope of_node_subset(CancelledNodeSet nodes) noexcept;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
index 0e12e3e3019..bd7f3709575 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
@@ -104,7 +104,12 @@ CheckCondition::handle_reply(DistributorStripeMessageSender& sender,
}
}
-void CheckCondition::cancel(DistributorStripeMessageSender& sender) {
+void CheckCondition::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender);
+ _cond_get_op->cancel(proxy_sender, cancel_scope);
+}
+
+void CheckCondition::close(DistributorStripeMessageSender& sender) {
IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender);
_cond_get_op->onClose(proxy_sender);
// We don't propagate any generated reply from the GetOperation, as its existence
@@ -163,6 +168,12 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St
reply->steal_trace());
return;
}
+ if (_cond_get_op->is_cancelled()) {
+ _outcome.emplace(api::ReturnCode(api::ReturnCode::ABORTED,
+ "Operation has been cancelled (likely due to a cluster state change)"),
+ reply->steal_trace());
+ return;
+ }
auto state_version_now = _bucket_space.getClusterState().getVersion();
if (_bucket_space.has_pending_cluster_state()) {
state_version_now = _bucket_space.get_pending_cluster_state().getVersion();
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
index 999b79adc3d..92a8bc62ae6 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.h
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
@@ -17,6 +17,7 @@ namespace storage::api { class StorageReply; }
namespace storage::distributor {
+class CancelScope;
class DistributorBucketSpace;
class DistributorNodeContext;
class DistributorStripeMessageSender;
@@ -122,7 +123,8 @@ public:
void start_and_send(DistributorStripeMessageSender& sender);
void handle_reply(DistributorStripeMessageSender& sender,
const std::shared_ptr<api::StorageReply>& reply);
- void cancel(DistributorStripeMessageSender& sender);
+ void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope);
+ void close(DistributorStripeMessageSender& sender);
[[nodiscard]] std::optional<Outcome>& maybe_outcome() noexcept {
return _outcome;
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 854e7d15f82..e7832fd19e5 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -109,6 +109,8 @@ PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const doc
bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const {
// TODO handle this explicitly as part of operation abort/cancel edge
+ // -> we have yet to send anything at this point
+ // -> shouldn't ExternalOperationHandler deal with this before starting the op?
auto* pending_state = _op_ctx.pending_cluster_state_or_null(_msg->getBucket().getBucketSpace());
if (!pending_state) {
return false;
@@ -245,6 +247,15 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen
_msg = std::shared_ptr<api::PutCommand>();
}
+void
+PutOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope)
+{
+ if (_check_condition) {
+ _check_condition->cancel(sender, cancel_scope);
+ }
+ _tracker.cancel(cancel_scope);
+}
+
bool
PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets) const
{
@@ -302,7 +313,7 @@ void
PutOperation::onClose(DistributorStripeMessageSender& sender)
{
if (_check_condition) {
- _check_condition->cancel(sender);
+ _check_condition->close(sender);
}
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 635accc1865..8b8e3e15375 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -60,6 +60,8 @@ private:
void sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId,
uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch);
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
+
[[nodiscard]] bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const;
[[nodiscard]] bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const;
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
index 42d8e318f47..be43aac3d9e 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
@@ -156,9 +156,21 @@ void RemoveOperation::on_completed_check_condition(CheckCondition::Outcome& outc
void
RemoveOperation::onClose(DistributorStripeMessageSender& sender)
{
+ if (_check_condition) {
+ _check_condition->close(sender);
+ }
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
+void
+RemoveOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope)
+{
+ if (_check_condition) {
+ _check_condition->cancel(sender, cancel_scope);
+ }
+ _tracker.cancel(cancel_scope);
+}
+
bool RemoveOperation::has_condition() const noexcept {
return _msg->hasTestAndSetCondition();
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
index 9f3a98294ea..221def81fdc 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
@@ -29,6 +29,7 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
void onClose(DistributorStripeMessageSender& sender) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
private:
PersistenceMessageTrackerImpl _tracker_instance;
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 73c65f54b21..2d1c469d072 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -13,6 +13,7 @@
#include <vespa/storage/distributor/distributor_bucket_space_repo.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
+#include <vespa/vespalib/stllike/hash_set.hpp>
#include <cinttypes>
#include <vespa/log/log.h>
@@ -68,10 +69,8 @@ TwoPhaseUpdateOperation::stateToString(SendState state) noexcept
case SendState::SINGLE_GET_SENT: return "SINGLE_GET_SENT";
case SendState::FULL_GETS_SENT: return "FULL_GETS_SENT";
case SendState::PUTS_SENT: return "PUTS_SENT";
- default:
- assert(!"Unknown state");
- return "";
}
+ abort();
}
void
@@ -130,7 +129,7 @@ TwoPhaseUpdateOperation::get_bucket_database_entries() const
}
bool
-TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const
+TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries)
{
// Fast path iff bucket exists AND is consistent (split and copies).
if (entries.size() != 1) {
@@ -245,6 +244,16 @@ TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(DistributorStripeM
}
void
+TwoPhaseUpdateOperation::send_operation_cancelled_reply(DistributorStripeMessageSender& sender)
+{
+ sendReplyWithResult(sender,
+ api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND,
+ "The update operation was cancelled due to a cluster state change "
+ "between executing the read and write phases of a write-repair "
+ "update"));
+}
+
+void
TwoPhaseUpdateOperation::send_feed_blocked_error_reply(DistributorStripeMessageSender& sender)
{
sendReplyWithResult(sender,
@@ -257,7 +266,8 @@ void
TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<document::Document> doc,
api::Timestamp putTimestamp, DistributorStripeMessageSender& sender)
{
- if (lostBucketOwnershipBetweenPhases()) {
+ assert(!is_cancelled());
+ if (lostBucketOwnershipBetweenPhases()) { // TODO deprecate with cancellation
sendLostOwnershipTransientErrorReply(sender);
return;
}
@@ -281,6 +291,8 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen
void
TwoPhaseUpdateOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg)
{
+ // In the case of cancellations, we let existing operations complete, but must not
+ // start new ones that are unaware of the cancellations.
if (_mode == Mode::FAST_PATH) {
handleFastPathReceive(sender, msg);
} else {
@@ -304,7 +316,10 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
sendReplyWithResult(sender, getReply.getResult());
return;
}
-
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
if (!getReply.getDocument().get()) {
// Weird, document is no longer there ... Just fail.
sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, ""));
@@ -316,7 +331,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId());
assert(callback.get());
- Operation & callbackOp = *callback;
+ Operation& callbackOp = *callback;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(callback), sender);
callbackOp.receive(intermediate, msg);
@@ -326,12 +341,12 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
addTraceFromReply(*intermediate._reply);
auto& cb = dynamic_cast<UpdateOperation&>(callbackOp);
- std::pair<document::BucketId, uint16_t> bestNode = cb.getNewestTimestampLocation();
+ auto [newest_bucket, newest_node] = cb.getNewestTimestampLocation();
auto intermediate_update_reply = std::dynamic_pointer_cast<api::UpdateReply>(intermediate._reply);
assert(intermediate_update_reply);
if (!intermediate_update_reply->getResult().success() ||
- bestNode.first == document::BucketId(0))
+ (newest_bucket == document::BucketId(0)))
{
if (intermediate_update_reply->getResult().success() &&
(intermediate_update_reply->getOldTimestamp() == 0))
@@ -343,9 +358,14 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
} else {
LOG(debug, "Update(%s) fast path: was inconsistent!", update_doc_id().c_str());
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
+
_updateReply = std::move(intermediate_update_reply);
- _fast_path_repair_source_node = bestNode.second;
- document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), bestNode.first);
+ _fast_path_repair_source_node = newest_node;
+ document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), newest_bucket);
auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), document::AllFields::NAME);
copyMessageSettings(*_updateCmd, *cmd);
@@ -383,7 +403,7 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorStripeMessageSender& s
callbackOp.receive(intermediate, msg);
if (!intermediate._reply.get()) {
- return; // Not enough replies received yet or we're draining callbacks.
+ return; // Not enough replies received yet, or we're draining callbacks.
}
addTraceFromReply(*intermediate._reply);
if (_sendState == SendState::METADATA_GETS_SENT) {
@@ -445,6 +465,13 @@ void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get(
"One or more metadata Get operations failed; aborting Update"));
return;
}
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
+    // Replica _removal_ is handled by cancellation, but a concurrent state change may happen
+ // that _adds_ one or more available content nodes, which we cannot then blindly write to.
+ // So we have to explicitly check this edge case.
if (!replica_set_unchanged_after_get_operation()) {
// Use BUCKET_NOT_FOUND to trigger a silent retry.
LOG(debug, "Update(%s): replica set has changed after metadata get phase", update_doc_id().c_str());
@@ -490,6 +517,10 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorStripeMessageSende
sendReplyWithResult(sender, reply.getResult());
return;
}
+ if (is_cancelled()) {
+ send_operation_cancelled_reply(sender);
+ return;
+ }
// Single Get could technically be considered consistent with itself, so make
// sure we never treat that as sufficient for restarting in the fast path.
if ((_sendState != SendState::SINGLE_GET_SENT) && may_restart_with_fast_path(reply)) {
@@ -558,7 +589,8 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const
void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender) {
LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode",
update_doc_id().c_str());
- if (lostBucketOwnershipBetweenPhases()) {
+ assert(!is_cancelled());
+ if (lostBucketOwnershipBetweenPhases()) { // TODO remove once cancellation is wired
sendLostOwnershipTransientErrorReply(sender);
return;
}
@@ -579,7 +611,7 @@ TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorStripeMessageSen
std::unique_ptr<document::select::Node> selection;
try {
- selection = _parser.parse_selection(_updateCmd->getCondition().getSelection());
+ selection = _parser.parse_selection(_updateCmd->getCondition().getSelection());
} catch (const document::select::ParsingFailedException & e) {
sendReplyWithResult(sender, api::ReturnCode(
api::ReturnCode::ILLEGAL_PARAMETERS,
@@ -679,6 +711,22 @@ TwoPhaseUpdateOperation::onClose(DistributorStripeMessageSender& sender) {
}
}
+void
+TwoPhaseUpdateOperation::on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ // We have to explicitly cancel any and all pending Operation instances that have been
+ // launched by this operation. This is to ensure any DB updates they may transitively
+ // perform are aware of all cancellations that have occurred.
+    // There may be many messages pending for any given operation, so unique-ify them
+ // to avoid duplicate cancellation invocations.
+ vespalib::hash_set<Operation*> ops;
+ for (auto& msg_op : _sentMessageMap) {
+ ops.insert(msg_op.second.get());
+ }
+ for (auto* op : ops) {
+ op->cancel(sender, cancel_scope);
+ }
+}
+
vespalib::string TwoPhaseUpdateOperation::update_doc_id() const {
assert(_updateCmd.get() != nullptr);
return _updateCmd->getDocumentId().toString();
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index d2ad5359fa6..7f64bb8d56c 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -71,6 +71,8 @@ public:
void onClose(DistributorStripeMessageSender& sender) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
+
private:
enum class SendState {
NONE_SENT,
@@ -94,19 +96,20 @@ private:
void sendReplyWithResult(DistributorStripeMessageSender&, const api::ReturnCode&);
void ensureUpdateReplyCreated();
- std::vector<BucketDatabase::Entry> get_bucket_database_entries() const;
- bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const;
+ [[nodiscard]] std::vector<BucketDatabase::Entry> get_bucket_database_entries() const;
+ [[nodiscard]] static bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries);
void startFastPathUpdate(DistributorStripeMessageSender& sender, std::vector<BucketDatabase::Entry> entries);
void startSafePathUpdate(DistributorStripeMessageSender&);
- bool lostBucketOwnershipBetweenPhases() const;
+ [[nodiscard]] bool lostBucketOwnershipBetweenPhases() const;
void sendLostOwnershipTransientErrorReply(DistributorStripeMessageSender&);
+ void send_operation_cancelled_reply(DistributorStripeMessageSender& sender);
void send_feed_blocked_error_reply(DistributorStripeMessageSender& sender);
void schedulePutsWithUpdatedDocument(
std::shared_ptr<document::Document>,
api::Timestamp,
DistributorStripeMessageSender&);
void applyUpdateToDocument(document::Document&) const;
- std::shared_ptr<document::Document> createBlankDocument() const;
+ [[nodiscard]] std::shared_ptr<document::Document> createBlankDocument() const;
void setUpdatedForTimestamp(api::Timestamp);
void handleFastPathReceive(DistributorStripeMessageSender&,
const std::shared_ptr<api::StorageReply>&);
@@ -120,20 +123,20 @@ private:
void handle_safe_path_received_single_full_get(DistributorStripeMessageSender&, api::GetReply&);
void handleSafePathReceivedGet(DistributorStripeMessageSender&, api::GetReply&);
void handleSafePathReceivedPut(DistributorStripeMessageSender&, const api::PutReply&);
- bool shouldCreateIfNonExistent() const;
+ [[nodiscard]] bool shouldCreateIfNonExistent() const;
bool processAndMatchTasCondition(
DistributorStripeMessageSender& sender,
const document::Document& candidateDoc);
- bool satisfiesUpdateTimestampConstraint(api::Timestamp) const;
+ [[nodiscard]] bool satisfiesUpdateTimestampConstraint(api::Timestamp) const;
void addTraceFromReply(api::StorageReply& reply);
- bool hasTasCondition() const noexcept;
+ [[nodiscard]] bool hasTasCondition() const noexcept;
void replyWithTasFailure(DistributorStripeMessageSender& sender,
vespalib::stringref message);
bool may_restart_with_fast_path(const api::GetReply& reply);
- bool replica_set_unchanged_after_get_operation() const;
+ [[nodiscard]] bool replica_set_unchanged_after_get_operation() const;
void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorStripeMessageSender& sender);
// Precondition: reply has not yet been sent.
- vespalib::string update_doc_id() const;
+ [[nodiscard]] vespalib::string update_doc_id() const;
using ReplicaState = std::vector<std::pair<document::BucketId, uint16_t>>;
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index f43a6092372..60bddebbb89 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -207,6 +207,13 @@ UpdateOperation::onClose(DistributorStripeMessageSender& sender)
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
+void
+UpdateOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope)
+{
+ _tracker.cancel(cancel_scope);
+}
+
+
// The backend behavior of "create-if-missing" updates is to return the timestamp of the
// _new_ update operation if the document was created from scratch. The two-phase update
// operation logic auto-detects unexpected inconsistencies and tries to reconcile
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
index 96fd878a324..7d2131d426d 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
@@ -31,6 +31,7 @@ public:
std::string getStatus() const override { return ""; };
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
void onClose(DistributorStripeMessageSender& sender) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
std::pair<document::BucketId, uint16_t> getNewestTimestampLocation() const {
return _newestTimestampLocation;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index 2e6d0e95ec9..bf64fa2eb82 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "garbagecollectionoperation.h"
+#include <vespa/storage/distributor/cancelled_replicas_pruner.h>
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/distributor/idealstatemetricsset.h>
#include <vespa/storage/distributor/top_level_distributor.h>
@@ -22,6 +23,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const ClusterContext& clu
_cluster_state_version_at_phase1_start_time(0),
_remove_candidates(),
_replica_info(),
+ _cancel_scope(),
_max_documents_removed(0),
_is_done(false)
{}
@@ -148,6 +150,10 @@ GarbageCollectionOperation::onReceive(DistributorStripeMessageSender& sender,
}
}
+void GarbageCollectionOperation::on_cancel(DistributorStripeMessageSender&, const CancelScope& cancel_scope) {
+ _cancel_scope.merge(cancel_scope);
+}
+
void GarbageCollectionOperation::update_replica_response_info_from_reply(uint16_t from_node, const api::RemoveLocationReply& reply) {
_replica_info.emplace_back(_manager->operation_context().generate_unique_timestamp(),
from_node, reply.getBucketInfo());
@@ -186,6 +192,11 @@ bool GarbageCollectionOperation::may_start_write_phase() const {
if (!_ok) {
return false; // Already broken, no reason to proceed.
}
+ if (is_cancelled()) {
+ LOG(debug, "GC(%s): not sending write phase; operation has been explicitly cancelled",
+ getBucket().toString().c_str());
+ return false;
+ }
const auto state_version_now = _bucketSpace->getClusterState().getVersion();
if ((state_version_now != _cluster_state_version_at_phase1_start_time) ||
_bucketSpace->has_pending_cluster_state())
@@ -250,9 +261,17 @@ void GarbageCollectionOperation::update_last_gc_timestamp_in_db() {
}
void GarbageCollectionOperation::merge_received_bucket_info_into_db() {
- // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
- _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
- update_last_gc_timestamp_in_db();
+ if (_cancel_scope.is_cancelled()) {
+ if (_cancel_scope.fully_cancelled()) {
+ return;
+ }
+ _replica_info = prune_cancelled_nodes(_replica_info, _cancel_scope);
+ }
+ if (!_replica_info.empty()) {
+ // TODO avoid two separate DB ops for this. The current API does not make this elegant.
+ _manager->operation_context().update_bucket_database(getBucket(), _replica_info);
+ update_last_gc_timestamp_in_db();
+ } // else: effectively fully cancelled, no touching the DB.
}
void GarbageCollectionOperation::update_gc_metrics() {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index 27dc519dcc2..97efbe694de 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -3,10 +3,11 @@
#include "idealstateoperation.h"
#include <vespa/document/base/documentid.h>
+#include <vespa/persistence/spi/id_and_timestamp.h>
#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
#include <vespa/storage/distributor/operation_sequencer.h>
-#include <vespa/persistence/spi/id_and_timestamp.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <vector>
@@ -22,13 +23,14 @@ public:
void onStart(DistributorStripeMessageSender& sender) override;
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
+ void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) override;
const char* getName() const noexcept override { return "garbagecollection"; };
Type getType() const noexcept override { return GARBAGE_COLLECTION; }
bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
- bool is_two_phase() const noexcept {
+ [[nodiscard]] bool is_two_phase() const noexcept {
return ((_phase == Phase::ReadMetadataPhase) || (_phase == Phase::WriteRemovesPhase));
}
- bool is_done() const noexcept { return _is_done; }
+ [[nodiscard]] bool is_done() const noexcept { return _is_done; }
protected:
MessageTracker _tracker;
@@ -54,13 +56,14 @@ private:
RemoveCandidates _remove_candidates;
std::vector<SequencingHandle> _gc_write_locks;
std::vector<BucketCopy> _replica_info;
+ CancelScope _cancel_scope;
uint32_t _max_documents_removed;
bool _is_done;
static RemoveCandidates steal_selection_matches_as_candidates(api::RemoveLocationReply& reply);
void send_current_phase_remove_locations(DistributorStripeMessageSender& sender);
- std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;
+ [[nodiscard]] std::vector<spi::IdAndTimestamp> compile_phase_two_send_set() const;
void handle_ok_legacy_reply(uint16_t from_node, const api::RemoveLocationReply& reply);
void handle_ok_phase1_reply(api::RemoveLocationReply& reply);
diff --git a/storage/src/vespa/storage/distributor/operations/operation.cpp b/storage/src/vespa/storage/distributor/operations/operation.cpp
index 4d82de170ae..9f944a94178 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/operation.cpp
@@ -12,7 +12,8 @@ LOG_SETUP(".distributor.callback");
namespace storage::distributor {
Operation::Operation()
- : _startTime()
+ : _startTime(),
+ _cancelled(false)
{
}
@@ -45,6 +46,11 @@ Operation::copyMessageSettings(const api::StorageCommand& source, api::StorageCo
target.setPriority(source.getPriority());
}
+void Operation::cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ _cancelled = true;
+ on_cancel(sender, cancel_scope);
+}
+
void
Operation::on_blocked()
{
diff --git a/storage/src/vespa/storage/distributor/operations/operation.h b/storage/src/vespa/storage/distributor/operations/operation.h
index bc7e510a5b6..64caacfc642 100644
--- a/storage/src/vespa/storage/distributor/operations/operation.h
+++ b/storage/src/vespa/storage/distributor/operations/operation.h
@@ -16,6 +16,7 @@ class StorageComponent;
namespace distributor {
+class CancelScope;
class DistributorStripeOperationContext;
class PendingMessageTracker;
class OperationSequencer;
@@ -40,7 +41,7 @@ public:
on the owner of the message that was replied to.
*/
virtual void receive(DistributorStripeMessageSender& sender,
- const std::shared_ptr<api::StorageReply> & msg)
+ const std::shared_ptr<api::StorageReply> & msg)
{
onReceive(sender, msg);
}
@@ -60,6 +61,22 @@ public:
void start(DistributorStripeMessageSender& sender);
/**
+ * Explicitly cancel the operation. Cancelled operations may or may not (depending on
+ * the operation implementation) be immediately aborted, but they should either way
+ * never insert any bucket information _for cancelled nodes_ into the bucket DB after
+ * cancel() has been called.
+ */
+ void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope);
+
+ /**
+ * Whether cancel() has been invoked at least once on this instance. This does not
+ * distinguish between cancellations caused by ownership transfers and those caused
+ * by nodes becoming unavailable; Operation implementations that care about this need
+ * to implement cancel() themselves and inspect the provided CancelScope.
+ */
+ [[nodiscard]] bool is_cancelled() const noexcept { return _cancelled; }
+
+ /**
* Returns true if we are blocked to start this operation given
* the pending messages.
*/
@@ -93,8 +110,15 @@ private:
const std::shared_ptr<api::StorageReply> & msg) = 0;
protected:
+ virtual void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
+ (void)sender;
+ (void)cancel_scope;
+ }
+
static constexpr vespalib::duration MAX_TIMEOUT = 3600s;
+
vespalib::system_time _startTime;
+ bool _cancelled;
};
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index a4295613fd2..498f3a5feab 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -1,10 +1,12 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "persistencemessagetracker.h"
+#include "cancelled_replicas_pruner.h"
#include "distributor_bucket_space_repo.h"
#include "distributor_bucket_space.h"
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/storageapi/message/persistence.h>
+#include <algorithm>
#include <vespa/log/log.h>
LOG_SETUP(".persistencemessagetracker");
@@ -18,12 +20,15 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
DistributorStripeOperationContext& op_ctx,
api::Timestamp revertTimestamp)
: MessageTracker(node_ctx),
+ _remapBucketInfo(),
+ _bucketInfo(),
_metric(metric),
_reply(std::move(reply)),
_op_ctx(op_ctx),
_revertTimestamp(revertTimestamp),
_trace(_reply->getTrace().getLevel()),
_requestTimer(node_ctx.clock()),
+ _cancel_scope(),
_n_persistence_replies_total(0),
_n_successful_persistence_replies(0),
_priority(_reply->getPriority()),
@@ -34,8 +39,32 @@ PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
PersistenceMessageTrackerImpl::~PersistenceMessageTrackerImpl() = default;
void
+PersistenceMessageTrackerImpl::cancel(const CancelScope& cancel_scope)
+{
+ _cancel_scope.merge(cancel_scope);
+}
+
+void
+PersistenceMessageTrackerImpl::prune_cancelled_nodes_if_present(
+ BucketInfoMap& bucket_and_replicas,
+ const CancelScope& cancel_scope)
+{
+ for (auto& info : bucket_and_replicas) {
+ info.second = prune_cancelled_nodes(info.second, cancel_scope);
+ }
+}
+
+void
PersistenceMessageTrackerImpl::updateDB()
{
+ if (_cancel_scope.is_cancelled()) {
+ if (_cancel_scope.fully_cancelled()) {
+ return; // Fully cancelled ops cannot mutate the DB at all
+ }
+ prune_cancelled_nodes_if_present(_bucketInfo, _cancel_scope);
+ prune_cancelled_nodes_if_present(_remapBucketInfo, _cancel_scope);
+ }
+
for (const auto & entry : _bucketInfo) {
_op_ctx.update_bucket_database(entry.first, entry.second);
}
@@ -229,12 +258,19 @@ PersistenceMessageTrackerImpl::updateFailureResult(const api::BucketInfoReply& r
_success = false;
}
+bool
+PersistenceMessageTrackerImpl::node_is_effectively_cancelled(uint16_t node) const noexcept
+{
+ return _cancel_scope.node_is_cancelled(node); // Implicitly covers the fully cancelled case
+}
+
void
PersistenceMessageTrackerImpl::handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node)
{
LOG(spam, "Received CreateBucket reply for %s from node %u", reply.getBucketId().toString().c_str(), node);
if (!reply.getResult().success()
- && reply.getResult().getResult() != api::ReturnCode::EXISTS)
+ && (reply.getResult().getResult() != api::ReturnCode::EXISTS)
+ && !node_is_effectively_cancelled(node))
{
LOG(spam, "Create bucket reply failed, so deleting it from bucket db");
// We don't know if the bucket exists at this point, so we remove it from the DB.
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index 9b06547dd98..8c44d70062c 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -4,6 +4,7 @@
#include "distributor_stripe_component.h"
#include "distributormetricsset.h"
#include "messagetracker.h"
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/storageapi/messageapi/bucketinfocommand.h>
#include <vespa/storageapi/messageapi/bucketinforeply.h>
@@ -14,6 +15,7 @@ struct PersistenceMessageTracker {
virtual ~PersistenceMessageTracker() = default;
using ToSend = MessageTracker::ToSend;
+ virtual void cancel(const CancelScope& cancel_scope) = 0;
virtual void fail(MessageSender&, const api::ReturnCode&) = 0;
virtual void queueMessageBatch(std::vector<ToSend> messages) = 0;
virtual uint16_t receiveReply(MessageSender&, api::BucketInfoReply&) = 0;
@@ -26,14 +28,9 @@ struct PersistenceMessageTracker {
};
class PersistenceMessageTrackerImpl final
- : public PersistenceMessageTracker,
- public MessageTracker
+ : public PersistenceMessageTracker,
+ public MessageTracker
{
-private:
- using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>;
- BucketInfoMap _remapBucketInfo;
- BucketInfoMap _bucketInfo;
-
public:
PersistenceMessageTrackerImpl(PersistenceOperationMetricSet& metric,
std::shared_ptr<api::BucketInfoReply> reply,
@@ -42,6 +39,8 @@ public:
api::Timestamp revertTimestamp = 0);
~PersistenceMessageTrackerImpl() override;
+ void cancel(const CancelScope& cancel_scope) override;
+
void updateDB();
void updateMetrics();
[[nodiscard]] bool success() const noexcept { return _success; }
@@ -67,8 +66,11 @@ public:
void queueMessageBatch(std::vector<MessageTracker::ToSend> messages) override;
private:
- using MessageBatch = std::vector<uint64_t>;
+ using MessageBatch = std::vector<uint64_t>;
+ using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>;
+ BucketInfoMap _remapBucketInfo;
+ BucketInfoMap _bucketInfo;
std::vector<MessageBatch> _messageBatches;
PersistenceOperationMetricSet& _metric;
std::shared_ptr<api::BucketInfoReply> _reply;
@@ -77,20 +79,24 @@ private:
std::vector<BucketNodePair> _revertNodes;
mbus::Trace _trace;
framework::MilliSecTimer _requestTimer;
+ CancelScope _cancel_scope;
uint32_t _n_persistence_replies_total;
uint32_t _n_successful_persistence_replies;
uint8_t _priority;
bool _success;
- bool canSendReplyEarly() const;
+ static void prune_cancelled_nodes_if_present(BucketInfoMap& bucket_and_replicas,
+ const CancelScope& cancel_scope);
+ [[nodiscard]] bool canSendReplyEarly() const;
void addBucketInfoFromReply(uint16_t node, const api::BucketInfoReply& reply);
void logSuccessfulReply(uint16_t node, const api::BucketInfoReply& reply) const;
- bool hasSentReply() const noexcept { return !_reply; }
- bool shouldRevert() const;
- bool has_majority_successful_replies() const noexcept;
- bool has_minority_test_and_set_failure() const noexcept;
+ [[nodiscard]] bool hasSentReply() const noexcept { return !_reply; }
+ [[nodiscard]] bool shouldRevert() const;
+ [[nodiscard]] bool has_majority_successful_replies() const noexcept;
+ [[nodiscard]] bool has_minority_test_and_set_failure() const noexcept;
void sendReply(MessageSender& sender);
void updateFailureResult(const api::BucketInfoReply& reply);
+ [[nodiscard]] bool node_is_effectively_cancelled(uint16_t node) const noexcept;
void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node);
void handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node);
void transfer_trace_state_to_reply();
diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h
index 70bee311f78..951ed6a6877 100644
--- a/storage/src/vespa/storage/distributor/sentmessagemap.h
+++ b/storage/src/vespa/storage/distributor/sentmessagemap.h
@@ -10,19 +10,23 @@ class Operation;
class SentMessageMap {
public:
+ using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>;
+
SentMessageMap();
~SentMessageMap();
- std::shared_ptr<Operation> pop(api::StorageMessage::Id id);
- std::shared_ptr<Operation> pop();
+ [[nodiscard]] std::shared_ptr<Operation> pop(api::StorageMessage::Id id);
+ [[nodiscard]] std::shared_ptr<Operation> pop();
void insert(api::StorageMessage::Id id, const std::shared_ptr<Operation> & msg);
void clear();
- uint32_t size() const { return _map.size(); }
+ [[nodiscard]] uint32_t size() const { return _map.size(); }
[[nodiscard]] bool empty() const noexcept { return _map.empty(); }
std::string toString() const;
+
+ Map::const_iterator begin() const noexcept { return _map.cbegin(); }
+ Map::const_iterator end() const noexcept { return _map.cend(); }
private:
- using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>;
Map _map;
};