diff options
Diffstat (limited to 'storage/src/tests/distributor/twophaseupdateoperationtest.cpp')
-rw-r--r-- | storage/src/tests/distributor/twophaseupdateoperationtest.cpp | 85 |
1 files changed, 76 insertions, 9 deletions
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index df9bf683326..67ef3374633 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -126,6 +126,10 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil { void assertAbortedUpdateReplyWithContextPresent( const DistributorMessageSenderStub& closeSender) const; + void do_test_ownership_changed_between_gets_and_second_phase(Timestamp lowest_get_timestamp, + Timestamp highest_get_timestamp, + Timestamp expected_response_timestamp); + }; TwoPhaseUpdateOperationTest::TwoPhaseUpdateOperationTest() = default; @@ -142,7 +146,9 @@ TwoPhaseUpdateOperationTest::replyToMessage( std::shared_ptr<api::StorageMessage> msg2 = sender.command(index); auto& updatec = dynamic_cast<UpdateCommand&>(*msg2); std::unique_ptr<api::StorageReply> reply(updatec.makeReply()); - static_cast<api::UpdateReply*>(reply.get())->setOldTimestamp(oldTimestamp); + auto& update_reply = dynamic_cast<api::UpdateReply&>(*reply); + update_reply.setOldTimestamp(oldTimestamp); + update_reply.setBucketInfo(api::BucketInfo(0x123, 1, 100)); // Dummy info to avoid invalid info being returned reply->setResult(api::ReturnCode(result, "")); callback.receive(sender, @@ -193,7 +199,7 @@ TwoPhaseUpdateOperationTest::replyToGet( api::ReturnCode::Result result, const std::string& traceMsg) { - auto& get = static_cast<const api::GetCommand&>(*sender.command(index)); + auto& get = dynamic_cast<const api::GetCommand&>(*sender.command(index)); std::shared_ptr<api::StorageReply> reply; if (haveDocument) { @@ -786,7 +792,11 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replie ASSERT_THAT(trace, HasSubstr("baaa")); } -TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) { +void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_second_phase( + Timestamp lowest_get_timestamp, + Timestamp highest_get_timestamp, + Timestamp expected_response_timestamp) +{ setupDistributor(2, 2, "storage:2 distributor:1"); // Update towards inconsistent bucket invokes safe path. @@ -803,21 +813,31 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge // to a bucket we no longer own. enableDistributorClusterState("storage:2 distributor:1 .0.s:d"); getBucketDatabase().clear(); - replyToGet(*cb, sender, 0, 70); - replyToGet(*cb, sender, 1, 70); - + replyToGet(*cb, sender, 0, lowest_get_timestamp); + replyToGet(*cb, sender, 1, highest_get_timestamp); + // BUCKET_NOT_FOUND is a transient error code which should cause the client // to re-send the operation, presumably to the correct distributor the next // time. + // Timestamp of updated doc varies depending on whether fast or safe path + // was triggered, as the reply is created via different paths. EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " - "timestamp 0, timestamp of updated doc: 70) " + "timestamp 0, timestamp of updated doc: " + std::to_string(expected_response_timestamp) + ") " "ReturnCode(BUCKET_NOT_FOUND, Distributor lost " - "ownership of bucket between executing the read " - "and write phases of a two-phase update operation)", + "ownership of bucket between executing the read " + "and write phases of a two-phase update operation)", sender.getLastReply(true)); } +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) { + do_test_ownership_changed_between_gets_and_second_phase(70, 71, 71); +} + +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_restarted_fast_path_updates) { + do_test_ownership_changed_between_gets_and_second_phase(70, 70, 0); // Timestamps in sync -> Update restart +} + TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) { setupDistributor(2, 2, "storage:2 distributor:1"); std::shared_ptr<TwoPhaseUpdateOperation> cb( @@ -977,6 +997,53 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) { assertAbortedUpdateReplyWithContextPresent(closeSender); } +TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_restarts_with_fast_path_if_enabled) { + setupDistributor(2, 2, "storage:2 distributor:1"); + getConfig().set_update_fast_path_restart_enabled(true); + + std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. + DistributorMessageSenderStub sender; + cb->start(sender, framework::MilliSecTime(0)); + + Timestamp old_timestamp = 500; + ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); + replyToGet(*cb, sender, 0, old_timestamp); + replyToGet(*cb, sender, 1, old_timestamp); + + ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true, false, 2)); + replyToMessage(*cb, sender, 2, old_timestamp); + replyToMessage(*cb, sender, 3, old_timestamp); + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 500) " + "ReturnCode(NONE)", + sender.getLastReply(true)); + + auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT]; + EXPECT_EQ(1, metrics.fast_path_restarts.getValue()); +} + +TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_does_not_restart_with_fast_path_if_disabled) { + setupDistributor(2, 2, "storage:2 distributor:1"); + getConfig().set_update_fast_path_restart_enabled(false); + + std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. + DistributorMessageSenderStub sender; + cb->start(sender, framework::MilliSecTime(0)); + + Timestamp old_timestamp = 500; + ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); + replyToGet(*cb, sender, 0, old_timestamp); + replyToGet(*cb, sender, 1, old_timestamp); + + // Should _not_ be restarted with fast path, as it has been config disabled + ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2)); + + auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT]; + EXPECT_EQ(0, metrics.fast_path_restarts.getValue()); +} + // XXX currently differs in behavior from content nodes in that updates for // document IDs without explicit doctypes will _not_ be auto-failed on the // distributor. |