diff options
14 files changed, 869 insertions, 422 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index afc3b254c64..559784121a6 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -193,6 +193,12 @@ struct DistributorTest : Test, DistributorTestUtil { configureDistributor(builder); } + void configure_metadata_update_phase_enabled(bool enabled) { + ConfigBuilder builder; + builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled; + configureDistributor(builder); + } + void configureMaxClusterClockSkew(int seconds); void sendDownClusterStateCommand(); void replyToSingleRequestBucketInfoCommandWith1Bucket(); @@ -1044,6 +1050,17 @@ TEST_F(DistributorTest, merge_disabling_config_is_propagated_to_internal_config) EXPECT_FALSE(getConfig().merge_operations_disabled()); } +TEST_F(DistributorTest, metadata_update_phase_config_is_propagated_to_internal_config) { + createLinks(true); + setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + configure_metadata_update_phase_enabled(true); + EXPECT_TRUE(getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates()); + + configure_metadata_update_phase_enabled(false); + EXPECT_FALSE(getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates()); +} + TEST_F(DistributorTest, weak_internal_read_consistency_config_is_propagated_to_internal_configs) { createLinks(true); setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index 0dbec8444cc..f14b78094d1 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -21,6 +21,7 @@ using config::ConfigGetter; using document::DocumenttypesConfig; using config::FileSpec; using document::test::makeDocumentBucket; +using document::BucketId; using namespace 
::testing; namespace storage::distributor { @@ -29,8 +30,8 @@ struct GetOperationTest : Test, DistributorTestUtil { std::shared_ptr<const document::DocumentTypeRepo> _repo; document::DocumentId docId; - document::BucketId bucketId; - std::unique_ptr<Operation> op; + BucketId bucketId; + std::unique_ptr<GetOperation> op; GetOperationTest(); ~GetOperationTest() override; @@ -52,7 +53,7 @@ struct GetOperationTest : Test, DistributorTestUtil { } void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) { - auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId(0)), docId, "[all]"); + auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(BucketId(0)), docId, "[all]"); op = std::make_unique<GetOperation>( getExternalOperationHandler(), getDistributorBucketSpace(), getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(), @@ -135,6 +136,14 @@ struct GetOperationTest : Test, DistributorTestUtil { GetOperationTest::GetOperationTest() = default; GetOperationTest::~GetOperationTest() = default; +namespace { + +NewestReplica replica_of(api::Timestamp ts, const document::BucketId& bucket_id, uint16_t node) { + return NewestReplica::of(ts, bucket_id, node); +} + +} + TEST_F(GetOperationTest, simple) { setClusterState("distributor:1 storage:2"); @@ -149,7 +158,10 @@ TEST_F(GetOperationTest, simple) { EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 100) ReturnCode(NONE)", _sender.getLastReply()); + EXPECT_FALSE(op->any_replicas_failed()); EXPECT_TRUE(last_reply_had_consistent_replicas()); + ASSERT_TRUE(op->newest_replica().has_value()); + EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 0), *op->newest_replica()); } TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted_replica_available) { @@ -167,7 +179,10 @@ TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted 
EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 2) ReturnCode(NONE)", _sender.getLastReply()); + EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); + ASSERT_TRUE(op->newest_replica().has_value()); + EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 0), *op->newest_replica()); } TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) { @@ -179,15 +194,18 @@ TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) { ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); - ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::OK, "newauthor", 2)); - ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "oldauthor", 1)); + ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "newauthor", 2)); + ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::OK, "oldauthor", 1)); ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 2) ReturnCode(NONE)", _sender.getLastReply()); - EXPECT_EQ("newauthor", getLastReplyAuthor()); + + EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); + ASSERT_TRUE(op->newest_replica().has_value()); + EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 1), *op->newest_replica()); } TEST_F(GetOperationTest, send_to_all_invalid_copies) { @@ -205,8 +223,9 @@ TEST_F(GetOperationTest, send_to_all_invalid_copies) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 2) ReturnCode(NONE)", _sender.getLastReply()); - EXPECT_EQ("newauthor", getLastReplyAuthor()); + + EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); } @@ -235,8 +254,8 @@ TEST_F(GetOperationTest, send_to_all_invalid_nodes_when_inconsistent) { TEST_F(GetOperationTest, inconsistent_split) { setClusterState("distributor:1 storage:4"); - addNodesToBucketDB(document::BucketId(16, 0x0593), "0=100"); - addNodesToBucketDB(document::BucketId(17, 
0x10593), "1=200"); + addNodesToBucketDB(BucketId(16, 0x0593), "0=100"); + addNodesToBucketDB(BucketId(17, 0x10593), "1=200"); sendGet(); @@ -248,9 +267,13 @@ TEST_F(GetOperationTest, inconsistent_split) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 2) ReturnCode(NONE)", _sender.getLastReply()); - EXPECT_EQ("newauthor", getLastReplyAuthor()); + + EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); + // Bucket with highest timestamp should be returned. In this case it's the one on node 0. + ASSERT_TRUE(op->newest_replica().has_value()); + EXPECT_EQ(replica_of(api::Timestamp(2), BucketId(16, 0x0593), 0), *op->newest_replica()); } TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) { @@ -268,6 +291,8 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 2) ReturnCode(NONE)", _sender.getLastReply()); + + EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); } @@ -288,7 +313,11 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 3) ReturnCode(NONE)", _sender.getLastReply()); + + EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); + ASSERT_TRUE(op->newest_replica().has_value()); + EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 1), *op->newest_replica()); } TEST_F(GetOperationTest, multi_inconsistent_bucket) { @@ -308,6 +337,8 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket) { _sender.getLastReply()); EXPECT_EQ("newauthor", getLastReplyAuthor()); + + EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); } @@ -331,7 +362,12 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, 
" "timestamp 100) ReturnCode(NONE)", _sender.getLastReply()); + + EXPECT_TRUE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); + ASSERT_TRUE(op->newest_replica().has_value()); + // First send to node 2 fails, second is to node 3 which returned the highest timestamp + EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 3), *op->newest_replica()); } TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) { @@ -342,6 +378,8 @@ TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 0) ReturnCode(NONE)", _sender.getLastReply()); + + EXPECT_FALSE(op->any_replicas_failed()); EXPECT_TRUE(last_reply_had_consistent_replicas()); // Nothing in the bucket, so nothing to be inconsistent with. } @@ -363,7 +401,14 @@ TEST_F(GetOperationTest, not_found) { EXPECT_EQ(1, getDistributor().getMetrics().gets[documentapi::LoadType::DEFAULT]. failures.notfound.getValue()); + EXPECT_FALSE(op->any_replicas_failed()); // "Not found" is not a failure. EXPECT_TRUE(last_reply_had_consistent_replicas()); + EXPECT_TRUE(op->newest_replica().has_value()); + // "Not found" is still a success with a timestamp of 0. This is because + // the caller may want to perform special logic if all replicas are in sync + // but are missing the document. + // FIXME make sure all callers are aware of this! + EXPECT_EQ(replica_of(api::Timestamp(0), bucketId, 0), *op->newest_replica()); } TEST_F(GetOperationTest, not_found_on_subset_of_replicas_marks_get_as_inconsistent) { @@ -403,8 +448,12 @@ TEST_F(GetOperationTest, resend_on_storage_failure) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 100) ReturnCode(NONE)", _sender.getLastReply()); + + EXPECT_TRUE(op->any_replicas_failed()); // Replica had read failure, but they're still in sync. An immutable Get won't change that fact. 
EXPECT_TRUE(last_reply_had_consistent_replicas()); + ASSERT_TRUE(op->newest_replica().has_value()); + EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 2), *op->newest_replica()); } TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_inconsistent) { @@ -417,7 +466,11 @@ TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_in ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 3) ReturnCode(NONE)", _sender.getLastReply()); + + EXPECT_TRUE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); + ASSERT_TRUE(op->newest_replica().has_value()); + EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 2), *op->newest_replica()); } TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) { @@ -442,7 +495,10 @@ TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 0) ReturnCode(IO_FAILURE)", _sender.getLastReply()); + + EXPECT_TRUE(op->any_replicas_failed()); EXPECT_TRUE(last_reply_had_consistent_replicas()); // Doesn't really matter since operation itself failed + EXPECT_FALSE(op->newest_replica().has_value()); } TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) { @@ -462,6 +518,8 @@ TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) { "timestamp 100) ReturnCode(NONE)", _sender.getLastReply()); EXPECT_TRUE(last_reply_had_consistent_replicas()); + ASSERT_TRUE(op->newest_replica().has_value()); + EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 1), *op->newest_replica()); } TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) { @@ -469,7 +527,7 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) { // Node 0 is local copy to distributor 0 and will be preferred when // sending initially. 
- addNodesToBucketDB(document::BucketId(16, 0x0593), "2=100,0=100"); + addNodesToBucketDB(BucketId(16, 0x0593), "2=100,0=100"); sendGet(); @@ -487,9 +545,12 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 3) ReturnCode(NONE)", _sender.getLastReply()); - EXPECT_EQ("newestauthor", getLastReplyAuthor()); + + EXPECT_TRUE(op->any_replicas_failed()); EXPECT_TRUE(last_reply_had_consistent_replicas()); + ASSERT_TRUE(op->newest_replica().has_value()); + EXPECT_EQ(replica_of(api::Timestamp(3), BucketId(16, 0x0593), 2), *op->newest_replica()); } TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) { diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index f1900c40d56..82466ec5ed7 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -32,9 +32,10 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil { document::TestDocRepo _testRepo; std::shared_ptr<const DocumentTypeRepo> _repo; const DocumentType* _doc_type; + DistributorMessageSenderStub _sender; TwoPhaseUpdateOperationTest(); - ~TwoPhaseUpdateOperationTest(); + ~TwoPhaseUpdateOperationTest() override; void checkMessageSettingsPropagatedTo( const api::StorageCommand::SP& msg) const; @@ -81,6 +82,14 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil { api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& traceMsg = ""); + void reply_to_metadata_get( + Operation& callback, + DistributorMessageSenderStub& sender, + uint32_t index, + uint64_t old_timestamp, + api::ReturnCode::Result result = api::ReturnCode::OK, + const std::string& trace_msg = ""); + struct UpdateOptions { bool _makeInconsistentSplit; bool _createIfNonExistent; @@ -130,6 +139,14 @@ struct 
TwoPhaseUpdateOperationTest : Test, DistributorTestUtil { Timestamp highest_get_timestamp, Timestamp expected_response_timestamp); + std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) { + setupDistributor(2, 2, "storage:2 distributor:1"); + getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase); + std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. + cb->start(_sender, framework::MilliSecTime(0)); + return cb; + } + }; TwoPhaseUpdateOperationTest::TwoPhaseUpdateOperationTest() = default; @@ -218,6 +235,24 @@ TwoPhaseUpdateOperationTest::replyToGet( callback.receive(sender, reply); } +void +TwoPhaseUpdateOperationTest::reply_to_metadata_get( + Operation& callback, + DistributorMessageSenderStub& sender, + uint32_t index, + uint64_t old_timestamp, + api::ReturnCode::Result result, + const std::string& trace_msg) +{ + auto& get = dynamic_cast<const api::GetCommand&>(*sender.command(index)); + auto reply = std::make_shared<api::GetReply>(get, std::shared_ptr<Document>(), old_timestamp); + reply->setResult(api::ReturnCode(result, "")); + if (!trace_msg.empty()) { + MBUS_TRACE(reply->getTrace(), 1, trace_msg); + } + callback.receive(sender, reply); +} + namespace { struct DummyTransportContext : api::TransportContext { @@ -281,234 +316,212 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, TEST_F(TwoPhaseUpdateOperationTest, simple) { setupDistributor(1, 1, "storage:1 distributor:1"); + auto cb = sendUpdate("0=1/2/3"); + cb->start(_sender, framework::MilliSecTime(0)); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - - ASSERT_EQ("Update => 0", sender.getCommands(true)); + ASSERT_EQ("Update => 0", _sender.getCommands(true)); - replyToMessage(*cb, sender, 0, 90); + replyToMessage(*cb, _sender, 
0, 90); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 90) ReturnCode(NONE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, non_existing) { setupDistributor(1, 1, "storage:1 distributor:1"); - - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate(""); + cb->start(_sender, framework::MilliSecTime(0)); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 0) ReturnCode(NONE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, update_failed) { setupDistributor(1, 1, "storage:1 distributor:1"); + auto cb = sendUpdate("0=1/2/3"); + cb->start(_sender, framework::MilliSecTime(0)); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + ASSERT_EQ("Update => 0", _sender.getCommands(true)); - ASSERT_EQ("Update => 0", sender.getCommands(true)); - - replyToMessage(*cb, sender, 0, 90, api::ReturnCode::INTERNAL_FAILURE); + replyToMessage(*cb, _sender, 0, 90, api::ReturnCode::INTERNAL_FAILURE); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 0) " "ReturnCode(INTERNAL_FAILURE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps) { setupDistributor(2, 2, "storage:2 distributor:1"); + auto cb = sendUpdate("0=1/2/3,1=1/2/3"); + cb->start(_sender, framework::MilliSecTime(0)); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - - ASSERT_EQ("Update => 0,Update => 1", 
sender.getCommands(true)); + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); - replyToMessage(*cb, sender, 0, 90); - replyToMessage(*cb, sender, 1, 110); + replyToMessage(*cb, _sender, 0, 90); + replyToMessage(*cb, _sender, 1, 110); - ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", sender.getLastCommand(true)); + ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", _sender.getLastCommand(true)); - replyToGet(*cb, sender, 2, 110); + replyToGet(*cb, _sender, 2, 110); - ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0", sender.getCommands(true)); + ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0", _sender.getCommands(true)); - ASSERT_TRUE(sender.replies().empty()); + ASSERT_TRUE(_sender.replies().empty()); - replyToPut(*cb, sender, 3); - replyToPut(*cb, sender, 4); + replyToPut(*cb, _sender, 3); + replyToPut(*cb, _sender, 4); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 110 Was inconsistent " "(best node 1)) ReturnCode(NONE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_not_found) { setupDistributor(2, 2, "storage:2 distributor:1"); + auto cb = sendUpdate("0=1/2/3,1=1/2/3"); + cb->start(_sender, framework::MilliSecTime(0)); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - - ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true)); + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); - replyToMessage(*cb, sender, 0, 90); - replyToMessage(*cb, sender, 1, 110); + replyToMessage(*cb, _sender, 0, 90); + replyToMessage(*cb, _sender, 1, 110); - ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", sender.getLastCommand(true)); - ASSERT_TRUE(sender.replies().empty()); 
+ ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", _sender.getLastCommand(true)); + ASSERT_TRUE(_sender.replies().empty()); - replyToGet(*cb, sender, 2, 110, false); + replyToGet(*cb, _sender, 2, 110, false); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 110 Was inconsistent " "(best node 1)) ReturnCode(INTERNAL_FAILURE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_update_error) { setupDistributor(2, 2, "storage:2 distributor:1"); + auto cb = sendUpdate("0=1/2/3,1=1/2/3"); + cb->start(_sender, framework::MilliSecTime(0)); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); - ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true)); - - replyToMessage(*cb, sender, 0, 90); - ASSERT_TRUE(sender.replies().empty()); - replyToMessage(*cb, sender, 1, 110, api::ReturnCode::IO_FAILURE); + replyToMessage(*cb, _sender, 0, 90); + ASSERT_TRUE(_sender.replies().empty()); + replyToMessage(*cb, _sender, 1, 110, api::ReturnCode::IO_FAILURE); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 90) " "ReturnCode(IO_FAILURE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_get_error) { setupDistributor(2, 2, "storage:2 distributor:1"); + auto cb = sendUpdate("0=1/2/3,1=1/2/3"); + cb->start(_sender, framework::MilliSecTime(0)); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - - ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true)); + 
ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); - replyToMessage(*cb, sender, 0, 90); - replyToMessage(*cb, sender, 1, 110); + replyToMessage(*cb, _sender, 0, 90); + replyToMessage(*cb, _sender, 1, 110); ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", - sender.getLastCommand(true)); + _sender.getLastCommand(true)); - ASSERT_TRUE(sender.replies().empty()); - replyToGet(*cb, sender, 2, 110, false, api::ReturnCode::IO_FAILURE); + ASSERT_TRUE(_sender.replies().empty()); + replyToGet(*cb, _sender, 2, 110, false, api::ReturnCode::IO_FAILURE); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 110 Was inconsistent " "(best node 1)) ReturnCode(IO_FAILURE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_error) { setupDistributor(2, 2, "storage:2 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3")); - - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=1/2/3"); + cb->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true)); + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); - replyToMessage(*cb, sender, 0, 90); - replyToMessage(*cb, sender, 1, 110); + replyToMessage(*cb, _sender, 0, 90); + replyToMessage(*cb, _sender, 1, 110); ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", - sender.getLastCommand(true)); + _sender.getLastCommand(true)); - replyToGet(*cb, sender, 2, 110); + replyToGet(*cb, _sender, 2, 110); ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0", - sender.getCommands(true)); + _sender.getCommands(true)); - replyToPut(*cb, sender, 3, api::ReturnCode::IO_FAILURE); - ASSERT_TRUE(sender.replies().empty()); - replyToPut(*cb, sender, 4); + replyToPut(*cb, 
_sender, 3, api::ReturnCode::IO_FAILURE); + ASSERT_TRUE(_sender.replies().empty()); + replyToPut(*cb, _sender, 4); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 110 Was inconsistent " "(best node 1)) ReturnCode(IO_FAILURE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_not_started) { setupDistributor(2, 2, "storage:2 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3")); + auto cb = sendUpdate("0=1/2/3,1=1/2/3"); + cb->start(_sender, framework::MilliSecTime(0)); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); - ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true)); - - replyToMessage(*cb, sender, 0, 90); - replyToMessage(*cb, sender, 1, 110); + replyToMessage(*cb, _sender, 0, 90); + replyToMessage(*cb, _sender, 1, 110); ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", - sender.getLastCommand(true)); - checkMessageSettingsPropagatedTo(sender.commands().back()); + _sender.getLastCommand(true)); + checkMessageSettingsPropagatedTo(_sender.commands().back()); enableDistributorClusterState("storage:0 distributor:1"); - ASSERT_TRUE(sender.replies().empty()); - replyToGet(*cb, sender, 2, 110); + ASSERT_TRUE(_sender.replies().empty()); + replyToGet(*cb, _sender, 2, 110); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 110 Was inconsistent " "(best node 1)) ReturnCode(NOT_CONNECTED, " "Can't store document: No storage nodes available)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsistent_split) { setupDistributor(2, 2, "storage:2 distributor:1"); - - 
std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=1/2/3", - UpdateOptions().makeInconsistentSplit(true))); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=1/2/3", UpdateOptions().makeInconsistentSplit(true)); + cb->start(_sender, framework::MilliSecTime(0)); std::string wanted("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0," "Get(BucketId(0x440000000000cac4), id:ns:testdoctype1::1) => 0"); - std::string text = sender.getCommands(true, true); + std::string text = _sender.getCommands(true, true); ASSERT_EQ(wanted, text); - replyToGet(*cb, sender, 0, 90); - replyToGet(*cb, sender, 1, 120); + replyToGet(*cb, _sender, 0, 90); + replyToGet(*cb, _sender, 1, 120); ASSERT_EQ("Put(BucketId(0x440000000000cac4), id:ns:testdoctype1::1, " "timestamp 200000000, size 60) => 1," "Put(BucketId(0x440000000000cac4), id:ns:testdoctype1::1, " "timestamp 200000000, size 60) => 0", - sender.getCommands(true, true, 2)); + _sender.getCommands(true, true, 2)); - replyToPut(*cb, sender, 2); - ASSERT_TRUE(sender.replies().empty()); - replyToPut(*cb, sender, 3); + replyToPut(*cb, _sender, 2); + ASSERT_TRUE(_sender.replies().empty()); + replyToPut(*cb, _sender, 3); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 120) " "ReturnCode(NONE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } void @@ -523,33 +536,30 @@ TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo( TEST_F(TwoPhaseUpdateOperationTest, fast_path_propagates_message_settings_to_update) { setupDistributor(1, 1, "storage:1 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3"); + cb->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ("Update => 0", sender.getCommands(true)); + 
ASSERT_EQ("Update => 0", _sender.getCommands(true)); - StorageCommand::SP msg(sender.commands().back()); + StorageCommand::SP msg(_sender.commands().back()); checkMessageSettingsPropagatedTo(msg); } TEST_F(TwoPhaseUpdateOperationTest, n_of_m) { setupDistributor(2, 2, "storage:2 distributor:1", 1); + auto cb = sendUpdate("0=1/2/3,1=1/2/3"); + cb->start(_sender, framework::MilliSecTime(0)); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); - ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true)); - - ASSERT_TRUE(sender.replies().empty()); - replyToMessage(*cb, sender, 0, 90); + ASSERT_TRUE(_sender.replies().empty()); + replyToMessage(*cb, _sender, 0, 90); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 90) ReturnCode(NONE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); - replyToMessage(*cb, sender, 1, 123); + replyToMessage(*cb, _sender, 1, 123); } std::string @@ -565,132 +575,114 @@ TwoPhaseUpdateOperationTest::getUpdatedValueFromLastPut( TEST_F(TwoPhaseUpdateOperationTest, safe_path_updates_newest_received_document) { setupDistributor(3, 3, "storage:3 distributor:1"); // 0,1 in sync. 2 out of sync. 
- std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4")); - - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0," "Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 2", - sender.getCommands(true, true)); - replyToGet(*cb, sender, 0, 50); - replyToGet(*cb, sender, 1, 70); + _sender.getCommands(true, true)); + replyToGet(*cb, _sender, 0, 50); + replyToGet(*cb, _sender, 1, 70); ASSERT_EQ("Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 1," "Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 2," "Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 0", - sender.getCommands(true, true, 2)); + _sender.getCommands(true, true, 2)); // Make sure Put contains an updated document (+10 arith. update on field // whose value equals gotten timestamp). In this case we want 70 -> 80. 
- ASSERT_EQ("80", getUpdatedValueFromLastPut(sender)); + ASSERT_EQ("80", getUpdatedValueFromLastPut(_sender)); - replyToPut(*cb, sender, 2); - replyToPut(*cb, sender, 3); - ASSERT_TRUE(sender.replies().empty()); - replyToPut(*cb, sender, 4); + replyToPut(*cb, _sender, 2); + replyToPut(*cb, _sender, 3); + ASSERT_TRUE(_sender.replies().empty()); + replyToPut(*cb, _sender, 4); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 70) " "ReturnCode(NONE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, create_if_non_existent_creates_document_if_all_empty_gets) { setupDistributor(3, 3, "storage:3 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", - UpdateOptions().createIfNonExistent(true))); + auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true)); + cb->start(_sender, framework::MilliSecTime(0)); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - - ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true)); - replyToGet(*cb, sender, 0, 0, false); - replyToGet(*cb, sender, 1, 0, false); + ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true)); + replyToGet(*cb, _sender, 0, 0, false); + replyToGet(*cb, _sender, 1, 0, false); // Since create-if-non-existent is set, distributor should create doc from // scratch. 
ASSERT_EQ("Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 1," "Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 2," "Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 0", - sender.getCommands(true, true, 2)); + _sender.getCommands(true, true, 2)); - ASSERT_EQ("10", getUpdatedValueFromLastPut(sender)); + ASSERT_EQ("10", getUpdatedValueFromLastPut(_sender)); - replyToPut(*cb, sender, 2); - replyToPut(*cb, sender, 3); - ASSERT_TRUE(sender.replies().empty()); - replyToPut(*cb, sender, 4); + replyToPut(*cb, _sender, 2); + replyToPut(*cb, _sender, 3); + ASSERT_TRUE(_sender.replies().empty()); + replyToPut(*cb, _sender, 4); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 200000000) " "ReturnCode(NONE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_has_failed_put) { setupDistributor(3, 3, "storage:3 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", - UpdateOptions().createIfNonExistent(true))); - - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true)); + cb->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true)); - replyToGet(*cb, sender, 0, 0, false); - replyToGet(*cb, sender, 1, 0, false); + ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true)); + replyToGet(*cb, _sender, 0, 0, false); + replyToGet(*cb, _sender, 1, 0, false); // Since create-if-non-existent is set, distributor should create doc from // scratch. 
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2)); + ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2)); - replyToPut(*cb, sender, 2); - replyToPut(*cb, sender, 3); - ASSERT_TRUE(sender.replies().empty()); - replyToPut(*cb, sender, 4, api::ReturnCode::IO_FAILURE); + replyToPut(*cb, _sender, 2); + replyToPut(*cb, _sender, 3); + ASSERT_TRUE(_sender.replies().empty()); + replyToPut(*cb, _sender, 4, api::ReturnCode::IO_FAILURE); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 200000000) " "ReturnCode(IO_FAILURE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) { setupDistributor(2, 2, "storage:2 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=2/3/4", - UpdateOptions().createIfNonExistent(true))); - - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true)); + cb->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); - replyToGet(*cb, sender, 0, 0, false, api::ReturnCode::IO_FAILURE); - ASSERT_TRUE(sender.replies().empty()); - replyToGet(*cb, sender, 1, 0, false, api::ReturnCode::IO_FAILURE); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + replyToGet(*cb, _sender, 0, 0, false, api::ReturnCode::IO_FAILURE); + ASSERT_TRUE(_sender.replies().empty()); + replyToGet(*cb, _sender, 1, 0, false, api::ReturnCode::IO_FAILURE); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 0) " "ReturnCode(IO_FAILURE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) { setupDistributor(2, 2, "storage:2 
distributor:1"); // Create update for wrong doctype which will fail the update. - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().withError())); + auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().withError()); + cb->start(_sender, framework::MilliSecTime(0)); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - - ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); - replyToGet(*cb, sender, 0, 50); - ASSERT_TRUE(sender.replies().empty()); - replyToGet(*cb, sender, 1, 70); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + replyToGet(*cb, _sender, 0, 50); + ASSERT_TRUE(_sender.replies().empty()); + replyToGet(*cb, _sender, 1, 70); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " @@ -698,95 +690,88 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) { "ReturnCode(INTERNAL_FAILURE, Can not apply a " "\"testdoctype2\" document update to a " "\"testdoctype1\" document.)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, non_existing_with_auto_create) { setupDistributor(1, 1, "storage:1 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("", UpdateOptions().createIfNonExistent(true))); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true)); + cb->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) " "Reasons to start: => 0," "Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, " "timestamp 200000000, size 60) => 0", - sender.getCommands(true, true)); + _sender.getCommands(true, true)); - ASSERT_EQ("10", getUpdatedValueFromLastPut(sender)); + ASSERT_EQ("10", getUpdatedValueFromLastPut(_sender)); - replyToCreateBucket(*cb, sender, 0); - 
ASSERT_TRUE(sender.replies().empty()); - replyToPut(*cb, sender, 1); + replyToCreateBucket(*cb, _sender, 0); + ASSERT_TRUE(_sender.replies().empty()); + replyToPut(*cb, _sender, 1); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 200000000) " "ReturnCode(NONE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, safe_path_fails_update_when_mismatching_timestamp_constraint) { setupDistributor(2, 2, "storage:2 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=2/3/4", - UpdateOptions().timestampToUpdate(1234))); + auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().timestampToUpdate(1234)); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + cb->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); - replyToGet(*cb, sender, 0, 100); - ASSERT_TRUE(sender.replies().empty()); - replyToGet(*cb, sender, 1, 110); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + replyToGet(*cb, _sender, 0, 100); + ASSERT_TRUE(_sender.replies().empty()); + replyToGet(*cb, _sender, 1, 110); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 0) " "ReturnCode(NONE, No document with requested " "timestamp found)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, safe_path_update_propagates_message_settings_to_gets_and_puts) { setupDistributor(3, 3, "storage:3 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - - ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true)); - checkMessageSettingsPropagatedTo(sender.command(0)); - checkMessageSettingsPropagatedTo(sender.command(1)); - 
replyToGet(*cb, sender, 0, 50); - replyToGet(*cb, sender, 1, 70); - ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2)); - checkMessageSettingsPropagatedTo(sender.command(2)); - checkMessageSettingsPropagatedTo(sender.command(3)); - checkMessageSettingsPropagatedTo(sender.command(4)); - replyToPut(*cb, sender, 2); - replyToPut(*cb, sender, 3); - replyToPut(*cb, sender, 4); + auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"); + cb->start(_sender, framework::MilliSecTime(0)); + + ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true)); + checkMessageSettingsPropagatedTo(_sender.command(0)); + checkMessageSettingsPropagatedTo(_sender.command(1)); + replyToGet(*cb, _sender, 0, 50); + replyToGet(*cb, _sender, 1, 70); + ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2)); + checkMessageSettingsPropagatedTo(_sender.command(2)); + checkMessageSettingsPropagatedTo(_sender.command(3)); + checkMessageSettingsPropagatedTo(_sender.command(4)); + replyToPut(*cb, _sender, 2); + replyToPut(*cb, _sender, 3); + replyToPut(*cb, _sender, 4); } TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replies) { setupDistributor(3, 3, "storage:3 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - - ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true)); - replyToGet(*cb, sender, 0, 50, true, api::ReturnCode::OK, "hello earthlings"); - replyToGet(*cb, sender, 1, 70); - ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2)); - replyToPut(*cb, sender, 2, api::ReturnCode::OK, "fooo"); - replyToPut(*cb, sender, 3, api::ReturnCode::OK, "baaa"); - ASSERT_TRUE(sender.replies().empty()); - replyToPut(*cb, sender, 4); + auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"); + cb->start(_sender, framework::MilliSecTime(0)); + + ASSERT_EQ("Get => 0,Get => 2", 
_sender.getCommands(true)); + replyToGet(*cb, _sender, 0, 50, true, api::ReturnCode::OK, "hello earthlings"); + replyToGet(*cb, _sender, 1, 70); + ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2)); + replyToPut(*cb, _sender, 2, api::ReturnCode::OK, "fooo"); + replyToPut(*cb, _sender, 3, api::ReturnCode::OK, "baaa"); + ASSERT_TRUE(_sender.replies().empty()); + replyToPut(*cb, _sender, 4); - ASSERT_EQ("Update Reply", sender.getLastReply(false)); + ASSERT_EQ("Update Reply", _sender.getLastReply(false)); - std::string trace(sender.replies().back()->getTrace().toString()); + std::string trace(_sender.replies().back()->getTrace().toString()); ASSERT_THAT(trace, HasSubstr("hello earthlings")); ASSERT_THAT(trace, HasSubstr("fooo")); ASSERT_THAT(trace, HasSubstr("baaa")); @@ -798,13 +783,11 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec Timestamp expected_response_timestamp) { setupDistributor(2, 2, "storage:2 distributor:1"); - // Update towards inconsistent bucket invokes safe path. - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=2/3/4"); + cb->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); // Alter cluster state so that distributor is now down (technically the // entire cluster is down in this state, but this should not matter). In @@ -813,8 +796,8 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec // to a bucket we no longer own. 
enableDistributorClusterState("storage:2 distributor:1 .0.s:d"); getBucketDatabase().clear(); - replyToGet(*cb, sender, 0, lowest_get_timestamp); - replyToGet(*cb, sender, 1, highest_get_timestamp); + replyToGet(*cb, _sender, 0, lowest_get_timestamp); + replyToGet(*cb, _sender, 1, highest_get_timestamp); // BUCKET_NOT_FOUND is a transient error code which should cause the client // to re-send the operation, presumably to the correct distributor the next @@ -827,7 +810,7 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec "ReturnCode(BUCKET_NOT_FOUND, Distributor lost " "ownership of bucket between executing the read " "and write phases of a two-phase update operation)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) { @@ -843,46 +826,37 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) { setupDistributor(2, 2, "storage:2 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition( - "testdoctype1.headerval==120"))); + auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + cb->start(_sender, framework::MilliSecTime(0)); // Newest doc has headerval==110, not 120. 
- replyToGet(*cb, sender, 0, 100); - replyToGet(*cb, sender, 1, 110); + replyToGet(*cb, _sender, 0, 100); + replyToGet(*cb, _sender, 1, 110); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 0) " "ReturnCode(TEST_AND_SET_CONDITION_FAILED, " "Condition did not match document)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_match_sends_puts_with_updated_doc) { setupDistributor(2, 2, "storage:2 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition( - "testdoctype1.headerval==110"))); - - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - replyToGet(*cb, sender, 0, 100); - replyToGet(*cb, sender, 1, 110); - ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2)); + auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==110")); + + cb->start(_sender, framework::MilliSecTime(0)); + replyToGet(*cb, _sender, 0, 100); + replyToGet(*cb, _sender, 1, 110); + ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2)); } TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_parse_failure_fails_with_illegal_params_error) { setupDistributor(2, 2, "storage:2 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition( - "testdoctype1.san==fran...cisco"))); - - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - replyToGet(*cb, sender, 0, 100); - replyToGet(*cb, sender, 1, 110); + auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.san==fran...cisco")); + + cb->start(_sender, framework::MilliSecTime(0)); + replyToGet(*cb, _sender, 0, 100); + replyToGet(*cb, _sender, 1, 110); // NOTE: condition is currently not attempted parsed until Gets have been // 
replied to. This may change in the future. // XXX reliance on parser/exception error message is very fragile. @@ -893,19 +867,16 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_parse_failure_fails_with "Failed to parse test and set condition: " "syntax error, unexpected . at column 24 when " "parsing selection 'testdoctype1.san==fran...cisco')", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_unknown_doc_type_fails_with_illegal_params_error) { setupDistributor(2, 2, "storage:2 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition( - "langbein.headerval=1234"))); - - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - replyToGet(*cb, sender, 0, 100); - replyToGet(*cb, sender, 1, 110); + auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("langbein.headerval=1234")); + + cb->start(_sender, framework::MilliSecTime(0)); + replyToGet(*cb, _sender, 0, 100); + replyToGet(*cb, _sender, 1, 110); // NOTE: condition is currently not attempted parsed until Gets have been // replied to. This may change in the future. 
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " @@ -915,40 +886,35 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_unknown_doc_type_fails_w "Failed to parse test and set condition: " "Document type 'langbein' not found at column 1 " "when parsing selection 'langbein.headerval=1234')", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_no_auto_create_fails_with_tas_error) { setupDistributor(2, 2, "storage:2 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition( - "testdoctype1.headerval==120"))); + auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + cb->start(_sender, framework::MilliSecTime(0)); // Both Gets return nothing at all, nothing at all. - replyToGet(*cb, sender, 0, 100, false); - replyToGet(*cb, sender, 1, 110, false); + replyToGet(*cb, _sender, 0, 100, false); + replyToGet(*cb, _sender, 1, 110, false); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 0) " "ReturnCode(TEST_AND_SET_CONDITION_FAILED, " "Document did not exist)", - sender.getLastReply(true)); + _sender.getLastReply(true)); } TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_auto_create_sends_puts) { setupDistributor(2, 2, "storage:2 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb( - sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions() + auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions() .condition("testdoctype1.headerval==120") - .createIfNonExistent(true))); + .createIfNonExistent(true)); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - replyToGet(*cb, sender, 0, 100, false); - replyToGet(*cb, sender, 1, 110, false); - ASSERT_EQ("Put => 
1,Put => 0", sender.getCommands(true, false, 2)); + cb->start(_sender, framework::MilliSecTime(0)); + replyToGet(*cb, _sender, 0, 100, false); + replyToGet(*cb, _sender, 1, 110, false); + ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2)); } void @@ -966,11 +932,10 @@ TwoPhaseUpdateOperationTest::assertAbortedUpdateReplyWithContextPresent( TEST_F(TwoPhaseUpdateOperationTest, fast_path_close_edge_sends_correct_reply) { setupDistributor(1, 1, "storage:1 distributor:1"); // Only 1 replica; consistent with itself by definition. - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3")); - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3"); + cb->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ("Update => 0", sender.getCommands(true)); + ASSERT_EQ("Update => 0", _sender.getCommands(true)); // Close the operation. This should generate a single reply that is // bound to the original command. We can identify rogue replies by these // not having a transport context, as these are unique_ptrs that are @@ -985,11 +950,10 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_close_edge_sends_correct_reply) { TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) { setupDistributor(2, 2, "storage:2 distributor:1"); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. 
+ cb->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); // Closing the operation should now only return an ABORTED reply for // the UpdateCommand, _not_ from the nested, pending Get operation (which // will implicitly generate an ABORTED reply for the synthesized Get @@ -1004,24 +968,23 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_re setupDistributor(2, 2, "storage:2 distributor:1"); getConfig().set_update_fast_path_restart_enabled(true); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. + cb->start(_sender, framework::MilliSecTime(0)); Timestamp old_timestamp = 500; - ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); - replyToGet(*cb, sender, 0, old_timestamp); - replyToGet(*cb, sender, 1, old_timestamp); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + replyToGet(*cb, _sender, 0, old_timestamp); + replyToGet(*cb, _sender, 1, old_timestamp); - ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true, false, 2)); - replyToMessage(*cb, sender, 2, old_timestamp); - replyToMessage(*cb, sender, 3, old_timestamp); + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2)); + replyToMessage(*cb, _sender, 2, old_timestamp); + replyToMessage(*cb, _sender, 3, old_timestamp); EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " "timestamp 0, timestamp of updated doc: 500) " "ReturnCode(NONE)", - sender.getLastReply(true)); + _sender.getLastReply(true)); auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT]; EXPECT_EQ(1, metrics.fast_path_restarts.getValue()); @@ -1031,17 +994,16 @@ TEST_F(TwoPhaseUpdateOperationTest, 
safe_path_consistent_get_reply_timestamps_do setupDistributor(2, 2, "storage:2 distributor:1"); getConfig().set_update_fast_path_restart_enabled(false); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. + cb->start(_sender, framework::MilliSecTime(0)); Timestamp old_timestamp = 500; - ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); - replyToGet(*cb, sender, 0, old_timestamp); - replyToGet(*cb, sender, 1, old_timestamp); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + replyToGet(*cb, _sender, 0, old_timestamp); + replyToGet(*cb, _sender, 1, old_timestamp); // Should _not_ be restarted with fast path, as it has been config disabled - ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2)); + ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2)); auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT]; EXPECT_EQ(0, metrics.fast_path_restarts.getValue()); @@ -1051,9 +1013,8 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_alter setupDistributor(3, 3, "storage:3 distributor:1"); getConfig().set_update_fast_path_restart_enabled(true); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. + cb->start(_sender, framework::MilliSecTime(0)); // Replica set changes between time of Get requests sent and // responses received. This may happen e.g. 
if concurrent mutations @@ -1065,27 +1026,38 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_alter addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3"); Timestamp old_timestamp = 500; - ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); - replyToGet(*cb, sender, 0, old_timestamp); - replyToGet(*cb, sender, 1, old_timestamp); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + replyToGet(*cb, _sender, 0, old_timestamp); + replyToGet(*cb, _sender, 1, old_timestamp); - ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2)); + ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2)); } TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_document_not_found_on_a_replica_node) { setupDistributor(2, 2, "storage:2 distributor:1"); getConfig().set_update_fast_path_restart_enabled(true); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. + cb->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); - replyToGet(*cb, sender, 0, Timestamp(0), false); - replyToGet(*cb, sender, 1, Timestamp(500)); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + replyToGet(*cb, _sender, 0, Timestamp(0), false); + replyToGet(*cb, _sender, 1, Timestamp(500)); // Should _not_ send Update operations! - ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2)); + ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2)); +} + +// Buckets must be created from scratch by Put operations, updates alone cannot do this. 
+TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_no_initial_replicas_exist) { + setupDistributor(2, 2, "storage:2 distributor:1"); + getConfig().set_update_fast_path_restart_enabled(true); + + // No replicas, technically consistent but cannot use fast path. + auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true)); + cb->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", + _sender.getCommands(true)); } // The weak consistency config _only_ applies to Get operations initiated directly @@ -1095,15 +1067,218 @@ TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency setupDistributor(2, 2, "storage:2 distributor:1"); getConfig().set_use_weak_internal_read_consistency_for_client_gets(true); - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); + auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. 
+ cb->start(_sender, framework::MilliSecTime(0)); - ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); - auto& get_cmd = dynamic_cast<const api::GetCommand&>(*sender.command(0)); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0)); EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong); } +struct ThreePhaseUpdateTest : TwoPhaseUpdateOperationTest {}; + +TEST_F(ThreePhaseUpdateTest, metadata_only_gets_are_sent_if_3phase_update_enabled) { + auto cb = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + { + auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0)); + EXPECT_EQ("[none]", get_cmd.getFieldSet()); + EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Weak); + checkMessageSettingsPropagatedTo(_sender.command(0)); + } + { + auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(1)); + EXPECT_EQ("[none]", get_cmd.getFieldSet()); + EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Weak); + checkMessageSettingsPropagatedTo(_sender.command(1)); + } +} + +TEST_F(ThreePhaseUpdateTest, full_document_get_sent_to_replica_with_highest_timestamp) { + auto cb = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*cb, _sender, 0, 1000U); + reply_to_metadata_get(*cb, _sender, 1, 2000U); + // Node 1 has newest document version at ts=2000 + ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2)); + { + auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(2)); + EXPECT_EQ("[all]", get_cmd.getFieldSet()); + EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong); + } +} + +TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) { + auto cb = 
set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*cb, _sender, 0, 2000U); + reply_to_metadata_get(*cb, _sender, 1, 1000U); + ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2)); + replyToGet(*cb, _sender, 2, 2000U); + ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3)); +} + +TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) { + auto cb = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + api::Timestamp old_timestamp(1500); + reply_to_metadata_get(*cb, _sender, 0, old_timestamp); + reply_to_metadata_get(*cb, _sender, 1, old_timestamp); + + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2)); + replyToMessage(*cb, _sender, 2, old_timestamp); + replyToMessage(*cb, _sender, 3, old_timestamp); + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 1500) " + "ReturnCode(NONE)", + _sender.getLastReply(true)); + + auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT]; + EXPECT_EQ(1, metrics.fast_path_restarts.getValue()); +} + +TEST_F(ThreePhaseUpdateTest, fast_path_not_restarted_if_document_not_found_subset_of_replicas) { + auto cb = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*cb, _sender, 0, 0U); + reply_to_metadata_get(*cb, _sender, 1, 1000U); + ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2)); // Not sending updates. 
+} + +TEST_F(ThreePhaseUpdateTest, no_document_found_on_any_replicas_is_considered_consistent) { + auto cb = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + api::Timestamp no_document_timestamp(0); + reply_to_metadata_get(*cb, _sender, 0, no_document_timestamp); + reply_to_metadata_get(*cb, _sender, 1, no_document_timestamp); + + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2)); + auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT]; + EXPECT_EQ(1, metrics.fast_path_restarts.getValue()); +} + +TEST_F(ThreePhaseUpdateTest, metadata_get_phase_fails_if_any_replicas_return_failure) { + auto cb = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*cb, _sender, 1, 1000U); + reply_to_metadata_get(*cb, _sender, 0, 0U, api::ReturnCode::INTERNAL_FAILURE); + ASSERT_EQ("", _sender.getCommands(true, false, 2)); // No further requests sent. + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 0) " + "ReturnCode(ABORTED, One or more metadata Get operations failed; aborting Update)", + _sender.getLastReply(true)); +} + +TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_set_changed_after_metadata_gets) { + setupDistributor(3, 3, "storage:3 distributor:1"); + getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true); + auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more. + cb->start(_sender, framework::MilliSecTime(0)); + // Add new replica to deterministic test bucket after gets have been sent + BucketId bucket(0x400000000000cac4); // Always the same in the test. 
+ addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3"); + + Timestamp old_timestamp = 500; + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*cb, _sender, 0, old_timestamp); + reply_to_metadata_get(*cb, _sender, 1, old_timestamp); + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 0) " + "ReturnCode(BUCKET_NOT_FOUND, Replica sets changed between update phases, client must retry)", + _sender.getLastReply(true)); +} + +/* + * We unify checking for changed replica sets and changed bucket ownership by only + * checking for changed replica sets, thereby avoiding a relatively costly ideal + * state recomputation that is otherwise redundant. Rationale for why this shall + * always be safe: + * - for metadata gets to be sent at all, there must be at least one replica + * under the target bucket subtree + * - if there are no replicas, the bucket is implicitly considered inconsistent, + * triggering safe path + * - since there were no replicas initially, the safe path will _not_ restart in + * fast path + * - the safe path will perform the update locally and start a PutOperation, + * implicitly creating new replicas + * - this happens in the same execution context as starting the update operation itself, + * consequently ownership in DB cannot have changed concurrently + * - when a state transition happens where a distributor loses ownership of + * a bucket, it will always immediately purge it from its DB + * - this means that the replica set will inherently change + * + * It is technically possible to have an ABA situation where, in the course of + * an operation's lifetime, a distributor goes from owning a bucket to not + * owning it, back to owning it again. Although extremely unlikely to happen, + * it doesn't matter since the bucket info from the resulting mutations will + * be applied to the current state of the database anyway. 
+ */ +TEST_F(ThreePhaseUpdateTest, update_aborted_if_ownership_changed_between_gets_and_fast_restart_update) { + auto cb = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + // See do_test_ownership_changed_between_gets_and_second_phase() for more in-depth + // comments on why this particular cluster state is used. + enableDistributorClusterState("storage:2 distributor:1 .0.s:d"); + getBucketDatabase().clear(); + reply_to_metadata_get(*cb, _sender, 0, api::Timestamp(70)); + reply_to_metadata_get(*cb, _sender, 1, api::Timestamp(71)); + + // As mentioned in the above comments, ownership changes trigger + // on the replicas changed test instead of an explicit ownership + // change test. + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 0) " + "ReturnCode(BUCKET_NOT_FOUND, Replica sets changed between update phases, client must retry)", + _sender.getLastReply(true)); +} + +TEST_F(ThreePhaseUpdateTest, safe_mode_is_implicitly_triggered_if_no_replicas_exist) { + setupDistributor(1, 1, "storage:1 distributor:1"); + getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true); + auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true)); + cb->start(_sender, framework::MilliSecTime(0)); + + ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) " + "Reasons to start: => 0," + "Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, " + "timestamp 200000000, size 60) => 0", + _sender.getCommands(true, true)); +} + +TEST_F(ThreePhaseUpdateTest, metadata_gets_propagate_mbus_trace_to_reply) { + auto cb = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*cb, _sender, 1, 1000U); + reply_to_metadata_get(*cb, _sender, 0, 0U, api::ReturnCode::INTERNAL_FAILURE, + "'ello 'ello what's all this then?"); + ASSERT_EQ("", 
_sender.getCommands(true, false, 2)); + ASSERT_EQ("Update Reply", _sender.getLastReply(false)); + + std::string trace(_sender.replies().back()->getTrace().toString()); + ASSERT_THAT(trace, HasSubstr("'ello 'ello what's all this then?")); +} + +TEST_F(ThreePhaseUpdateTest, single_get_mbus_trace_is_propagated_to_reply) { + auto cb = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*cb, _sender, 0, 0U); + reply_to_metadata_get(*cb, _sender, 1, 1000U); + ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2)); + replyToGet(*cb, _sender, 2, 2000U, false, api::ReturnCode::INTERNAL_FAILURE, + "it is me, Leclerc! *lifts glasses*"); + ASSERT_EQ("Update Reply", _sender.getLastReply(false)); + + std::string trace(_sender.replies().back()->getTrace().toString()); + ASSERT_THAT(trace, HasSubstr("it is me, Leclerc! *lifts glasses*")); +} + // XXX currently differs in behavior from content nodes in that updates for // document IDs without explicit doctypes will _not_ be auto-failed on the // distributor. 
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index 522561ee8a5..9f51d70ce60 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -42,6 +42,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _update_fast_path_restart_enabled(false), _merge_operations_disabled(false), _use_weak_internal_read_consistency_for_client_gets(false), + _enable_metadata_only_fetch_phase_for_inconsistent_updates(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -155,6 +156,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _update_fast_path_restart_enabled = config.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent; _merge_operations_disabled = config.mergeOperationsDisabled; _use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets; + _enable_metadata_only_fetch_phase_for_inconsistent_updates = config.enableMetadataOnlyFetchPhaseForInconsistentUpdates; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 333d7073715..b8e99165d69 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -235,6 +235,13 @@ public: return _use_weak_internal_read_consistency_for_client_gets; } + void set_enable_metadata_only_fetch_phase_for_inconsistent_updates(bool enable) noexcept { + _enable_metadata_only_fetch_phase_for_inconsistent_updates = enable; + } + bool enable_metadata_only_fetch_phase_for_inconsistent_updates() const noexcept { + return _enable_metadata_only_fetch_phase_for_inconsistent_updates; + } + bool containsTimeStatement(const 
std::string& documentSelection) const; private: @@ -281,6 +288,7 @@ private: bool _update_fast_path_restart_enabled; bool _merge_operations_disabled; bool _use_weak_internal_read_consistency_for_client_gets; + bool _enable_metadata_only_fetch_phase_for_inconsistent_updates; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index e4d3182ce7a..915b9b6b304 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -228,3 +228,12 @@ merge_operations_disabled bool default=false ## consistent view of fields across document versions. ## This is mostly useful in a system that is effectively read-only. use_weak_internal_read_consistency_for_client_gets bool default=false + +## If true, adds an initial metadata-only fetch phase to updates that touch buckets +## with inconsistent replicas. Metadata timestamps are compared and a single full Get +## is sent _only_ to one node with the highest timestamp. Without a metadata phase, +## full gets would be sent to _all_ nodes. +## Setting this option to true always implicitly enables the fast update restart +## feature, so it's not required to set that config to true, nor will setting it +## to false actually disable the feature. +enable_metadata_only_fetch_phase_for_inconsistent_updates bool default=false
\ No newline at end of file diff --git a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt index efef4e5e51d..998fc22b1d7 100644 --- a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt @@ -2,6 +2,7 @@ vespa_add_library(storage_distributoroperationexternal OBJECT SOURCES getoperation.cpp + newest_replica.cpp putoperation.cpp removelocationoperation.cpp removeoperation.cpp diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index 9e66c212ac6..f800166a647 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -56,11 +56,12 @@ GetOperation::GetOperation(DistributorComponent& manager, _msg(std::move(msg)), _returnCode(api::ReturnCode::OK), _doc(), - _lastModified(), + _newest_replica(), _metric(metric), _operationTimer(manager.getClock()), _desired_read_consistency(desired_read_consistency), - _has_replica_inconsistency(false) + _has_replica_inconsistency(false), + _any_replicas_failed(false) { assignTargetNodeGroups(*read_guard); } @@ -110,7 +111,9 @@ GetOperation::sendForChecksum(DistributorMessageSender& sender, const document:: LOG(spam, "Sending %s to node %d", command->toString(true).c_str(), res[best].copy.getNode()); - res[best].sent = sender.sendToNode(lib::NodeType::STORAGE, res[best].copy.getNode(), command); + const auto target_node = res[best].copy.getNode(); + res[best].sent = sender.sendToNode(lib::NodeType::STORAGE, target_node, command); + res[best].to_node = target_node; return true; } @@ -145,27 +148,31 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr< bool allDone = true; for (auto& response : 
_responses) { for (uint32_t i = 0; i < response.second.size(); i++) { - if (response.second[i].sent == getreply->getMsgId()) { + const auto& bucket_id = response.first.getBucketId(); + auto& send_state = response.second[i]; + if (send_state.sent == getreply->getMsgId()) { LOG(debug, "Get on %s returned %s", _msg->getDocumentId().toString().c_str(), getreply->getResult().toString().c_str()); - response.second[i].received = true; - response.second[i].returnCode = getreply->getResult(); + send_state.received = true; + send_state.returnCode = getreply->getResult(); if (getreply->getResult().success()) { - if (_lastModified.has_value() && (getreply->getLastModifiedTimestamp() != *_lastModified)) { + if (_newest_replica.has_value() && (getreply->getLastModifiedTimestamp() != _newest_replica->timestamp)) { // At least two document versions returned had different timestamps. _has_replica_inconsistency = true; // This is a one-way toggle. } - if (!_lastModified.has_value() || getreply->getLastModifiedTimestamp() > *_lastModified) { + if (!_newest_replica.has_value() || getreply->getLastModifiedTimestamp() > _newest_replica->timestamp) { _returnCode = getreply->getResult(); - _lastModified = getreply->getLastModifiedTimestamp(); + assert(response.second[i].to_node != UINT16_MAX); + _newest_replica = NewestReplica::of(getreply->getLastModifiedTimestamp(), bucket_id, send_state.to_node); _doc = getreply->getDocument(); } } else { - if (!_lastModified.has_value()) { - _returnCode = getreply->getResult(); + _any_replicas_failed = true; + if (!_newest_replica.has_value()) { + _returnCode = getreply->getResult(); // Don't overwrite if we have a good response. } if (!all_bucket_metadata_initially_consistent()) { // If we're sending to more than a single group of replicas it means our replica set is @@ -175,7 +182,7 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr< } // Try to send to another node in this checksum group. 
- bool sent = sendForChecksum(sender, response.first.getBucketId(), response.second); + bool sent = sendForChecksum(sender, bucket_id, response.second); if (sent) { allDone = false; } @@ -219,7 +226,8 @@ void GetOperation::sendReply(DistributorMessageSender& sender) { if (_msg.get()) { - auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified.value_or(0), !_has_replica_inconsistency); + const auto timestamp = _newest_replica.value_or(NewestReplica::make_empty()).timestamp; + auto repl = std::make_shared<api::GetReply>(*_msg, _doc, timestamp, !_has_replica_inconsistency); repl->setResult(_returnCode); update_internal_metrics(); sender.sendReply(repl); @@ -250,9 +258,9 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard _replicas_in_db.emplace_back(e.getBucketId(), copy.getNode()); if (!copy.valid()) { - _responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].push_back(copy); + _responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].emplace_back(copy); } else if (!copy.empty()) { - _responses[GroupId(e.getBucketId(), copy.getChecksum(), -1)].push_back(copy); + _responses[GroupId(e.getBucketId(), copy.getChecksum(), -1)].emplace_back(copy); } } } diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 1106968bcf7..57e878d9e40 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#pragma once +#include "newest_replica.h" #include <vespa/storageapi/defs.h> #include <vespa/storage/distributor/operations/operation.h> #include <vespa/storage/bucketdb/bucketdatabase.h> @@ -38,6 +39,9 @@ public: std::string getStatus() const override { return ""; } bool all_bucket_metadata_initially_consistent() const; + bool any_replicas_failed() const noexcept { + return _any_replicas_failed; + } // Exposed for unit testing. TODO feels a bit dirty :I const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; } @@ -50,6 +54,13 @@ public: return _desired_read_consistency; } + // Note: in the case the document could not be found on any replicas, but + // at least one node returned a non-error response, the returned value will + // have a timestamp of zero and the most recently asked node as its node. + const std::optional<NewestReplica>& newest_replica() const noexcept { + return _newest_replica; + } + private: class GroupId { public: @@ -66,20 +77,19 @@ private: int _node; }; - class BucketChecksumGroup { - public: - BucketChecksumGroup(const BucketCopy& c) : - copy(c), - sent(0), received(false), returnCode(api::ReturnCode::OK) + struct BucketChecksumGroup { + explicit BucketChecksumGroup(const BucketCopy& c) + : copy(c), sent(0), returnCode(api::ReturnCode::OK), to_node(UINT16_MAX), received(false) {} BucketCopy copy; api::StorageMessage::Id sent; - bool received; api::ReturnCode returnCode; + uint16_t to_node; + bool received; }; - typedef std::vector<BucketChecksumGroup> GroupVector; + using GroupVector = std::vector<BucketChecksumGroup>; // Organize the different copies by bucket/checksum pairs. 
We should // try to request GETs from each bucket and each different checksum @@ -94,13 +104,14 @@ private: api::ReturnCode _returnCode; std::shared_ptr<document::Document> _doc; - std::optional<api::Timestamp> _lastModified; + std::optional<NewestReplica> _newest_replica; PersistenceOperationMetricSet& _metric; framework::MilliSecTimer _operationTimer; std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db; api::InternalReadConsistency _desired_read_consistency; bool _has_replica_inconsistency; + bool _any_replicas_failed; void sendReply(DistributorMessageSender& sender); bool sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res); diff --git a/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp b/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp new file mode 100644 index 00000000000..d615095ff63 --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp @@ -0,0 +1,13 @@ +#include "newest_replica.h" +#include <ostream> + +namespace storage::distributor { + +std::ostream& operator<<(std::ostream& os, const NewestReplica& nr) { + os << "NewestReplica(timestamp " << nr.timestamp + << ", bucket_id " << nr.bucket_id + << ", node " << nr.node << ')'; + return os; +} + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/newest_replica.h b/storage/src/vespa/storage/distributor/operations/external/newest_replica.h new file mode 100644 index 00000000000..86d5bee67d2 --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/external/newest_replica.h @@ -0,0 +1,38 @@ +#pragma once + +#include <vespa/document/bucket/bucketid.h> +#include <vespa/storageapi/defs.h> +#include <iosfwd> +#include <stddef.h> + +namespace storage::distributor { + +struct NewestReplica { + api::Timestamp timestamp {0}; + document::BucketId bucket_id; + uint16_t node {UINT16_MAX}; + + static NewestReplica of(api::Timestamp 
timestamp, + const document::BucketId& bucket_id, + uint16_t node) noexcept { + return {timestamp, bucket_id, node}; + } + + static NewestReplica make_empty() { + return {api::Timestamp(0), document::BucketId(), 0}; + } + + bool operator==(const NewestReplica& rhs) const noexcept { + return ((timestamp == rhs.timestamp) && + (bucket_id == rhs.bucket_id) && + (node == rhs.node)); + } + bool operator!=(const NewestReplica& rhs) const noexcept { + return !(*this == rhs); + } +}; + + +std::ostream& operator<<(std::ostream&, const NewestReplica&); + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 47cacafee80..42ccc3d31f3 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -36,6 +36,8 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( _sendState(SendState::NONE_SENT), _mode(Mode::FAST_PATH), _fast_path_repair_source_node(0xffff), + _use_initial_cheap_metadata_fetch_phase( + _manager.getDistributor().getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates()), _replySent(false) { document::BucketIdFactory idFactory; @@ -92,10 +94,12 @@ const char* TwoPhaseUpdateOperation::stateToString(SendState state) { switch (state) { - case SendState::NONE_SENT: return "NONE_SENT"; - case SendState::UPDATES_SENT: return "UPDATES_SENT"; - case SendState::GETS_SENT: return "GETS_SENT"; - case SendState::PUTS_SENT: return "PUTS_SENT"; + case SendState::NONE_SENT: return "NONE_SENT"; + case SendState::UPDATES_SENT: return "UPDATES_SENT"; + case SendState::METADATA_GETS_SENT: return "METADATA_GETS_SENT"; + case SendState::SINGLE_GET_SENT: return "SINGLE_GET_SENT"; + case SendState::FULL_GETS_SENT: return "FULL_GETS_SENT"; + case SendState::PUTS_SENT: return "PUTS_SENT"; default: 
assert(!"Unknown state"); return ""; @@ -159,7 +163,7 @@ void TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender) { _mode = Mode::FAST_PATH; - LOG(debug, "Update(%s) fast path: sending Update commands", _updateCmd->getDocumentId().toString().c_str()); + LOG(debug, "Update(%s) fast path: sending Update commands", update_doc_id().c_str()); auto updateOperation = std::make_shared<UpdateOperation>(_manager, _bucketSpace, _updateCmd, _updateMetric); UpdateOperation & op = *updateOperation; IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender); @@ -174,26 +178,50 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender) void TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender) { - LOG(debug, "Update(%s) safe path: sending Get commands", _updateCmd->getDocumentId().toString().c_str()); - _mode = Mode::SLOW_PATH; - document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0)); - auto get = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(),"[all]"); - copyMessageSettings(*_updateCmd, *get); - auto getOperation = std::make_shared<GetOperation>( - _manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(), get, _getMetric); - GetOperation & op = *getOperation; - IntermediateMessageSender intermediate(_sentMessageMap, std::move(getOperation), sender); + auto get_operation = create_initial_safe_path_get_operation(); + GetOperation& op = *get_operation; + IntermediateMessageSender intermediate(_sentMessageMap, std::move(get_operation), sender); _replicas_at_get_send_time = op.replicas_in_db(); // Populated at construction time, not at start()-time op.start(intermediate, _manager.getClock().getTimeInMillis()); - transitionTo(SendState::GETS_SENT); + + transitionTo(_use_initial_cheap_metadata_fetch_phase + ? 
SendState::METADATA_GETS_SENT + : SendState::FULL_GETS_SENT); if (intermediate._reply.get()) { assert(intermediate._reply->getType() == api::MessageType::GET_REPLY); + // We always trigger the safe path Get reply handling here regardless of whether + // metadata-only or full Gets were sent. This is because we might get an early + // reply due to there being no replicas in existence at all for the target bucket. + // In this case, we rely on the safe path fallback to implicitly create the bucket + // by performing the update locally and sending CreateBucket+Put to the ideal nodes. handleSafePathReceivedGet(sender, static_cast<api::GetReply&>(*intermediate._reply)); } } +std::shared_ptr<GetOperation> +TwoPhaseUpdateOperation::create_initial_safe_path_get_operation() { + document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0)); + const char* field_set = _use_initial_cheap_metadata_fetch_phase ? "[none]" : "[all]"; + auto get = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), field_set); + copyMessageSettings(*_updateCmd, *get); + // Metadata-only Gets just look at the data in the meta-store, not any fields. + // The meta-store is always updated before any ACK is returned for a mutation, + // so all the information we need is guaranteed to be consistent even with a + // weak read. But since weak reads allow the Get operation to bypass commit + // queues, latency may be greatly reduced in contended situations. + auto read_consistency = (_use_initial_cheap_metadata_fetch_phase + ? 
api::InternalReadConsistency::Weak + : api::InternalReadConsistency::Strong); + LOG(debug, "Update(%s) safe path: sending Get commands with field set '%s' " + "and internal read consistency %s", + update_doc_id().c_str(), field_set, api::to_string(read_consistency)); + return std::make_shared<GetOperation>( + _manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(), + get, _getMetric, read_consistency); +} + void TwoPhaseUpdateOperation::onStart(DistributorMessageSender& sender) { if (isFastPathPossible()) { @@ -247,7 +275,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen transitionTo(SendState::PUTS_SENT); LOG(debug, "Update(%s): sending Put commands with doc %s", - _updateCmd->getDocumentId().toString().c_str(), doc->toString(true).c_str()); + update_doc_id().c_str(), doc->toString(true).c_str()); if (intermediate._reply.get()) { sendReplyWithResult(sender, intermediate._reply->getResult()); @@ -269,13 +297,12 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg) { if (msg->getType() == api::MessageType::GET_REPLY) { - assert(_sendState == SendState::GETS_SENT); - api::GetReply& getReply = static_cast<api::GetReply&> (*msg); + assert(_sendState == SendState::FULL_GETS_SENT); + auto& getReply = static_cast<api::GetReply&>(*msg); addTraceFromReply(getReply); - LOG(debug, "Update(%s) Get reply had result: %s", - _updateCmd->getDocumentId().toString().c_str(), - getReply.getResult().toString().c_str()); + LOG(debug, "Update(%s) fast path: Get reply had result %s", + update_doc_id().c_str(), getReply.getResult().toString().c_str()); if (!getReply.getResult().success()) { sendReplyWithResult(sender, getReply.getResult()); @@ -310,7 +337,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender, // Failed or was consistent sendReply(sender, intermediate._reply); } else { - LOG(debug, "Update(%s) fast path: was 
inconsistent!", _updateCmd->getDocumentId().toString().c_str()); + LOG(debug, "Update(%s) fast path: was inconsistent!", update_doc_id().c_str()); _updateReply = intermediate._reply; _fast_path_repair_source_node = bestNode.second; @@ -319,7 +346,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender, copyMessageSettings(*_updateCmd, *cmd); sender.sendToNode(lib::NodeType::STORAGE, _fast_path_repair_source_node, cmd); - transitionTo(SendState::GETS_SENT); + transitionTo(SendState::FULL_GETS_SENT); } } } else { @@ -328,7 +355,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender, addTraceFromReply(*intermediate._reply); sendReplyWithResult(sender, intermediate._reply->getResult()); LOG(warning, "Forced convergence of '%s' using document from node %u", - _updateCmd->getDocumentId().toString().c_str(), _fast_path_repair_source_node); + update_doc_id().c_str(), _fast_path_repair_source_node); } } } @@ -337,6 +364,15 @@ void TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg) { + // No explicit operation is associated with the direct replica Get operation, + // so we handle its reply separately. + if (_sendState == SendState::SINGLE_GET_SENT) { + assert(msg->getType() == api::MessageType::GET_REPLY); + LOG(spam, "Received single full Get reply for '%s'", update_doc_id().c_str()); + addTraceFromReply(*msg); + handleSafePathReceivedGet(sender, dynamic_cast<api::GetReply&>(*msg)); + return; + } std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId()); assert(callback.get()); Operation & callbackOp = *callback; @@ -348,7 +384,12 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender, return; // Not enough replies received yet or we're draining callbacks. 
} addTraceFromReply(*intermediate._reply); - if (_sendState == SendState::GETS_SENT) { + if (_sendState == SendState::METADATA_GETS_SENT) { + assert(intermediate._reply->getType() == api::MessageType::GET_REPLY); + const auto& get_op = dynamic_cast<const GetOperation&>(*intermediate.callback); + handle_safe_path_received_metadata_get(sender, static_cast<api::GetReply&>(*intermediate._reply), + get_op.newest_replica(), get_op.any_replicas_failed()); + } else if (_sendState == SendState::FULL_GETS_SENT) { assert(intermediate._reply->getType() == api::MessageType::GET_REPLY); handleSafePathReceivedGet(sender, static_cast<api::GetReply&>(*intermediate._reply)); } else if (_sendState == SendState::PUTS_SENT) { @@ -359,6 +400,60 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender, } } +void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get( + DistributorMessageSender& sender, api::GetReply& reply, + const std::optional<NewestReplica>& newest_replica, + bool any_replicas_failed) +{ + LOG(debug, "Update(%s): got (metadata only) Get reply with result %s", + update_doc_id().c_str(), reply.getResult().toString().c_str()); + + if (!reply.getResult().success()) { + sendReplyWithResult(sender, reply.getResult()); + return; + } + // It's possible for a single replica to fail during processing without the entire + // Get operation failing. Although we know a priori if replicas are out of sync, + // we don't know which one has the highest timestamp (it might have been the one + // on the node that the metadata Get just failed towards). To err on the side of + // caution we abort the update if this happens. If a simple metadata Get fails, it + // is highly likely that a full partial update or put operation would fail as well. 
+ if (any_replicas_failed) { + LOG(debug, "Update(%s): had failed replicas, aborting update", update_doc_id().c_str()); + sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::Result::ABORTED, + "One or more metadata Get operations failed; aborting Update")); + return; + } + if (!replica_set_unchanged_after_get_operation()) { + // Use BUCKET_NOT_FOUND to trigger a silent retry. + LOG(debug, "Update(%s): replica set has changed after metadata get phase", update_doc_id().c_str()); + sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::Result::BUCKET_NOT_FOUND, + "Replica sets changed between update phases, client must retry")); + return; + } + if (reply.had_consistent_replicas()) { + LOG(debug, "Update(%s): metadata Gets consistent; restarting in fast path", update_doc_id().c_str()); + restart_with_fast_path_due_to_consistent_get_timestamps(sender); + return; + } + // If we've gotten here, we must have had no Get failures and replicas must + // be somehow inconsistent. Replicas can only be inconsistent if their timestamps + // mismatch, so we must have observed at least one non-zero timestamp. + assert(newest_replica.has_value() && (newest_replica->timestamp != api::Timestamp(0))); + // Timestamps were not in sync, so we have to fetch the document from the highest + // timestamped replica, apply the update to it and then explicitly Put the result + // to all replicas. 
+ document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), newest_replica->bucket_id); + LOG(debug, "Update(%s): sending single payload Get to %s on node %u (had timestamp %" PRIu64 ")", + update_doc_id().c_str(), bucket.toString().c_str(), + newest_replica->node, newest_replica->timestamp); + auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), "[all]"); + copyMessageSettings(*_updateCmd, *cmd); + sender.sendToNode(lib::NodeType::STORAGE, newest_replica->node, cmd); + + transitionTo(SendState::SINGLE_GET_SENT); +} + void TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sender, api::GetReply& reply) { @@ -395,7 +490,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen return; } else if (shouldCreateIfNonExistent()) { LOG(debug, "No existing documents found for %s, creating blank document to update", - _updateCmd->getUpdate()->getId().toString().c_str()); + update_doc_id().c_str()); docToUpdate = createBlankDocument(); setUpdatedForTimestamp(putTimestamp); } else { @@ -412,7 +507,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen bool TwoPhaseUpdateOperation::may_restart_with_fast_path(const api::GetReply& reply) { return (_manager.getDistributor().getConfig().update_fast_path_restart_enabled() && - reply.wasFound() && + !_replicas_at_get_send_time.empty() && // To ensure we send CreateBucket+Put if no replicas exist. 
reply.had_consistent_replicas() && replica_set_unchanged_after_get_operation()); } @@ -434,12 +529,15 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender) { LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode", - _updateCmd->getDocumentId().toString().c_str()); + update_doc_id().c_str()); if (lostBucketOwnershipBetweenPhases()) { sendLostOwnershipTransientErrorReply(sender); return; } _updateMetric.fast_path_restarts.inc(); + // Must not be any other messages in flight, or we might mis-interpret them when we + // have switched back to fast-path mode. + assert(_sentMessageMap.empty()); startFastPathUpdate(sender); } @@ -553,4 +651,9 @@ TwoPhaseUpdateOperation::onClose(DistributorMessageSender& sender) { } } +vespalib::string TwoPhaseUpdateOperation::update_doc_id() const { + assert(_updateCmd.get() != nullptr); + return _updateCmd->getDocumentId().toString(); +} + } diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index f9e32c31a9c..f714232b427 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -1,11 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#pragma once -#include <set> +#include "newest_replica.h" #include <vespa/storageapi/messageapi/returncode.h> #include <vespa/storage/distributor/persistencemessagetracker.h> #include <vespa/storage/distributor/operations/sequenced_operation.h> #include <vespa/document/update/documentupdate.h> +#include <set> namespace document { class Document; @@ -23,6 +24,7 @@ class UpdateMetricSet; namespace distributor { class DistributorBucketSpace; +class GetOperation; /* * General functional outline: @@ -45,6 +47,7 @@ class DistributorBucketSpace; * * Note that the above case also implicitly handles the case in which a * bucket does not exist. + * */ @@ -56,7 +59,7 @@ public: const std::shared_ptr<api::UpdateCommand> & msg, DistributorMetricSet& metrics, SequencingHandle sequencingHandle = SequencingHandle()); - ~TwoPhaseUpdateOperation(); + ~TwoPhaseUpdateOperation() override; void onStart(DistributorMessageSender& sender) override; @@ -69,13 +72,13 @@ public: void onClose(DistributorMessageSender& sender) override; - bool canSendHeaderOnly() const; - private: enum class SendState { NONE_SENT, UPDATES_SENT, - GETS_SENT, + METADATA_GETS_SENT, + SINGLE_GET_SENT, + FULL_GETS_SENT, PUTS_SENT, }; @@ -85,7 +88,7 @@ private: }; void transitionTo(SendState newState); - const char* stateToString(SendState); + static const char* stateToString(SendState); void sendReply(DistributorMessageSender&, std::shared_ptr<api::StorageReply>&); @@ -108,10 +111,13 @@ private: const std::shared_ptr<api::StorageReply>&); void handleSafePathReceive(DistributorMessageSender&, const std::shared_ptr<api::StorageReply>&); - void handleSafePathReceivedGet(DistributorMessageSender&, - api::GetReply&); - void handleSafePathReceivedPut(DistributorMessageSender&, - const api::PutReply&); + std::shared_ptr<GetOperation> create_initial_safe_path_get_operation(); + void handle_safe_path_received_metadata_get(DistributorMessageSender&, + api::GetReply&, + const std::optional<NewestReplica>&, + bool 
any_replicas_failed); + void handleSafePathReceivedGet(DistributorMessageSender&, api::GetReply&); + void handleSafePathReceivedPut(DistributorMessageSender&, const api::PutReply&); bool shouldCreateIfNonExistent() const; bool processAndMatchTasCondition( DistributorMessageSender& sender, @@ -124,6 +130,8 @@ private: bool may_restart_with_fast_path(const api::GetReply& reply); bool replica_set_unchanged_after_get_operation() const; void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender); + // Precondition: reply has not yet been sent. + vespalib::string update_doc_id() const; UpdateMetricSet& _updateMetric; PersistenceOperationMetricSet& _putMetric; @@ -139,6 +147,7 @@ private: document::BucketId _updateDocBucketId; std::vector<std::pair<document::BucketId, uint16_t>> _replicas_at_get_send_time; uint16_t _fast_path_repair_source_node; + bool _use_initial_cheap_metadata_fetch_phase; bool _replySent; }; diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h index 3364c6606eb..405e59f7b31 100644 --- a/storage/src/vespa/storage/distributor/sentmessagemap.h +++ b/storage/src/vespa/storage/distributor/sentmessagemap.h @@ -16,26 +16,18 @@ class SentMessageMap { public: SentMessageMap(); - ~SentMessageMap(); std::shared_ptr<Operation> pop(api::StorageMessage::Id id); - std::shared_ptr<Operation> pop(); void insert(api::StorageMessage::Id id, const std::shared_ptr<Operation> & msg); - void clear(); - uint32_t size() const { return _map.size(); } - - uint32_t empty() const { return _map.empty(); } - + [[nodiscard]] bool empty() const noexcept { return _map.empty(); } std::string toString() const; - private: - typedef std::map<api::StorageMessage::Id, std::shared_ptr<Operation> > Map; - + using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>; Map _map; }; |