summaryrefslogtreecommitdiffstats
path: root/storage/src/tests
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-11-15 12:31:55 +0000
committer Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-11-15 12:31:55 +0000
commit 2af589636d76af4ad3d26eaf199ddc4ce8cf011e (patch)
tree ed3b59977eb7181bd756a42c909e32d7fe207fca /storage/src/tests
parent bf057fb22f9c917d616031a0cd32597b315bb803 (diff)
Use fast updates when replica metadata is out of sync but document itself is in sync
When a bucket has replicas with mismatching metadata (i.e. they are out of sync), the distributor will initiate a write-repair for updates to avoid divergence of replica content. This is done by first sending a Get to all diverging replica sets, picking the document version with the highest timestamp, and applying the update to it locally on the distributor. The updated document is then sent out as a Put. This can be very expensive if document Put operations are disproportionately more expensive than partial updates, and it also makes the distributor thread part of a contended critical path. This commit lets `TwoPhaseUpdateOperation` restart an update as a "fast path" update (partial updates sent directly to the nodes) if the initial read phase returns the same timestamp for the document across all replicas. It also removes an old (but now presumed unsafe) optimization where Get operations were only sent to replicas marked "trusted", even if other replicas were out of sync with them. Since trustedness is a transient state that does not persist across restarts or bucket handoffs, it is not robust enough to be used for such purposes. Gets will now be sent to all out-of-sync replica groups regardless of trusted status.
Diffstat (limited to 'storage/src/tests')
-rw-r--r-- storage/src/tests/distributor/distributortest.cpp 17
-rw-r--r-- storage/src/tests/distributor/getoperationtest.cpp 47
-rw-r--r-- storage/src/tests/distributor/twophaseupdateoperationtest.cpp 85
3 files changed, 135 insertions, 14 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index e6c617a32e4..94d33e50047 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -174,6 +174,12 @@ struct DistributorTest : Test, DistributorTestUtil {
configureDistributor(builder);
}
+ void configure_update_fast_path_restart_enabled(bool enabled) {
+ ConfigBuilder builder;
+ builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled;
+ configureDistributor(builder);
+ }
+
void configureMaxClusterClockSkew(int seconds);
void sendDownClusterStateCommand();
void replyToSingleRequestBucketInfoCommandWith1Bucket();
@@ -1001,6 +1007,17 @@ TEST_F(DistributorTest, stale_reads_config_is_propagated_to_external_operation_h
EXPECT_FALSE(getExternalOperationHandler().concurrent_gets_enabled());
}
+TEST_F(DistributorTest, fast_path_on_consistent_gets_config_is_propagated_to_internal_config) {
+ createLinks(true);
+ setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_update_fast_path_restart_enabled(true);
+ EXPECT_TRUE(getConfig().update_fast_path_restart_enabled());
+
+ configure_update_fast_path_restart_enabled(false);
+ EXPECT_FALSE(getConfig().update_fast_path_restart_enabled());
+}
+
TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled) {
createLinks(false);
setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 99d7c12551d..72a124d45b4 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -117,6 +117,13 @@ struct GetOperationTest : Test, DistributorTestUtil {
}
}
+ bool last_reply_had_consistent_replicas() {
+ assert(!_sender.replies().empty());
+ auto& msg = *_sender.replies().back();
+ assert(msg.getType() == api::MessageType::GET_REPLY);
+ return dynamic_cast<api::GetReply&>(msg).had_consistent_replicas();
+ }
+
void setClusterState(const std::string& clusterState) {
enableDistributorClusterState(clusterState);
}
@@ -139,22 +146,25 @@ TEST_F(GetOperationTest, simple) {
EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
-TEST_F(GetOperationTest, ask_trusted_node_if_bucket_is_inconsistent) {
+TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted_replica_available) {
setClusterState("distributor:1 storage:4");
addNodesToBucketDB(bucketId, "0=100/3/10,1=200/4/12/t");
sendGet();
- ASSERT_EQ("Get => 1", _sender.getCommands(true));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
- ASSERT_NO_FATAL_FAILURE(replyWithDocument());
+ ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::OK, "newauthor", 2));
+ ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "oldauthor", 1));
EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
- "timestamp 100) ReturnCode(NONE)",
+ "timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
@@ -174,6 +184,7 @@ TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, send_to_all_invalid_copies) {
@@ -193,6 +204,7 @@ TEST_F(GetOperationTest, send_to_all_invalid_copies) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, send_to_all_invalid_nodes_when_inconsistent) {
@@ -235,6 +247,7 @@ TEST_F(GetOperationTest, inconsistent_split) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) {
@@ -252,6 +265,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) {
@@ -271,6 +285,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 3) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket) {
@@ -290,6 +305,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) {
@@ -312,6 +328,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) {
@@ -322,6 +339,7 @@ TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 0) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas()); // Nothing in the bucket, so nothing to be inconsistent with.
}
TEST_F(GetOperationTest, not_found) {
@@ -340,8 +358,9 @@ TEST_F(GetOperationTest, not_found) {
"timestamp 0) ReturnCode(NONE)",
_sender.getLastReply());
- EXPECT_EQ(1, getDistributor().getMetrics().gets[documentapi::LoadType::DEFAULT].
+ EXPECT_EQ(1, getDistributor().getMetrics().gets[documentapi::LoadType::DEFAULT].
failures.notfound.getValue());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, resend_on_storage_failure) {
@@ -366,6 +385,21 @@ TEST_F(GetOperationTest, resend_on_storage_failure) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ // Replica had read failure, but they're still in sync. An immutable Get won't change that fact.
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
+}
+
+TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_inconsistent) {
+ setClusterState("distributor:1 storage:3");
+ addNodesToBucketDB(bucketId, "1=100,2=200");
+ sendGet();
+ ASSERT_EQ("Get => 1,Get => 2", _sender.getCommands(true));
+ ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::TIMEOUT, "", 0));
+ ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "newestauthor", 3));
+ ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
+ "timestamp 3) ReturnCode(NONE)",
+ _sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) {
@@ -390,6 +424,7 @@ TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 0) ReturnCode(IO_FAILURE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas()); // Doesn't really matter since operation itself failed
}
TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) {
@@ -408,6 +443,7 @@ TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
@@ -435,6 +471,7 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
_sender.getLastReply());
EXPECT_EQ("newestauthor", getLastReplyAuthor());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) {
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index df9bf683326..67ef3374633 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -126,6 +126,10 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
void assertAbortedUpdateReplyWithContextPresent(
const DistributorMessageSenderStub& closeSender) const;
+ void do_test_ownership_changed_between_gets_and_second_phase(Timestamp lowest_get_timestamp,
+ Timestamp highest_get_timestamp,
+ Timestamp expected_response_timestamp);
+
};
TwoPhaseUpdateOperationTest::TwoPhaseUpdateOperationTest() = default;
@@ -142,7 +146,9 @@ TwoPhaseUpdateOperationTest::replyToMessage(
std::shared_ptr<api::StorageMessage> msg2 = sender.command(index);
auto& updatec = dynamic_cast<UpdateCommand&>(*msg2);
std::unique_ptr<api::StorageReply> reply(updatec.makeReply());
- static_cast<api::UpdateReply*>(reply.get())->setOldTimestamp(oldTimestamp);
+ auto& update_reply = dynamic_cast<api::UpdateReply&>(*reply);
+ update_reply.setOldTimestamp(oldTimestamp);
+ update_reply.setBucketInfo(api::BucketInfo(0x123, 1, 100)); // Dummy info to avoid invalid info being returned
reply->setResult(api::ReturnCode(result, ""));
callback.receive(sender,
@@ -193,7 +199,7 @@ TwoPhaseUpdateOperationTest::replyToGet(
api::ReturnCode::Result result,
const std::string& traceMsg)
{
- auto& get = static_cast<const api::GetCommand&>(*sender.command(index));
+ auto& get = dynamic_cast<const api::GetCommand&>(*sender.command(index));
std::shared_ptr<api::StorageReply> reply;
if (haveDocument) {
@@ -786,7 +792,11 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replie
ASSERT_THAT(trace, HasSubstr("baaa"));
}
-TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) {
+void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_second_phase(
+ Timestamp lowest_get_timestamp,
+ Timestamp highest_get_timestamp,
+ Timestamp expected_response_timestamp)
+{
setupDistributor(2, 2, "storage:2 distributor:1");
// Update towards inconsistent bucket invokes safe path.
@@ -803,21 +813,31 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge
// to a bucket we no longer own.
enableDistributorClusterState("storage:2 distributor:1 .0.s:d");
getBucketDatabase().clear();
- replyToGet(*cb, sender, 0, 70);
- replyToGet(*cb, sender, 1, 70);
-
+ replyToGet(*cb, sender, 0, lowest_get_timestamp);
+ replyToGet(*cb, sender, 1, highest_get_timestamp);
+
// BUCKET_NOT_FOUND is a transient error code which should cause the client
// to re-send the operation, presumably to the correct distributor the next
// time.
+ // Timestamp of updated doc varies depending on whether fast or safe path
+ // was triggered, as the reply is created via different paths.
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
- "timestamp 0, timestamp of updated doc: 70) "
+ "timestamp 0, timestamp of updated doc: " + std::to_string(expected_response_timestamp) + ") "
"ReturnCode(BUCKET_NOT_FOUND, Distributor lost "
- "ownership of bucket between executing the read "
- "and write phases of a two-phase update operation)",
+ "ownership of bucket between executing the read "
+ "and write phases of a two-phase update operation)",
sender.getLastReply(true));
}
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) {
+ do_test_ownership_changed_between_gets_and_second_phase(70, 71, 71);
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_restarted_fast_path_updates) {
+ do_test_ownership_changed_between_gets_and_second_phase(70, 70, 0); // Timestamps in sync -> Update restart
+}
+
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
std::shared_ptr<TwoPhaseUpdateOperation> cb(
@@ -977,6 +997,53 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) {
assertAbortedUpdateReplyWithContextPresent(closeSender);
}
+TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_restarts_with_fast_path_if_enabled) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_update_fast_path_restart_enabled(true);
+
+ std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
+ DistributorMessageSenderStub sender;
+ cb->start(sender, framework::MilliSecTime(0));
+
+ Timestamp old_timestamp = 500;
+ ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ replyToGet(*cb, sender, 0, old_timestamp);
+ replyToGet(*cb, sender, 1, old_timestamp);
+
+ ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true, false, 2));
+ replyToMessage(*cb, sender, 2, old_timestamp);
+ replyToMessage(*cb, sender, 3, old_timestamp);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 500) "
+ "ReturnCode(NONE)",
+ sender.getLastReply(true));
+
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_does_not_restart_with_fast_path_if_disabled) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_update_fast_path_restart_enabled(false);
+
+ std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
+ DistributorMessageSenderStub sender;
+ cb->start(sender, framework::MilliSecTime(0));
+
+ Timestamp old_timestamp = 500;
+ ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ replyToGet(*cb, sender, 0, old_timestamp);
+ replyToGet(*cb, sender, 1, old_timestamp);
+
+ // Should _not_ be restarted with fast path, as it has been config disabled
+ ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(0, metrics.fast_path_restarts.getValue());
+}
+
// XXX currently differs in behavior from content nodes in that updates for
// document IDs without explicit doctypes will _not_ be auto-failed on the
// distributor.