Diffstat (limited to 'storage/src/tests/distributor')
-rw-r--r--  storage/src/tests/distributor/check_condition_test.cpp          17
-rw-r--r--  storage/src/tests/distributor/garbagecollectiontest.cpp         60
-rw-r--r--  storage/src/tests/distributor/putoperationtest.cpp             155
-rw-r--r--  storage/src/tests/distributor/removeoperationtest.cpp           40
-rw-r--r--  storage/src/tests/distributor/twophaseupdateoperationtest.cpp  169
-rw-r--r--  storage/src/tests/distributor/updateoperationtest.cpp           40
6 files changed, 447 insertions(+), 34 deletions(-)
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp
index 757a9329ea6..617401dd271 100644
--- a/storage/src/tests/distributor/check_condition_test.cpp
+++ b/storage/src/tests/distributor/check_condition_test.cpp
@@ -5,6 +5,7 @@
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/documentapi/messagebus/messages/testandsetcondition.h>
#include <vespa/storage/distributor/node_supported_features.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storage/distributor/operations/external/check_condition.h>
#include <vespa/storage/distributor/persistence_operation_metric_set.h>
#include <vespa/storageapi/message/persistence.h>
@@ -227,6 +228,20 @@ TEST_F(CheckConditionTest, failed_gets_completes_check_with_error_outcome) {
});
}
+TEST_F(CheckConditionTest, check_fails_if_condition_explicitly_cancelled) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0));
+ cond.cancel(_sender, CancelScope::of_fully_cancelled());
+ cond.handle_reply(_sender, make_matched_reply(1));
+ }, [&](auto& outcome) {
+ EXPECT_FALSE(outcome.matched_condition());
+ EXPECT_FALSE(outcome.not_found());
+ EXPECT_TRUE(outcome.failed());
+ EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::ABORTED);
+ });
+}
+
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_completion) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
@@ -242,6 +257,7 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_
});
}
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_pending_transition_case) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
@@ -255,6 +271,7 @@ TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start
});
}
+// TODO deprecate in favor of cancelling
TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_completed_transition_case) {
test_cond_with_2_gets_sent([&](auto& cond) {
cond.handle_reply(_sender, make_matched_reply(0));
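
The new tests above exercise a CancelScope value created via CancelScope::of_fully_cancelled() and CancelScope::of_node_subset(...). Below is a minimal sketch of the shape those call sites imply; the factory names come straight from the diff, while the member layout and the node_is_cancelled() helper are assumptions for illustration, not the production class.

#include <cstdint>
#include <unordered_set>

// Minimal sketch inferred from the call sites above; not the production class.
class CancelScope {
public:
    static CancelScope of_fully_cancelled() {
        CancelScope scope;
        scope._fully_cancelled = true;
        return scope;
    }
    static CancelScope of_node_subset(std::unordered_set<uint16_t> nodes) {
        CancelScope scope;
        scope._cancelled_nodes = std::move(nodes);
        return scope;
    }
    bool fully_cancelled() const noexcept { return _fully_cancelled; }
    // Hypothetical helper: a node counts as cancelled if everything is
    // cancelled, or if it was named in a cancelled node subset.
    bool node_is_cancelled(uint16_t node) const {
        return _fully_cancelled || (_cancelled_nodes.count(node) != 0);
    }
private:
    bool _fully_cancelled{false};
    std::unordered_set<uint16_t> _cancelled_nodes;
};
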
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 9b5056f2066..b1cf1cbc636 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -113,9 +113,8 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil {
ASSERT_EQ(entry->getNodeCount(), info.size());
EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time);
for (size_t i = 0; i < info.size(); ++i) {
- EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo())
- << "Mismatching info for node " << i << ": " << info[i] << " vs "
- << entry->getNode(i)->getBucketInfo();
+ auto& node = entry->getNodeRef(i);
+ EXPECT_EQ(info[i], node.getBucketInfo()) << "Mismatching DB bucket info for node " << node.getNode();
}
}
@@ -172,6 +171,51 @@ TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until
EXPECT_EQ(70u, gc_removed_documents_metric()); // Use max of received metrics
}
+TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_if_operation_fully_canceled) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+    // DB state is unchanged. Note that in a real scenario the DB entry would have been removed
+    // as part of the ownership change, but there are already non-cancellation behaviors that
+    // avoid creating buckets from scratch in the DB if they do not exist, so merely checking
+    // whether the bucket exists risks hiding missing cancellation edge handling.
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0));
+ // However, we still update our metrics if we _did_ remove documents on one or more nodes
+ EXPECT_EQ(70u, gc_removed_documents_metric());
+}
+
+TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_for_cancelled_node) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+ // DB state is unchanged for node 0, changed for node 1
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(4567, 90, 500)}, 34));
+}
+
+TEST_F(GarbageCollectionOperationTest, node_cancellation_is_cumulative) {
+ auto op = create_op();
+ op->start(_sender);
+ ASSERT_EQ(2, _sender.commands().size());
+
+ reply_to_nth_request(*op, 0, 1234, 70);
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+ reply_to_nth_request(*op, 1, 4567, 60);
+
+ // DB state is unchanged for both nodes
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0));
+}
+
TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) {
auto op = create_op();
op->start(_sender);
@@ -363,6 +407,16 @@ TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_in
receive_phase1_replies_and_assert_no_phase_2_started();
}
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_fully_cancelled_between_phases) {
+ _op->cancel(_sender, CancelScope::of_fully_cancelled());
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
+TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_partially_cancelled_between_phases) {
+ _op->cancel(_sender, CancelScope::of_node_subset({0}));
+ receive_phase1_replies_and_assert_no_phase_2_started();
+}
+
TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) {
enable_two_phase_gc();
auto op = create_op();
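
The node_cancellation_is_cumulative test above pins down that successive cancel() calls must accumulate rather than replace one another. A hedged sketch of that merging behavior follows; CancelledState and merge_with() are hypothetical helpers that the production operation may well spell differently.

#include <cstdint>
#include <unordered_set>

// Sketch of cumulative cancellation: cancel({0}) followed by cancel({1})
// must behave exactly like cancelling {0, 1}. Illustrative only.
struct CancelledState {
    bool all{false};
    std::unordered_set<uint16_t> nodes;

    void merge_with(bool fully_cancelled, const std::unordered_set<uint16_t>& subset) {
        all = all || fully_cancelled;               // full cancellation is sticky
        nodes.insert(subset.begin(), subset.end()); // node subsets accumulate
    }
    bool is_cancelled(uint16_t node) const {
        return all || (nodes.count(node) != 0);
    }
};
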
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 76b6741442e..ee87fe84df6 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -6,6 +6,7 @@
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributor_stripe.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storage/distributor/operations/external/putoperation.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
@@ -208,6 +209,43 @@ TEST_F(PutOperationTest, failed_CreateBucket_removes_replica_from_db_and_sends_R
_sender.getCommands(true, true, 4));
}
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_if_op_fully_canceled) {
+ setup_stripe(2, 2, "distributor:1 storage:2");
+
+ auto doc = createDummyDocument("test", "test");
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+
+ // DB is not touched (note: normally node 1 would be removed at the cancel-edge).
+ ASSERT_EQ("BucketId(0x4000000000008f09) : "
+ "node(idx=1,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false), "
+ "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=false,ready=false)",
+ dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
+ // No new requests sent
+ ASSERT_EQ("", _sender.getCommands(true, true, 4));
+}
+
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_for_cancelled_nodes) {
+ setup_stripe(2, 2, "distributor:1 storage:2");
+
+ auto doc = createDummyDocument("test", "test");
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+ sendReply(1, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 0
+
+ // Bucket info recheck only sent to node 1, as it's not cancelled
+ ASSERT_EQ("RequestBucketInfoCommand(1 buckets, super bucket BucketId(0x4000000000008f09). ) => 1",
+ _sender.getCommands(true, true, 4));
+}
+
TEST_F(PutOperationTest, send_inline_split_before_put_if_bucket_too_large) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cfg = make_config();
@@ -272,6 +310,26 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck
_sender.getLastReply());
}
+TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_operation_cancelled) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ createAndSendSampleDocument(TIMEOUT);
+
+ ASSERT_EQ("Put(BucketId(0x4000000000001dd4), "
+ "id:test:testdoctype1::, timestamp 100, size 45) => 0,"
+ "Put(BucketId(0x4000000000001dd4), "
+ "id:test:testdoctype1::, timestamp 100, size 45) => 1",
+ _sender.getCommands(true, true));
+
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+
+ sendReply(0);
+ sendReply(1);
+
+ ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
+}
+
TEST_F(PutOperationTest, storage_failed) {
setup_stripe(2, 1, "storage:1 distributor:1");
@@ -491,7 +549,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
{
std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
- std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
ASSERT_TRUE(sreply);
sreply->remapBucketId(document::BucketId(17, 13));
@@ -511,6 +569,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
dumpBucket(document::BucketId(17, 13)));
}
+// TODO make this redundant through operation cancelling
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_state) {
setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
@@ -535,6 +594,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_
dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
}
+// TODO make this redundant through operation cancelling
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) {
setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
@@ -568,6 +628,8 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending
// TODO probably also do this for updates and removes
// TODO consider if we should use the pending state verbatim for computing targets if it exists
+// TODO make this redundant through operation cancelling
+// ... actually; FIXME shouldn't the ExternalOperationHandler already cover this??
TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) {
setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
auto doc = createDummyDocument("test", "test");
@@ -584,6 +646,65 @@ TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state
_sender.getLastReply(true));
}
+TEST_F(PutOperationTest, db_not_updated_if_operation_cancelled_by_ownership_change) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ auto doc = createDummyDocument("test", "uri");
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+ auto remap_bucket = BucketId(bucket.getUsedBits() + 1, bucket.getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 1, 2});
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+
+ // Normally DB updates triggered by replies don't _create_ buckets in the DB, unless
+ // they're remapped buckets. Use a remapping to ensure we hit a create-if-missing DB path.
+ {
+ std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
+ auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
+ ASSERT_TRUE(sreply);
+ sreply->remapBucketId(remap_bucket);
+ sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5));
+ op->receive(_sender, reply);
+ }
+
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(7, 8, 9));
+
+ EXPECT_EQ("NONEXISTING", dumpBucket(bucket));
+ EXPECT_EQ("NONEXISTING", dumpBucket(remap_bucket));
+}
+
+TEST_F(PutOperationTest, individually_cancelled_nodes_are_not_updated_in_db) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ auto doc = createDummyDocument("test", "uri");
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+ sendPut(createPut(doc));
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+ // Simulate nodes 0 and 2 going down
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 2});
+ // Cancelling shall be cumulative
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({2}));
+
+ sendReply(0, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(6, 7, 8));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(9, 8, 7));
+
+ EXPECT_EQ("BucketId(0x4000000000000593) : "
+ "node(idx=1,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)",
+ dumpBucket(bucket));
+}
+
TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) {
setup_stripe(Redundancy(2), NodeCount(2),
"distributor:1 storage:2 .0.s:r .1.s:r");
@@ -761,6 +882,38 @@ TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) {
_sender.getLastReply());
}
+TEST_F(PutOperationTest, ownership_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, replica_subset_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ // 1 of 2 nodes; we still abort after the read phase since we cannot possibly fulfill
+ // the write phase for all replicas.
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
TEST_F(PutOperationTest, create_flag_in_parent_put_is_propagated_to_sent_puts) {
setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1");
auto doc = createDummyDocument("test", "test");
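
The two DB-oriented Put tests above (db_not_updated_if_operation_cancelled_by_ownership_change and individually_cancelled_nodes_are_not_updated_in_db) both reduce to one invariant: reply-derived replica info for cancelled nodes must never reach the bucket DB. A sketch of that filtering step, with ReplicaInfo and prune_cancelled() as hypothetical stand-ins for the production types:

#include <cstdint>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for the per-node info a reply carries back.
struct ReplicaInfo {
    uint16_t node;
    uint32_t checksum;
    uint32_t docs;
    uint32_t bytes;
};

// Drop entries for cancelled nodes before any bucket DB merge, so cancelled
// replicas are never (re)introduced into the DB. Illustrative only.
std::vector<ReplicaInfo>
prune_cancelled(std::vector<ReplicaInfo> replicas,
                const std::unordered_set<uint16_t>& cancelled_nodes) {
    std::erase_if(replicas, [&](const ReplicaInfo& r) {
        return cancelled_nodes.count(r.node) != 0;
    });
    return replicas;
}
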
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index d169c80a95d..3fad2c194a2 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -68,6 +68,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
std::unique_ptr<api::StorageReply> reply(removec->makeReply());
auto* removeR = dynamic_cast<api::RemoveReply*>(reply.get());
removeR->setOldTimestamp(oldTimestamp);
+ removeR->setBucketInfo(api::BucketInfo(1,2,3,4,5));
callback.onReceive(_sender, std::shared_ptr<api::StorageReply>(reply.release()));
}
@@ -307,6 +308,45 @@ TEST_F(ExtRemoveOperationTest, failed_condition_probe_fails_op_with_returned_err
_sender.getLastReply());
}
+// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops
+// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests.
+
+TEST_F(ExtRemoveOperationTest, cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
+
+ reply_with(make_get_reply(0, 50, false, true));
+ op->cancel(_sender, CancelScope::of_fully_cancelled());
+ reply_with(make_get_reply(1, 50, false, true));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, "
+ "timestamp 100, not found) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
+TEST_F(ExtRemoveOperationTest, cancelled_nodes_are_not_updated_in_db) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::CONSISTENT));
+ ASSERT_EQ("Remove => 1,Remove => 0", _sender.getCommands(true));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucketId), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToMessage(*op, 0, 50);
+ replyToMessage(*op, 1, 50);
+
+ EXPECT_EQ("BucketId(0x4000000000000593) : "
+ "node(idx=0,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
+ dumpBucket(bucketId));
+ // Reply is still OK since the operation went through on the content nodes
+ ASSERT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, timestamp 100, removed doc from 50) ReturnCode(NONE)",
+ _sender.getLastReply());
+
+}
+
TEST_F(ExtRemoveOperationTest, trace_is_propagated_from_condition_probe_gets_ok_probe_case) {
ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
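
For readers skimming the Remove changes: the helper tweak at the top of this file (setBucketInfo on the synthesized reply) follows the same reply-building pattern these tests use throughout. Roughly, assuming removec is a captured api::RemoveCommand and callback is the operation under test, as in the helper above:

// Pattern from the test helper above: synthesize a reply from the captured
// command and attach bucket info so the DB-update path has data to apply
// (or, for cancelled nodes, to deliberately not apply).
std::unique_ptr<api::StorageReply> reply(removec->makeReply());
auto* removeR = dynamic_cast<api::RemoveReply*>(reply.get());
removeR->setOldTimestamp(oldTimestamp);
removeR->setBucketInfo(api::BucketInfo(1, 2, 3, 4, 5));
callback.onReceive(_sender, std::shared_ptr<api::StorageReply>(reply.release()));
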
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index da32225cde3..1907335545a 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -4,16 +4,16 @@
#include <vespa/config/helper/configgetter.h>
#include <vespa/document/base/testdocrepo.h>
#include <vespa/document/fieldset/fieldsets.h>
-#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/fieldvalue/intfieldvalue.h>
+#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/update/arithmeticvalueupdate.h>
-#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h>
+#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <gmock/gmock.h>
namespace storage::distributor {
@@ -30,8 +30,9 @@ using namespace ::testing;
struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
document::TestDocRepo _testRepo;
std::shared_ptr<const DocumentTypeRepo> _repo;
- const DocumentType* _doc_type;
+ const DocumentType* _doc_type{nullptr};
DistributorMessageSenderStub _sender;
+ BucketId _bucket_id{0x400000000000cac4};
TwoPhaseUpdateOperationTest();
~TwoPhaseUpdateOperationTest() override;
@@ -39,7 +40,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
void checkMessageSettingsPropagatedTo(
const api::StorageCommand::SP& msg) const;
- std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&);
+ static std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&);
void SetUp() override {
_repo = _testRepo.getTypeRepoSp();
@@ -57,20 +58,21 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
close();
}
- void replyToMessage(Operation& callback,
- DistributorMessageSenderStub& sender,
- uint32_t index,
- uint64_t oldTimestamp,
- api::ReturnCode::Result result = api::ReturnCode::OK);
+ static void replyToMessage(
+ Operation& callback,
+ DistributorMessageSenderStub& sender,
+ uint32_t index,
+ uint64_t oldTimestamp,
+ api::ReturnCode::Result result = api::ReturnCode::OK);
- void replyToPut(
+ static void replyToPut(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& traceMsg = "");
- void replyToCreateBucket(
+ static void replyToCreateBucket(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -85,7 +87,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& traceMsg = "");
- void reply_to_metadata_get(
+ static void reply_to_metadata_get(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -93,7 +95,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& trace_msg = "");
- void reply_to_get_with_tombstone(
+ static void reply_to_get_with_tombstone(
Operation& callback,
DistributorMessageSenderStub& sender,
uint32_t index,
@@ -148,11 +150,17 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil {
Timestamp highest_get_timestamp,
Timestamp expected_response_timestamp);
- std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) {
- setup_stripe(2, 2, "storage:2 distributor:1");
+ void do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas);
+
+ void enable_3phase_updates(bool enable = true) {
auto cfg = make_config();
- cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase);
+ cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable);
configure_stripe(cfg);
+ }
+
+ std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates(enable_3phase);
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
cb->start(_sender);
return cb;
@@ -199,13 +207,13 @@ TwoPhaseUpdateOperationTest::replyToPut(
{
std::shared_ptr<api::StorageMessage> msg2 = sender.command(index);
auto& putc = dynamic_cast<PutCommand&>(*msg2);
- std::unique_ptr<api::StorageReply> reply(putc.makeReply());
+ std::shared_ptr<api::StorageReply> reply(putc.makeReply());
reply->setResult(api::ReturnCode(result, ""));
+ dynamic_cast<api::PutReply&>(*reply).setBucketInfo(api::BucketInfo(1,2,3,4,5));
if (!traceMsg.empty()) {
MBUS_TRACE(reply->getTrace(), 1, traceMsg);
}
- callback.receive(sender,
- std::shared_ptr<StorageReply>(reply.release()));
+ callback.receive(sender, reply);
}
void
@@ -217,10 +225,9 @@ TwoPhaseUpdateOperationTest::replyToCreateBucket(
{
std::shared_ptr<api::StorageMessage> msg2 = sender.command(index);
auto& putc = dynamic_cast<CreateBucketCommand&>(*msg2);
- std::unique_ptr<api::StorageReply> reply(putc.makeReply());
+ std::shared_ptr<api::StorageReply> reply(putc.makeReply());
reply->setResult(api::ReturnCode(result, ""));
- callback.receive(sender,
- std::shared_ptr<StorageReply>(reply.release()));
+ callback.receive(sender, reply);
}
void
@@ -312,6 +319,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
update->setCreateIfNonExistent(options._createIfNonExistent);
document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(update->getId());
+ assert(id == _bucket_id);
document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId());
if (bucketState.length()) {
@@ -554,6 +562,33 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsiste
_sender.getLastReply(true));
}
+TEST_F(TwoPhaseUpdateOperationTest, fast_path_cancellation_transitively_cancels_nested_update_operation) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates();
+ auto op = sendUpdate("0=1/2/3,1=1/2/3");
+ op->start(_sender);
+
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
+
+ replyToMessage(*op, _sender, 0, 110);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToMessage(*op, _sender, 1, 110);
+
+ // Client operation itself should return success since the update went through on all replica nodes
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 110) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply(true));
+
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=0,crc=0x123,docs=1/1,bytes=100/100,trusted=true,active=false,ready=false)",
+ dumpBucket(_bucket_id));
+}
+
void
TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo(
const api::StorageCommand::SP& msg) const
@@ -713,6 +748,38 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) {
EXPECT_EQ(metrics().updates.failures.storagefailure.getValue(), 1);
}
+void TwoPhaseUpdateOperationTest::do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ enable_3phase_updates();
+ auto op = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true));
+ op->start(_sender);
+
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*op, _sender, 0, 70);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToGet(*op, _sender, 1, in_sync_replicas ? 70 : 80);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster "
+ "state change between executing the read and write phases of a write-repair update)",
+ _sender.getLastReply(true));
+
+ // TODO custom cancellation failure metric?
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_consistent_case) {
+ do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(true);
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_inconsistent_case) {
+ do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(false);
+}
+
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
setup_stripe(2, 2, "storage:2 distributor:1");
// Create update for wrong doctype which will fail the update.
@@ -1214,6 +1281,59 @@ TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) {
EXPECT_EQ(1, m.ok.getValue());
}
+TEST_F(ThreePhaseUpdateTest, update_fails_if_cancelled_between_metadata_gets_and_full_get) {
+ auto op = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*op, _sender, 0, 2000);
+ reply_to_metadata_get(*op, _sender, 1, 1000);
+ ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1});
+ op->cancel(_sender, CancelScope::of_node_subset({1}));
+
+ replyToGet(*op, _sender, 2, 2000U);
+ ASSERT_EQ("", _sender.getCommands(true, false, 3)); // No puts sent
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster "
+ "state change between executing the read and write phases of a write-repair update)",
+ _sender.getLastReply(true));
+
+ // TODO cancellation metrics?
+}
+
+TEST_F(ThreePhaseUpdateTest, fast_path_cancellation_transitively_cancels_nested_put_operation) {
+ auto op = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*op, _sender, 0, 2000);
+ reply_to_metadata_get(*op, _sender, 1, 1000);
+
+ ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
+ replyToGet(*op, _sender, 2, 2000U);
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {0});
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3));
+ replyToPut(*op, _sender, 3);
+ replyToPut(*op, _sender, 4);
+
+ // Update itself is ACKed
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 2000) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply(true));
+
+ // But cancelled replicas are not reintroduced into the bucket DB
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)",
+ dumpBucket(_bucket_id));
+}
+
+
TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) {
auto cb = set_up_2_inconsistent_replicas_and_start_update();
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
@@ -1277,8 +1397,7 @@ TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_
auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more.
cb->start(_sender);
// Add new replica to deterministic test bucket after gets have been sent
- BucketId bucket(0x400000000000cac4); // Always the same in the test.
- addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
+ addNodesToBucketDB(_bucket_id, "0=1/2/3,1=2/3/4,2=3/3/3");
Timestamp old_timestamp = 500;
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index d0ae31b9524..fefc88a27c2 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -11,7 +11,7 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vespalib/gtest/gtest.h>
using config::ConfigGetter;
using config::FileSpec;
@@ -29,11 +29,14 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil {
std::shared_ptr<const DocumentTypeRepo> _repo;
const DocumentType* _html_type;
+ UpdateOperationTest()
+ : _repo(std::make_shared<DocumentTypeRepo>(*ConfigGetter<DocumenttypesConfig>::
+ getConfig("config-doctypes", FileSpec("../config-doctypes.cfg")))),
+ _html_type(_repo->getDocumentType("text/html"))
+ {
+ }
+
void SetUp() override {
- _repo.reset(
- new DocumentTypeRepo(*ConfigGetter<DocumenttypesConfig>::
- getConfig("config-doctypes", FileSpec("../config-doctypes.cfg"))));
- _html_type = _repo->getDocumentType("text/html");
createLinks();
}
@@ -241,4 +244,31 @@ TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest
EXPECT_EQ(2, m.diverging_timestamp_updates.getValue());
}
+// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops
+// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests.
+
+TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ std::shared_ptr<UpdateOperation> op = sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3");
+ DistributorMessageSenderStub sender;
+ op->start(sender);
+
+ ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true));
+
+ // Simulate nodes 0 and 2 going down
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bId), {0, 2});
+ // Cancelling shall be cumulative
+    op->cancel(sender, CancelScope::of_node_subset({0}));
+    op->cancel(sender, CancelScope::of_node_subset({2}));
+
+ replyToMessage(*op, sender, 0, 120);
+ replyToMessage(*op, sender, 1, 120);
+ replyToMessage(*op, sender, 2, 120);
+
+ EXPECT_EQ("BucketId(0x400000000000cac4) : "
+ "node(idx=1,crc=0x2,docs=4/4,bytes=6/6,trusted=true,active=false,ready=false)",
+ dumpBucket(_bId));
+}
+
}
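
As a closing usage note, the cumulative-cancellation contract asserted in several of the diffs above can be captured in a few lines of GTest against the CancelledState sketch from the garbage collection section earlier on this page (again, a sketch exercising a hypothetical helper, not the production test):

#include <gtest/gtest.h>

// Mirrors the 'node_cancellation_is_cumulative' expectation using the
// hypothetical CancelledState sketch defined above.
TEST(CancelledStateSketchTest, node_cancellation_is_cumulative) {
    CancelledState state;
    state.merge_with(false, {0});
    state.merge_with(false, {2});
    EXPECT_TRUE(state.is_cancelled(0));
    EXPECT_TRUE(state.is_cancelled(2));
    EXPECT_FALSE(state.is_cancelled(1));
    EXPECT_FALSE(state.all); // subsets alone never imply full cancellation
}
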