summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-03-12 16:09:08 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-03-16 15:14:03 +0000
commiteb28962cb8edfdc3ad75fc9298a71bf31ab96af2 (patch)
treed5410917f69566acffbe494623b36d608ed6b0a5 /storage/src/tests/distributor/twophaseupdateoperationtest.cpp
parent7ec5934200cdffb5e17a083b6f56e9b4ae37246f (diff)
Add initial metadata-only phase to inconsistent update handling
If bucket replicas are inconsistent, the common case is that only a small subset of documents contained in the buckets are actually mutually out of sync. The added metadata phase optimizes for such a case by initially sending Get requests to all divergent replicas that only ask for the timestamps (and no fields). This is a very cheap and fast operation. If all returned timestamps are in sync, the update can be restarted in the fast path. Otherwise, a full Get will _only_ be sent to the newest replica, and its result will be used for performing the update on the distributor itself, before pushing out the result as Puts. This is in contrast to today's behavior where full Gets are sent to all replicas. For users with large documents this can be very expensive. In addition, the metadata Get operations are sent with weak internal read consistency (as they do not need to read any previously written, possibly in-flight fields). This lets them bypass the main commit queue entirely.
Diffstat (limited to 'storage/src/tests/distributor/twophaseupdateoperationtest.cpp')
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp853
1 files changed, 514 insertions, 339 deletions
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index f1900c40d56..82466ec5ed7 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -32,9 +32,10 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
document::TestDocRepo _testRepo;
std::shared_ptr<const DocumentTypeRepo> _repo;
const DocumentType* _doc_type;
+ DistributorMessageSenderStub _sender;
TwoPhaseUpdateOperationTest();
- ~TwoPhaseUpdateOperationTest();
+ ~TwoPhaseUpdateOperationTest() override;
void checkMessageSettingsPropagatedTo(
const api::StorageCommand::SP& msg) const;
@@ -81,6 +82,14 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& traceMsg = "");
+ void reply_to_metadata_get(
+ Operation& callback,
+ DistributorMessageSenderStub& sender,
+ uint32_t index,
+ uint64_t old_timestamp,
+ api::ReturnCode::Result result = api::ReturnCode::OK,
+ const std::string& trace_msg = "");
+
struct UpdateOptions {
bool _makeInconsistentSplit;
bool _createIfNonExistent;
@@ -130,6 +139,14 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
Timestamp highest_get_timestamp,
Timestamp expected_response_timestamp);
+ std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase);
+ std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
+ return cb;
+ }
+
};
TwoPhaseUpdateOperationTest::TwoPhaseUpdateOperationTest() = default;
@@ -218,6 +235,24 @@ TwoPhaseUpdateOperationTest::replyToGet(
callback.receive(sender, reply);
}
+void
+TwoPhaseUpdateOperationTest::reply_to_metadata_get(
+ Operation& callback,
+ DistributorMessageSenderStub& sender,
+ uint32_t index,
+ uint64_t old_timestamp,
+ api::ReturnCode::Result result,
+ const std::string& trace_msg)
+{
+ auto& get = dynamic_cast<const api::GetCommand&>(*sender.command(index));
+ auto reply = std::make_shared<api::GetReply>(get, std::shared_ptr<Document>(), old_timestamp);
+ reply->setResult(api::ReturnCode(result, ""));
+ if (!trace_msg.empty()) {
+ MBUS_TRACE(reply->getTrace(), 1, trace_msg);
+ }
+ callback.receive(sender, reply);
+}
+
namespace {
struct DummyTransportContext : api::TransportContext {
@@ -281,234 +316,212 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
TEST_F(TwoPhaseUpdateOperationTest, simple) {
setupDistributor(1, 1, "storage:1 distributor:1");
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
+ replyToMessage(*cb, _sender, 0, 90);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 90) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, non_existing) {
setupDistributor(1, 1, "storage:1 distributor:1");
-
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate(""));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("");
+ cb->start(_sender, framework::MilliSecTime(0));
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_failed) {
setupDistributor(1, 1, "storage:1 distributor:1");
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
- ASSERT_EQ("Update => 0", sender.getCommands(true));
-
- replyToMessage(*cb, sender, 0, 90, api::ReturnCode::INTERNAL_FAILURE);
+ replyToMessage(*cb, _sender, 0, 90, api::ReturnCode::INTERNAL_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(INTERNAL_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
- ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", sender.getLastCommand(true));
+ ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", _sender.getLastCommand(true));
- replyToGet(*cb, sender, 2, 110);
+ replyToGet(*cb, _sender, 2, 110);
- ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0", _sender.getCommands(true));
- ASSERT_TRUE(sender.replies().empty());
+ ASSERT_TRUE(_sender.replies().empty());
- replyToPut(*cb, sender, 3);
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 3);
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_not_found) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
- ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", sender.getLastCommand(true));
- ASSERT_TRUE(sender.replies().empty());
+ ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", _sender.getLastCommand(true));
+ ASSERT_TRUE(_sender.replies().empty());
- replyToGet(*cb, sender, 2, 110, false);
+ replyToGet(*cb, _sender, 2, 110, false);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(INTERNAL_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_update_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
-
- replyToMessage(*cb, sender, 0, 90);
- ASSERT_TRUE(sender.replies().empty());
- replyToMessage(*cb, sender, 1, 110, api::ReturnCode::IO_FAILURE);
+ replyToMessage(*cb, _sender, 0, 90);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToMessage(*cb, _sender, 1, 110, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 90) "
"ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_get_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1",
- sender.getLastCommand(true));
+ _sender.getLastCommand(true));
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 2, 110, false, api::ReturnCode::IO_FAILURE);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 2, 110, false, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1",
- sender.getLastCommand(true));
+ _sender.getLastCommand(true));
- replyToGet(*cb, sender, 2, 110);
+ replyToGet(*cb, _sender, 2, 110);
ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0",
- sender.getCommands(true));
+ _sender.getCommands(true));
- replyToPut(*cb, sender, 3, api::ReturnCode::IO_FAILURE);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 3, api::ReturnCode::IO_FAILURE);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_not_started) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
-
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1",
- sender.getLastCommand(true));
- checkMessageSettingsPropagatedTo(sender.commands().back());
+ _sender.getLastCommand(true));
+ checkMessageSettingsPropagatedTo(_sender.commands().back());
enableDistributorClusterState("storage:0 distributor:1");
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 2, 110);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 2, 110);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(NOT_CONNECTED, "
"Can't store document: No storage nodes available)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsistent_split) {
setupDistributor(2, 2, "storage:2 distributor:1");
-
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3",
- UpdateOptions().makeInconsistentSplit(true)));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3", UpdateOptions().makeInconsistentSplit(true));
+ cb->start(_sender, framework::MilliSecTime(0));
std::string wanted("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0,"
"Get(BucketId(0x440000000000cac4), id:ns:testdoctype1::1) => 0");
- std::string text = sender.getCommands(true, true);
+ std::string text = _sender.getCommands(true, true);
ASSERT_EQ(wanted, text);
- replyToGet(*cb, sender, 0, 90);
- replyToGet(*cb, sender, 1, 120);
+ replyToGet(*cb, _sender, 0, 90);
+ replyToGet(*cb, _sender, 1, 120);
ASSERT_EQ("Put(BucketId(0x440000000000cac4), id:ns:testdoctype1::1, "
"timestamp 200000000, size 60) => 1,"
"Put(BucketId(0x440000000000cac4), id:ns:testdoctype1::1, "
"timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true, 2));
+ _sender.getCommands(true, true, 2));
- replyToPut(*cb, sender, 2);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 3);
+ replyToPut(*cb, _sender, 2);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 3);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 120) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
void
@@ -523,33 +536,30 @@ TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo(
TEST_F(TwoPhaseUpdateOperationTest, fast_path_propagates_message_settings_to_update) {
setupDistributor(1, 1, "storage:1 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Update => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
- StorageCommand::SP msg(sender.commands().back());
+ StorageCommand::SP msg(_sender.commands().back());
checkMessageSettingsPropagatedTo(msg);
}
TEST_F(TwoPhaseUpdateOperationTest, n_of_m) {
setupDistributor(2, 2, "storage:2 distributor:1", 1);
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
-
- ASSERT_TRUE(sender.replies().empty());
- replyToMessage(*cb, sender, 0, 90);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToMessage(*cb, _sender, 0, 90);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 90) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
- replyToMessage(*cb, sender, 1, 123);
+ replyToMessage(*cb, _sender, 1, 123);
}
std::string
@@ -565,132 +575,114 @@ TwoPhaseUpdateOperationTest::getUpdatedValueFromLastPut(
TEST_F(TwoPhaseUpdateOperationTest, safe_path_updates_newest_received_document) {
setupDistributor(3, 3, "storage:3 distributor:1");
// 0,1 in sync. 2 out of sync.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0,"
"Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 2",
- sender.getCommands(true, true));
- replyToGet(*cb, sender, 0, 50);
- replyToGet(*cb, sender, 1, 70);
+ _sender.getCommands(true, true));
+ replyToGet(*cb, _sender, 0, 50);
+ replyToGet(*cb, _sender, 1, 70);
ASSERT_EQ("Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 1,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 2,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true, 2));
+ _sender.getCommands(true, true, 2));
// Make sure Put contains an updated document (+10 arith. update on field
// whose value equals gotten timestamp). In this case we want 70 -> 80.
- ASSERT_EQ("80", getUpdatedValueFromLastPut(sender));
+ ASSERT_EQ("80", getUpdatedValueFromLastPut(_sender));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 70) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, create_if_non_existent_creates_document_if_all_empty_gets) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4",
- UpdateOptions().createIfNonExistent(true)));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 0, false);
- replyToGet(*cb, sender, 1, 0, false);
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 0, false);
+ replyToGet(*cb, _sender, 1, 0, false);
// Since create-if-non-existent is set, distributor should create doc from
// scratch.
ASSERT_EQ("Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 1,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 2,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true, 2));
+ _sender.getCommands(true, true, 2));
- ASSERT_EQ("10", getUpdatedValueFromLastPut(sender));
+ ASSERT_EQ("10", getUpdatedValueFromLastPut(_sender));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 200000000) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_has_failed_put) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4",
- UpdateOptions().createIfNonExistent(true)));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 0, false);
- replyToGet(*cb, sender, 1, 0, false);
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 0, false);
+ replyToGet(*cb, _sender, 1, 0, false);
// Since create-if-non-existent is set, distributor should create doc from
// scratch.
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4, api::ReturnCode::IO_FAILURE);
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 200000000) "
"ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4",
- UpdateOptions().createIfNonExistent(true)));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 0, false, api::ReturnCode::IO_FAILURE);
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 1, 0, false, api::ReturnCode::IO_FAILURE);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 0, false, api::ReturnCode::IO_FAILURE);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 1, 0, false, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
setupDistributor(2, 2, "storage:2 distributor:1");
// Create update for wrong doctype which will fail the update.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().withError()));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().withError());
+ cb->start(_sender, framework::MilliSecTime(0));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 50);
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 1, 70);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 50);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 1, 70);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
@@ -698,95 +690,88 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
"ReturnCode(INTERNAL_FAILURE, Can not apply a "
"\"testdoctype2\" document update to a "
"\"testdoctype1\" document.)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, non_existing_with_auto_create) {
setupDistributor(1, 1, "storage:1 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("", UpdateOptions().createIfNonExistent(true)));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) "
"Reasons to start: => 0,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, "
"timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true));
+ _sender.getCommands(true, true));
- ASSERT_EQ("10", getUpdatedValueFromLastPut(sender));
+ ASSERT_EQ("10", getUpdatedValueFromLastPut(_sender));
- replyToCreateBucket(*cb, sender, 0);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 1);
+ replyToCreateBucket(*cb, _sender, 0);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 1);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 200000000) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_fails_update_when_mismatching_timestamp_constraint) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4",
- UpdateOptions().timestampToUpdate(1234)));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().timestampToUpdate(1234));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 100);
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 1, 110);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 100);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 1, 110);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(NONE, No document with requested "
"timestamp found)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_update_propagates_message_settings_to_gets_and_puts) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- checkMessageSettingsPropagatedTo(sender.command(0));
- checkMessageSettingsPropagatedTo(sender.command(1));
- replyToGet(*cb, sender, 0, 50);
- replyToGet(*cb, sender, 1, 70);
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
- checkMessageSettingsPropagatedTo(sender.command(2));
- checkMessageSettingsPropagatedTo(sender.command(3));
- checkMessageSettingsPropagatedTo(sender.command(4));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- replyToPut(*cb, sender, 4);
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ checkMessageSettingsPropagatedTo(_sender.command(0));
+ checkMessageSettingsPropagatedTo(_sender.command(1));
+ replyToGet(*cb, _sender, 0, 50);
+ replyToGet(*cb, _sender, 1, 70);
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
+ checkMessageSettingsPropagatedTo(_sender.command(2));
+ checkMessageSettingsPropagatedTo(_sender.command(3));
+ checkMessageSettingsPropagatedTo(_sender.command(4));
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ replyToPut(*cb, _sender, 4);
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replies) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 50, true, api::ReturnCode::OK, "hello earthlings");
- replyToGet(*cb, sender, 1, 70);
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
- replyToPut(*cb, sender, 2, api::ReturnCode::OK, "fooo");
- replyToPut(*cb, sender, 3, api::ReturnCode::OK, "baaa");
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 50, true, api::ReturnCode::OK, "hello earthlings");
+ replyToGet(*cb, _sender, 1, 70);
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
+ replyToPut(*cb, _sender, 2, api::ReturnCode::OK, "fooo");
+ replyToPut(*cb, _sender, 3, api::ReturnCode::OK, "baaa");
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
- ASSERT_EQ("Update Reply", sender.getLastReply(false));
+ ASSERT_EQ("Update Reply", _sender.getLastReply(false));
- std::string trace(sender.replies().back()->getTrace().toString());
+ std::string trace(_sender.replies().back()->getTrace().toString());
ASSERT_THAT(trace, HasSubstr("hello earthlings"));
ASSERT_THAT(trace, HasSubstr("fooo"));
ASSERT_THAT(trace, HasSubstr("baaa"));
@@ -798,13 +783,11 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec
Timestamp expected_response_timestamp)
{
setupDistributor(2, 2, "storage:2 distributor:1");
-
// Update towards inconsistent bucket invokes safe path.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
// Alter cluster state so that distributor is now down (technically the
// entire cluster is down in this state, but this should not matter). In
@@ -813,8 +796,8 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec
// to a bucket we no longer own.
enableDistributorClusterState("storage:2 distributor:1 .0.s:d");
getBucketDatabase().clear();
- replyToGet(*cb, sender, 0, lowest_get_timestamp);
- replyToGet(*cb, sender, 1, highest_get_timestamp);
+ replyToGet(*cb, _sender, 0, lowest_get_timestamp);
+ replyToGet(*cb, _sender, 1, highest_get_timestamp);
// BUCKET_NOT_FOUND is a transient error code which should cause the client
// to re-send the operation, presumably to the correct distributor the next
@@ -827,7 +810,7 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec
"ReturnCode(BUCKET_NOT_FOUND, Distributor lost "
"ownership of bucket between executing the read "
"and write phases of a two-phase update operation)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) {
@@ -843,46 +826,37 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.headerval==120")));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(_sender, framework::MilliSecTime(0));
// Newest doc has headerval==110, not 120.
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(TEST_AND_SET_CONDITION_FAILED, "
"Condition did not match document)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_match_sends_puts_with_updated_doc) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.headerval==110")));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==110"));
+
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_parse_failure_fails_with_illegal_params_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.san==fran...cisco")));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.san==fran...cisco"));
+
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
// NOTE: condition is currently not attempted parsed until Gets have been
// replied to. This may change in the future.
// XXX reliance on parser/exception error message is very fragile.
@@ -893,19 +867,16 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_parse_failure_fails_with
"Failed to parse test and set condition: "
"syntax error, unexpected . at column 24 when "
"parsing selection 'testdoctype1.san==fran...cisco')",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_unknown_doc_type_fails_with_illegal_params_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "langbein.headerval=1234")));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("langbein.headerval=1234"));
+
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
// NOTE: condition is currently not attempted parsed until Gets have been
// replied to. This may change in the future.
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
@@ -915,40 +886,35 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_unknown_doc_type_fails_w
"Failed to parse test and set condition: "
"Document type 'langbein' not found at column 1 "
"when parsing selection 'langbein.headerval=1234')",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_no_auto_create_fails_with_tas_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.headerval==120")));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(_sender, framework::MilliSecTime(0));
// Both Gets return nothing at all, nothing at all.
- replyToGet(*cb, sender, 0, 100, false);
- replyToGet(*cb, sender, 1, 110, false);
+ replyToGet(*cb, _sender, 0, 100, false);
+ replyToGet(*cb, _sender, 1, 110, false);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(TEST_AND_SET_CONDITION_FAILED, "
"Document did not exist)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_auto_create_sends_puts) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions()
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions()
.condition("testdoctype1.headerval==120")
- .createIfNonExistent(true)));
+ .createIfNonExistent(true));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100, false);
- replyToGet(*cb, sender, 1, 110, false);
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100, false);
+ replyToGet(*cb, _sender, 1, 110, false);
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
}
void
@@ -966,11 +932,10 @@ TwoPhaseUpdateOperationTest::assertAbortedUpdateReplyWithContextPresent(
TEST_F(TwoPhaseUpdateOperationTest, fast_path_close_edge_sends_correct_reply) {
setupDistributor(1, 1, "storage:1 distributor:1");
// Only 1 replica; consistent with itself by definition.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Update => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
// Close the operation. This should generate a single reply that is
// bound to the original command. We can identify rogue replies by these
// not having a transport context, as these are unique_ptrs that are
@@ -985,11 +950,10 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_close_edge_sends_correct_reply) {
TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
// Closing the operation should now only return an ABORTED reply for
// the UpdateCommand, _not_ from the nested, pending Get operation (which
// will implicitly generate an ABORTED reply for the synthesized Get
@@ -1004,24 +968,23 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_re
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
Timestamp old_timestamp = 500;
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, old_timestamp);
- replyToGet(*cb, sender, 1, old_timestamp);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, old_timestamp);
+ replyToGet(*cb, _sender, 1, old_timestamp);
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true, false, 2));
- replyToMessage(*cb, sender, 2, old_timestamp);
- replyToMessage(*cb, sender, 3, old_timestamp);
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2));
+ replyToMessage(*cb, _sender, 2, old_timestamp);
+ replyToMessage(*cb, _sender, 3, old_timestamp);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 500) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
@@ -1031,17 +994,16 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_do
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_update_fast_path_restart_enabled(false);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
Timestamp old_timestamp = 500;
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, old_timestamp);
- replyToGet(*cb, sender, 1, old_timestamp);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, old_timestamp);
+ replyToGet(*cb, _sender, 1, old_timestamp);
// Should _not_ be restarted with fast path, as it has been config disabled
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
EXPECT_EQ(0, metrics.fast_path_restarts.getValue());
@@ -1051,9 +1013,8 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_alter
setupDistributor(3, 3, "storage:3 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
// Replica set changes between time of Get requests sent and
// responses received. This may happen e.g. if concurrent mutations
@@ -1065,27 +1026,38 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_alter
addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
Timestamp old_timestamp = 500;
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, old_timestamp);
- replyToGet(*cb, sender, 1, old_timestamp);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, old_timestamp);
+ replyToGet(*cb, _sender, 1, old_timestamp);
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_document_not_found_on_a_replica_node) {
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, Timestamp(0), false);
- replyToGet(*cb, sender, 1, Timestamp(500));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, Timestamp(0), false);
+ replyToGet(*cb, _sender, 1, Timestamp(500));
// Should _not_ send Update operations!
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
+}
+
+// Buckets must be created from scratch by Put operations, updates alone cannot do this.
+TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_no_initial_replicas_exist) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_update_fast_path_restart_enabled(true);
+
+ // No replicas, technically consistent but cannot use fast path.
+ auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0",
+ _sender.getCommands(true));
}
// The weak consistency config _only_ applies to Get operations initiated directly
@@ -1095,15 +1067,218 @@ TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_use_weak_internal_read_consistency_for_client_gets(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- auto& get_cmd = dynamic_cast<const api::GetCommand&>(*sender.command(0));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0));
EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong);
}
+struct ThreePhaseUpdateTest : TwoPhaseUpdateOperationTest {};
+
+TEST_F(ThreePhaseUpdateTest, metadata_only_gets_are_sent_if_3phase_update_enabled) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ {
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0));
+ EXPECT_EQ("[none]", get_cmd.getFieldSet());
+ EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Weak);
+ checkMessageSettingsPropagatedTo(_sender.command(0));
+ }
+ {
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(1));
+ EXPECT_EQ("[none]", get_cmd.getFieldSet());
+ EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Weak);
+ checkMessageSettingsPropagatedTo(_sender.command(1));
+ }
+}
+
+TEST_F(ThreePhaseUpdateTest, full_document_get_sent_to_replica_with_highest_timestamp) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 1000U);
+ reply_to_metadata_get(*cb, _sender, 1, 2000U);
+ // Node 1 has newest document version at ts=2000
+ ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2));
+ {
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(2));
+ EXPECT_EQ("[all]", get_cmd.getFieldSet());
+ EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong);
+ }
+}
+
+TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 2000U);
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
+ replyToGet(*cb, _sender, 2, 2000U);
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3));
+}
+
+TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ api::Timestamp old_timestamp(1500);
+ reply_to_metadata_get(*cb, _sender, 0, old_timestamp);
+ reply_to_metadata_get(*cb, _sender, 1, old_timestamp);
+
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2));
+ replyToMessage(*cb, _sender, 2, old_timestamp);
+ replyToMessage(*cb, _sender, 3, old_timestamp);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 1500) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply(true));
+
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
+}
+
+TEST_F(ThreePhaseUpdateTest, fast_path_not_restarted_if_document_not_found_subset_of_replicas) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 0U);
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2)); // Not sending updates.
+}
+
+TEST_F(ThreePhaseUpdateTest, no_document_found_on_any_replicas_is_considered_consistent) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ api::Timestamp no_document_timestamp(0);
+ reply_to_metadata_get(*cb, _sender, 0, no_document_timestamp);
+ reply_to_metadata_get(*cb, _sender, 1, no_document_timestamp);
+
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2));
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
+}
+
+TEST_F(ThreePhaseUpdateTest, metadata_get_phase_fails_if_any_replicas_return_failure) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ reply_to_metadata_get(*cb, _sender, 0, 0U, api::ReturnCode::INTERNAL_FAILURE);
+ ASSERT_EQ("", _sender.getCommands(true, false, 2)); // No further requests sent.
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(ABORTED, One or more metadata Get operations failed; aborting Update)",
+ _sender.getLastReply(true));
+}
+
+TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_set_changed_after_metadata_gets) {
+ setupDistributor(3, 3, "storage:3 distributor:1");
+ getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true);
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more.
+ cb->start(_sender, framework::MilliSecTime(0));
+ // Add new replica to deterministic test bucket after gets have been sent
+ BucketId bucket(0x400000000000cac4); // Always the same in the test.
+ addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
+
+ Timestamp old_timestamp = 500;
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, old_timestamp);
+ reply_to_metadata_get(*cb, _sender, 1, old_timestamp);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, Replica sets changed between update phases, client must retry)",
+ _sender.getLastReply(true));
+}
+
+/*
+ * We unify checking for changed replica sets and changed bucket ownership by only
+ * checking for changed replica sets, thereby avoiding a relatively costly ideal
+ * state recomputation that is otherwise redundant. Rationale for why this shall
+ * always be safe:
+ * - for metadata gets to be sent at all, there must be at least one replica
+ * under the target bucket subtree
+ * - if there are no replicas, the bucket is implicitly considered inconsistent,
+ * triggering safe path
+ * - since there were no replicas initially, the safe path will _not_ restart in
+ * fast path
+ * - the safe path will perform the update locally and start a PutOperation,
+ * implicitly creating new replicas
+ * - this happens in the same execution context as starting the update operation itself,
+ * consequently ownership in DB cannot have changed concurrently
+ * - when the a state transition happens where a distributor loses ownership of
+ * a bucket, it will always immediately purge it from its DB
+ * - this means that the replica set will inherently change
+ *
+ * It is technically possible to have an ABA situation where, in the course of
+ * an operation's lifetime, a distributor goes from owning a bucket to not
+ * owning it, back to owning it again. Although extremely unlikely to happen,
+ * it doesn't matter since the bucket info from the resulting mutations will
+ * be applied to the current state of the database anyway.
+ */
+TEST_F(ThreePhaseUpdateTest, update_aborted_if_ownership_changed_between_gets_and_fast_restart_update) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ // See do_test_ownership_changed_between_gets_and_second_phase() for more in-depth
+ // comments on why this particular cluster state is used.
+ enableDistributorClusterState("storage:2 distributor:1 .0.s:d");
+ getBucketDatabase().clear();
+ reply_to_metadata_get(*cb, _sender, 0, api::Timestamp(70));
+ reply_to_metadata_get(*cb, _sender, 1, api::Timestamp(71));
+
+ // As mentioned in the above comments, ownership changes trigger
+ // on the replicas changed test instead of an explicit ownership
+ // change test.
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, Replica sets changed between update phases, client must retry)",
+ _sender.getLastReply(true));
+}
+
+TEST_F(ThreePhaseUpdateTest, safe_mode_is_implicitly_triggered_if_no_replicas_exist) {
+ setupDistributor(1, 1, "storage:1 distributor:1");
+ getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true);
+ auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) "
+ "Reasons to start: => 0,"
+ "Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, "
+ "timestamp 200000000, size 60) => 0",
+ _sender.getCommands(true, true));
+}
+
+TEST_F(ThreePhaseUpdateTest, metadata_gets_propagate_mbus_trace_to_reply) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ reply_to_metadata_get(*cb, _sender, 0, 0U, api::ReturnCode::INTERNAL_FAILURE,
+ "'ello 'ello what's all this then?");
+ ASSERT_EQ("", _sender.getCommands(true, false, 2));
+ ASSERT_EQ("Update Reply", _sender.getLastReply(false));
+
+ std::string trace(_sender.replies().back()->getTrace().toString());
+ ASSERT_THAT(trace, HasSubstr("'ello 'ello what's all this then?"));
+}
+
+TEST_F(ThreePhaseUpdateTest, single_get_mbus_trace_is_propagated_to_reply) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 0U);
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2));
+ replyToGet(*cb, _sender, 2, 2000U, false, api::ReturnCode::INTERNAL_FAILURE,
+ "it is me, Leclerc! *lifts glasses*");
+ ASSERT_EQ("Update Reply", _sender.getLastReply(false));
+
+ std::string trace(_sender.replies().back()->getTrace().toString());
+ ASSERT_THAT(trace, HasSubstr("it is me, Leclerc! *lifts glasses*"));
+}
+
// XXX currently differs in behavior from content nodes in that updates for
// document IDs without explicit doctypes will _not_ be auto-failed on the
// distributor.