summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-03-19 11:42:56 +0100
committerGitHub <noreply@github.com>2020-03-19 11:42:56 +0100
commit141c98cbcf97153858d6ae5a535e331d7cf174ca (patch)
treeb7fe058b9255bf856e7db4f545095f467e0ed3d3 /storage
parentd36f42b5fb276c1bf11d8ec412e57b770fdb62e0 (diff)
parent611e572184e095ceaa0fb09e7a2da062a699b1d3 (diff)
Merge pull request #12586 from vespa-engine/vekterli/add-cheap-metadata-fetch-phase-for-updates
Add initial metadata-only phase to inconsistent update handling
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/distributortest.cpp17
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp85
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp868
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h8
-rw-r--r--storage/src/vespa/storage/config/stor-distributormanager.def9
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp38
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h27
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp14
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/newest_replica.h44
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp163
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h29
-rw-r--r--storage/src/vespa/storage/distributor/sentmessagemap.h12
14 files changed, 894 insertions, 423 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index afc3b254c64..559784121a6 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -193,6 +193,12 @@ struct DistributorTest : Test, DistributorTestUtil {
configureDistributor(builder);
}
+ void configure_metadata_update_phase_enabled(bool enabled) {
+ ConfigBuilder builder;
+ builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled;
+ configureDistributor(builder);
+ }
+
void configureMaxClusterClockSkew(int seconds);
void sendDownClusterStateCommand();
void replyToSingleRequestBucketInfoCommandWith1Bucket();
@@ -1044,6 +1050,17 @@ TEST_F(DistributorTest, merge_disabling_config_is_propagated_to_internal_config)
EXPECT_FALSE(getConfig().merge_operations_disabled());
}
+TEST_F(DistributorTest, metadata_update_phase_config_is_propagated_to_internal_config) {
+ createLinks(true);
+ setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_metadata_update_phase_enabled(true);
+ EXPECT_TRUE(getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates());
+
+ configure_metadata_update_phase_enabled(false);
+ EXPECT_FALSE(getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates());
+}
+
TEST_F(DistributorTest, weak_internal_read_consistency_config_is_propagated_to_internal_configs) {
createLinks(true);
setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 0dbec8444cc..f14b78094d1 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -21,6 +21,7 @@ using config::ConfigGetter;
using document::DocumenttypesConfig;
using config::FileSpec;
using document::test::makeDocumentBucket;
+using document::BucketId;
using namespace ::testing;
namespace storage::distributor {
@@ -29,8 +30,8 @@ struct GetOperationTest : Test, DistributorTestUtil {
std::shared_ptr<const document::DocumentTypeRepo> _repo;
document::DocumentId docId;
- document::BucketId bucketId;
- std::unique_ptr<Operation> op;
+ BucketId bucketId;
+ std::unique_ptr<GetOperation> op;
GetOperationTest();
~GetOperationTest() override;
@@ -52,7 +53,7 @@ struct GetOperationTest : Test, DistributorTestUtil {
}
void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) {
- auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId(0)), docId, "[all]");
+ auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(BucketId(0)), docId, "[all]");
op = std::make_unique<GetOperation>(
getExternalOperationHandler(), getDistributorBucketSpace(),
getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(),
@@ -135,6 +136,14 @@ struct GetOperationTest : Test, DistributorTestUtil {
GetOperationTest::GetOperationTest() = default;
GetOperationTest::~GetOperationTest() = default;
+namespace {
+
+NewestReplica replica_of(api::Timestamp ts, const document::BucketId& bucket_id, uint16_t node) {
+ return NewestReplica::of(ts, bucket_id, node);
+}
+
+}
+
TEST_F(GetOperationTest, simple) {
setClusterState("distributor:1 storage:2");
@@ -149,7 +158,10 @@ TEST_F(GetOperationTest, simple) {
EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_TRUE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 0), *op->newest_replica());
}
TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted_replica_available) {
@@ -167,7 +179,10 @@ TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted
EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 0), *op->newest_replica());
}
TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
@@ -179,15 +194,18 @@ TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
- ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::OK, "newauthor", 2));
- ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "oldauthor", 1));
+ ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "newauthor", 2));
+ ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::OK, "oldauthor", 1));
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
-
EXPECT_EQ("newauthor", getLastReplyAuthor());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 1), *op->newest_replica());
}
TEST_F(GetOperationTest, send_to_all_invalid_copies) {
@@ -205,8 +223,9 @@ TEST_F(GetOperationTest, send_to_all_invalid_copies) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
-
EXPECT_EQ("newauthor", getLastReplyAuthor());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
}
@@ -235,8 +254,8 @@ TEST_F(GetOperationTest, send_to_all_invalid_nodes_when_inconsistent) {
TEST_F(GetOperationTest, inconsistent_split) {
setClusterState("distributor:1 storage:4");
- addNodesToBucketDB(document::BucketId(16, 0x0593), "0=100");
- addNodesToBucketDB(document::BucketId(17, 0x10593), "1=200");
+ addNodesToBucketDB(BucketId(16, 0x0593), "0=100");
+ addNodesToBucketDB(BucketId(17, 0x10593), "1=200");
sendGet();
@@ -248,9 +267,13 @@ TEST_F(GetOperationTest, inconsistent_split) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
-
EXPECT_EQ("newauthor", getLastReplyAuthor());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ // Bucket with highest timestamp should be returned. In this case it's the one on node 0.
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(2), BucketId(16, 0x0593), 0), *op->newest_replica());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) {
@@ -268,6 +291,8 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
}
@@ -288,7 +313,11 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 3) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 1), *op->newest_replica());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket) {
@@ -308,6 +337,8 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
}
@@ -331,7 +362,12 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_TRUE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ // First send to node 2 fails, second is to node 3 which returned the highest timestamp
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 3), *op->newest_replica());
}
TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) {
@@ -342,6 +378,8 @@ TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 0) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_TRUE(last_reply_had_consistent_replicas()); // Nothing in the bucket, so nothing to be inconsistent with.
}
@@ -363,7 +401,14 @@ TEST_F(GetOperationTest, not_found) {
EXPECT_EQ(1, getDistributor().getMetrics().gets[documentapi::LoadType::DEFAULT].
failures.notfound.getValue());
+ EXPECT_FALSE(op->any_replicas_failed()); // "Not found" is not a failure.
EXPECT_TRUE(last_reply_had_consistent_replicas());
+ EXPECT_TRUE(op->newest_replica().has_value());
+ // "Not found" is still a success with a timestamp of 0. This is because
+ // the caller may want to perform special logic if all replicas are in sync
+ // but are missing the document.
+ // FIXME make sure all callers are aware of this!
+ EXPECT_EQ(replica_of(api::Timestamp(0), bucketId, 0), *op->newest_replica());
}
TEST_F(GetOperationTest, not_found_on_subset_of_replicas_marks_get_as_inconsistent) {
@@ -403,8 +448,12 @@ TEST_F(GetOperationTest, resend_on_storage_failure) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_TRUE(op->any_replicas_failed());
// Replica had read failure, but they're still in sync. An immutable Get won't change that fact.
EXPECT_TRUE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 2), *op->newest_replica());
}
TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_inconsistent) {
@@ -417,7 +466,11 @@ TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_in
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 3) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_TRUE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 2), *op->newest_replica());
}
TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) {
@@ -442,7 +495,10 @@ TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 0) ReturnCode(IO_FAILURE)",
_sender.getLastReply());
+
+ EXPECT_TRUE(op->any_replicas_failed());
EXPECT_TRUE(last_reply_had_consistent_replicas()); // Doesn't really matter since operation itself failed
+ EXPECT_FALSE(op->newest_replica().has_value());
}
TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) {
@@ -462,6 +518,8 @@ TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) {
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
EXPECT_TRUE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 1), *op->newest_replica());
}
TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
@@ -469,7 +527,7 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
// Node 0 is local copy to distributor 0 and will be preferred when
// sending initially.
- addNodesToBucketDB(document::BucketId(16, 0x0593), "2=100,0=100");
+ addNodesToBucketDB(BucketId(16, 0x0593), "2=100,0=100");
sendGet();
@@ -487,9 +545,12 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 3) ReturnCode(NONE)",
_sender.getLastReply());
-
EXPECT_EQ("newestauthor", getLastReplyAuthor());
+
+ EXPECT_TRUE(op->any_replicas_failed());
EXPECT_TRUE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(3), BucketId(16, 0x0593), 2), *op->newest_replica());
}
TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) {
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index f1900c40d56..f3ce4d92263 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -32,9 +32,10 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
document::TestDocRepo _testRepo;
std::shared_ptr<const DocumentTypeRepo> _repo;
const DocumentType* _doc_type;
+ DistributorMessageSenderStub _sender;
TwoPhaseUpdateOperationTest();
- ~TwoPhaseUpdateOperationTest();
+ ~TwoPhaseUpdateOperationTest() override;
void checkMessageSettingsPropagatedTo(
const api::StorageCommand::SP& msg) const;
@@ -81,6 +82,14 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& traceMsg = "");
+ void reply_to_metadata_get(
+ Operation& callback,
+ DistributorMessageSenderStub& sender,
+ uint32_t index,
+ uint64_t old_timestamp,
+ api::ReturnCode::Result result = api::ReturnCode::OK,
+ const std::string& trace_msg = "");
+
struct UpdateOptions {
bool _makeInconsistentSplit;
bool _createIfNonExistent;
@@ -130,6 +139,14 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
Timestamp highest_get_timestamp,
Timestamp expected_response_timestamp);
+ std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase);
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
+ return cb;
+ }
+
};
TwoPhaseUpdateOperationTest::TwoPhaseUpdateOperationTest() = default;
@@ -218,6 +235,24 @@ TwoPhaseUpdateOperationTest::replyToGet(
callback.receive(sender, reply);
}
+void
+TwoPhaseUpdateOperationTest::reply_to_metadata_get(
+ Operation& callback,
+ DistributorMessageSenderStub& sender,
+ uint32_t index,
+ uint64_t old_timestamp,
+ api::ReturnCode::Result result,
+ const std::string& trace_msg)
+{
+ auto& get = dynamic_cast<const api::GetCommand&>(*sender.command(index));
+ auto reply = std::make_shared<api::GetReply>(get, std::shared_ptr<Document>(), old_timestamp);
+ reply->setResult(api::ReturnCode(result, ""));
+ if (!trace_msg.empty()) {
+ MBUS_TRACE(reply->getTrace(), 1, trace_msg);
+ }
+ callback.receive(sender, reply);
+}
+
namespace {
struct DummyTransportContext : api::TransportContext {
@@ -281,234 +316,212 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
TEST_F(TwoPhaseUpdateOperationTest, simple) {
setupDistributor(1, 1, "storage:1 distributor:1");
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
+ replyToMessage(*cb, _sender, 0, 90);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 90) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, non_existing) {
setupDistributor(1, 1, "storage:1 distributor:1");
-
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate(""));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("");
+ cb->start(_sender, framework::MilliSecTime(0));
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_failed) {
setupDistributor(1, 1, "storage:1 distributor:1");
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
- ASSERT_EQ("Update => 0", sender.getCommands(true));
-
- replyToMessage(*cb, sender, 0, 90, api::ReturnCode::INTERNAL_FAILURE);
+ replyToMessage(*cb, _sender, 0, 90, api::ReturnCode::INTERNAL_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(INTERNAL_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
- ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", sender.getLastCommand(true));
+ ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", _sender.getLastCommand(true));
- replyToGet(*cb, sender, 2, 110);
+ replyToGet(*cb, _sender, 2, 110);
- ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0", _sender.getCommands(true));
- ASSERT_TRUE(sender.replies().empty());
+ ASSERT_TRUE(_sender.replies().empty());
- replyToPut(*cb, sender, 3);
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 3);
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_not_found) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", _sender.getLastCommand(true));
+ ASSERT_TRUE(_sender.replies().empty());
- ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", sender.getLastCommand(true));
- ASSERT_TRUE(sender.replies().empty());
-
- replyToGet(*cb, sender, 2, 110, false);
+ replyToGet(*cb, _sender, 2, 110, false);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(INTERNAL_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_update_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- ASSERT_TRUE(sender.replies().empty());
- replyToMessage(*cb, sender, 1, 110, api::ReturnCode::IO_FAILURE);
+ replyToMessage(*cb, _sender, 0, 90);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToMessage(*cb, _sender, 1, 110, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 90) "
"ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_get_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
-
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1",
- sender.getLastCommand(true));
+ _sender.getLastCommand(true));
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 2, 110, false, api::ReturnCode::IO_FAILURE);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 2, 110, false, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1",
- sender.getLastCommand(true));
+ _sender.getLastCommand(true));
- replyToGet(*cb, sender, 2, 110);
+ replyToGet(*cb, _sender, 2, 110);
ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0",
- sender.getCommands(true));
+ _sender.getCommands(true));
- replyToPut(*cb, sender, 3, api::ReturnCode::IO_FAILURE);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 3, api::ReturnCode::IO_FAILURE);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_not_started) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1",
- sender.getLastCommand(true));
- checkMessageSettingsPropagatedTo(sender.commands().back());
+ _sender.getLastCommand(true));
+ checkMessageSettingsPropagatedTo(_sender.commands().back());
enableDistributorClusterState("storage:0 distributor:1");
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 2, 110);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 2, 110);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(NOT_CONNECTED, "
"Can't store document: No storage nodes available)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsistent_split) {
setupDistributor(2, 2, "storage:2 distributor:1");
-
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3",
- UpdateOptions().makeInconsistentSplit(true)));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3", UpdateOptions().makeInconsistentSplit(true));
+ cb->start(_sender, framework::MilliSecTime(0));
std::string wanted("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0,"
"Get(BucketId(0x440000000000cac4), id:ns:testdoctype1::1) => 0");
- std::string text = sender.getCommands(true, true);
+ std::string text = _sender.getCommands(true, true);
ASSERT_EQ(wanted, text);
- replyToGet(*cb, sender, 0, 90);
- replyToGet(*cb, sender, 1, 120);
+ replyToGet(*cb, _sender, 0, 90);
+ replyToGet(*cb, _sender, 1, 120);
ASSERT_EQ("Put(BucketId(0x440000000000cac4), id:ns:testdoctype1::1, "
"timestamp 200000000, size 60) => 1,"
"Put(BucketId(0x440000000000cac4), id:ns:testdoctype1::1, "
"timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true, 2));
+ _sender.getCommands(true, true, 2));
- replyToPut(*cb, sender, 2);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 3);
+ replyToPut(*cb, _sender, 2);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 3);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 120) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
void
@@ -523,33 +536,30 @@ TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo(
TEST_F(TwoPhaseUpdateOperationTest, fast_path_propagates_message_settings_to_update) {
setupDistributor(1, 1, "storage:1 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Update => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
- StorageCommand::SP msg(sender.commands().back());
+ StorageCommand::SP msg(_sender.commands().back());
checkMessageSettingsPropagatedTo(msg);
}
TEST_F(TwoPhaseUpdateOperationTest, n_of_m) {
setupDistributor(2, 2, "storage:2 distributor:1", 1);
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- ASSERT_TRUE(sender.replies().empty());
- replyToMessage(*cb, sender, 0, 90);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToMessage(*cb, _sender, 0, 90);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 90) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
- replyToMessage(*cb, sender, 1, 123);
+ replyToMessage(*cb, _sender, 1, 123);
}
std::string
@@ -565,132 +575,114 @@ TwoPhaseUpdateOperationTest::getUpdatedValueFromLastPut(
TEST_F(TwoPhaseUpdateOperationTest, safe_path_updates_newest_received_document) {
setupDistributor(3, 3, "storage:3 distributor:1");
// 0,1 in sync. 2 out of sync.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0,"
"Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 2",
- sender.getCommands(true, true));
- replyToGet(*cb, sender, 0, 50);
- replyToGet(*cb, sender, 1, 70);
+ _sender.getCommands(true, true));
+ replyToGet(*cb, _sender, 0, 50);
+ replyToGet(*cb, _sender, 1, 70);
ASSERT_EQ("Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 1,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 2,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true, 2));
+ _sender.getCommands(true, true, 2));
// Make sure Put contains an updated document (+10 arith. update on field
// whose value equals gotten timestamp). In this case we want 70 -> 80.
- ASSERT_EQ("80", getUpdatedValueFromLastPut(sender));
+ ASSERT_EQ("80", getUpdatedValueFromLastPut(_sender));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 70) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, create_if_non_existent_creates_document_if_all_empty_gets) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4",
- UpdateOptions().createIfNonExistent(true)));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 0, false);
- replyToGet(*cb, sender, 1, 0, false);
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 0, false);
+ replyToGet(*cb, _sender, 1, 0, false);
// Since create-if-non-existent is set, distributor should create doc from
// scratch.
ASSERT_EQ("Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 1,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 2,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true, 2));
+ _sender.getCommands(true, true, 2));
- ASSERT_EQ("10", getUpdatedValueFromLastPut(sender));
+ ASSERT_EQ("10", getUpdatedValueFromLastPut(_sender));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 200000000) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_has_failed_put) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4",
- UpdateOptions().createIfNonExistent(true)));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 0, false);
- replyToGet(*cb, sender, 1, 0, false);
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 0, false);
+ replyToGet(*cb, _sender, 1, 0, false);
// Since create-if-non-existent is set, distributor should create doc from
// scratch.
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4, api::ReturnCode::IO_FAILURE);
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 200000000) "
"ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4",
- UpdateOptions().createIfNonExistent(true)));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 0, false, api::ReturnCode::IO_FAILURE);
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 1, 0, false, api::ReturnCode::IO_FAILURE);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 0, false, api::ReturnCode::IO_FAILURE);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 1, 0, false, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
setupDistributor(2, 2, "storage:2 distributor:1");
// Create update for wrong doctype which will fail the update.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().withError()));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().withError());
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 50);
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 1, 70);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 50);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 1, 70);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
@@ -698,95 +690,88 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
"ReturnCode(INTERNAL_FAILURE, Can not apply a "
"\"testdoctype2\" document update to a "
"\"testdoctype1\" document.)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, non_existing_with_auto_create) {
setupDistributor(1, 1, "storage:1 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("", UpdateOptions().createIfNonExistent(true)));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) "
"Reasons to start: => 0,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, "
"timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true));
+ _sender.getCommands(true, true));
- ASSERT_EQ("10", getUpdatedValueFromLastPut(sender));
+ ASSERT_EQ("10", getUpdatedValueFromLastPut(_sender));
- replyToCreateBucket(*cb, sender, 0);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 1);
+ replyToCreateBucket(*cb, _sender, 0);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 1);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 200000000) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_fails_update_when_mismatching_timestamp_constraint) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4",
- UpdateOptions().timestampToUpdate(1234)));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().timestampToUpdate(1234));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 100);
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 1, 110);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 100);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 1, 110);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(NONE, No document with requested "
"timestamp found)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_update_propagates_message_settings_to_gets_and_puts) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- checkMessageSettingsPropagatedTo(sender.command(0));
- checkMessageSettingsPropagatedTo(sender.command(1));
- replyToGet(*cb, sender, 0, 50);
- replyToGet(*cb, sender, 1, 70);
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
- checkMessageSettingsPropagatedTo(sender.command(2));
- checkMessageSettingsPropagatedTo(sender.command(3));
- checkMessageSettingsPropagatedTo(sender.command(4));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- replyToPut(*cb, sender, 4);
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ checkMessageSettingsPropagatedTo(_sender.command(0));
+ checkMessageSettingsPropagatedTo(_sender.command(1));
+ replyToGet(*cb, _sender, 0, 50);
+ replyToGet(*cb, _sender, 1, 70);
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
+ checkMessageSettingsPropagatedTo(_sender.command(2));
+ checkMessageSettingsPropagatedTo(_sender.command(3));
+ checkMessageSettingsPropagatedTo(_sender.command(4));
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ replyToPut(*cb, _sender, 4);
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replies) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 50, true, api::ReturnCode::OK, "hello earthlings");
- replyToGet(*cb, sender, 1, 70);
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
- replyToPut(*cb, sender, 2, api::ReturnCode::OK, "fooo");
- replyToPut(*cb, sender, 3, api::ReturnCode::OK, "baaa");
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 50, true, api::ReturnCode::OK, "hello earthlings");
+ replyToGet(*cb, _sender, 1, 70);
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
+ replyToPut(*cb, _sender, 2, api::ReturnCode::OK, "fooo");
+ replyToPut(*cb, _sender, 3, api::ReturnCode::OK, "baaa");
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
- ASSERT_EQ("Update Reply", sender.getLastReply(false));
+ ASSERT_EQ("Update Reply", _sender.getLastReply(false));
- std::string trace(sender.replies().back()->getTrace().toString());
+ std::string trace(_sender.replies().back()->getTrace().toString());
ASSERT_THAT(trace, HasSubstr("hello earthlings"));
ASSERT_THAT(trace, HasSubstr("fooo"));
ASSERT_THAT(trace, HasSubstr("baaa"));
@@ -798,13 +783,11 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec
Timestamp expected_response_timestamp)
{
setupDistributor(2, 2, "storage:2 distributor:1");
-
// Update towards inconsistent bucket invokes safe path.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
// Alter cluster state so that distributor is now down (technically the
// entire cluster is down in this state, but this should not matter). In
@@ -813,8 +796,8 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec
// to a bucket we no longer own.
enableDistributorClusterState("storage:2 distributor:1 .0.s:d");
getBucketDatabase().clear();
- replyToGet(*cb, sender, 0, lowest_get_timestamp);
- replyToGet(*cb, sender, 1, highest_get_timestamp);
+ replyToGet(*cb, _sender, 0, lowest_get_timestamp);
+ replyToGet(*cb, _sender, 1, highest_get_timestamp);
// BUCKET_NOT_FOUND is a transient error code which should cause the client
// to re-send the operation, presumably to the correct distributor the next
@@ -827,7 +810,7 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec
"ReturnCode(BUCKET_NOT_FOUND, Distributor lost "
"ownership of bucket between executing the read "
"and write phases of a two-phase update operation)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) {
@@ -843,46 +826,37 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.headerval==120")));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(_sender, framework::MilliSecTime(0));
// Newest doc has headerval==110, not 120.
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(TEST_AND_SET_CONDITION_FAILED, "
"Condition did not match document)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_match_sends_puts_with_updated_doc) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.headerval==110")));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==110"));
+
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_parse_failure_fails_with_illegal_params_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.san==fran...cisco")));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.san==fran...cisco"));
+
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
// NOTE: condition is currently not attempted parsed until Gets have been
// replied to. This may change in the future.
// XXX reliance on parser/exception error message is very fragile.
@@ -893,19 +867,16 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_parse_failure_fails_with
"Failed to parse test and set condition: "
"syntax error, unexpected . at column 24 when "
"parsing selection 'testdoctype1.san==fran...cisco')",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_unknown_doc_type_fails_with_illegal_params_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "langbein.headerval=1234")));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("langbein.headerval=1234"));
+
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
// NOTE: condition is currently not attempted parsed until Gets have been
// replied to. This may change in the future.
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
@@ -915,40 +886,35 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_unknown_doc_type_fails_w
"Failed to parse test and set condition: "
"Document type 'langbein' not found at column 1 "
"when parsing selection 'langbein.headerval=1234')",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_no_auto_create_fails_with_tas_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.headerval==120")));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(_sender, framework::MilliSecTime(0));
// Both Gets return nothing at all, nothing at all.
- replyToGet(*cb, sender, 0, 100, false);
- replyToGet(*cb, sender, 1, 110, false);
+ replyToGet(*cb, _sender, 0, 100, false);
+ replyToGet(*cb, _sender, 1, 110, false);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(TEST_AND_SET_CONDITION_FAILED, "
"Document did not exist)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_auto_create_sends_puts) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions()
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions()
.condition("testdoctype1.headerval==120")
- .createIfNonExistent(true)));
+ .createIfNonExistent(true));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100, false);
- replyToGet(*cb, sender, 1, 110, false);
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100, false);
+ replyToGet(*cb, _sender, 1, 110, false);
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
}
void
@@ -966,11 +932,10 @@ TwoPhaseUpdateOperationTest::assertAbortedUpdateReplyWithContextPresent(
TEST_F(TwoPhaseUpdateOperationTest, fast_path_close_edge_sends_correct_reply) {
setupDistributor(1, 1, "storage:1 distributor:1");
// Only 1 replica; consistent with itself by definition.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Update => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
// Close the operation. This should generate a single reply that is
// bound to the original command. We can identify rogue replies by these
// not having a transport context, as these are unique_ptrs that are
@@ -985,11 +950,10 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_close_edge_sends_correct_reply) {
TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
// Closing the operation should now only return an ABORTED reply for
// the UpdateCommand, _not_ from the nested, pending Get operation (which
// will implicitly generate an ABORTED reply for the synthesized Get
@@ -1004,24 +968,23 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_re
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
Timestamp old_timestamp = 500;
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, old_timestamp);
- replyToGet(*cb, sender, 1, old_timestamp);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, old_timestamp);
+ replyToGet(*cb, _sender, 1, old_timestamp);
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true, false, 2));
- replyToMessage(*cb, sender, 2, old_timestamp);
- replyToMessage(*cb, sender, 3, old_timestamp);
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2));
+ replyToMessage(*cb, _sender, 2, old_timestamp);
+ replyToMessage(*cb, _sender, 3, old_timestamp);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 500) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
@@ -1031,17 +994,16 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_do
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_update_fast_path_restart_enabled(false);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
Timestamp old_timestamp = 500;
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, old_timestamp);
- replyToGet(*cb, sender, 1, old_timestamp);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, old_timestamp);
+ replyToGet(*cb, _sender, 1, old_timestamp);
// Should _not_ be restarted with fast path, as it has been config disabled
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
EXPECT_EQ(0, metrics.fast_path_restarts.getValue());
@@ -1051,9 +1013,8 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_alter
setupDistributor(3, 3, "storage:3 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
// Replica set changes between time of Get requests sent and
// responses received. This may happen e.g. if concurrent mutations
@@ -1065,27 +1026,38 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_alter
addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
Timestamp old_timestamp = 500;
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, old_timestamp);
- replyToGet(*cb, sender, 1, old_timestamp);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, old_timestamp);
+ replyToGet(*cb, _sender, 1, old_timestamp);
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_document_not_found_on_a_replica_node) {
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, Timestamp(0), false);
- replyToGet(*cb, sender, 1, Timestamp(500));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, Timestamp(0), false);
+ replyToGet(*cb, _sender, 1, Timestamp(500));
// Should _not_ send Update operations!
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
+}
+
+// Buckets must be created from scratch by Put operations, updates alone cannot do this.
+TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_no_initial_replicas_exist) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_update_fast_path_restart_enabled(true);
+
+ // No replicas, technically consistent but cannot use fast path.
+ auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0",
+ _sender.getCommands(true));
}
// The weak consistency config _only_ applies to Get operations initiated directly
@@ -1095,15 +1067,233 @@ TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_use_weak_internal_read_consistency_for_client_gets(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- auto& get_cmd = dynamic_cast<const api::GetCommand&>(*sender.command(0));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0));
EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong);
}
+struct ThreePhaseUpdateTest : TwoPhaseUpdateOperationTest {};
+
+TEST_F(ThreePhaseUpdateTest, metadata_only_gets_are_sent_if_3phase_update_enabled) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ {
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0));
+ EXPECT_EQ("[none]", get_cmd.getFieldSet());
+ EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Weak);
+ checkMessageSettingsPropagatedTo(_sender.command(0));
+ }
+ {
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(1));
+ EXPECT_EQ("[none]", get_cmd.getFieldSet());
+ EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Weak);
+ checkMessageSettingsPropagatedTo(_sender.command(1));
+ }
+}
+
+TEST_F(ThreePhaseUpdateTest, full_document_get_sent_to_replica_with_highest_timestamp) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 1000U);
+ reply_to_metadata_get(*cb, _sender, 1, 2000U);
+ // Node 1 has newest document version at ts=2000
+ ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2));
+ {
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(2));
+ EXPECT_EQ("[all]", get_cmd.getFieldSet());
+ EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong);
+ }
+}
+
+TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 2000U);
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
+ replyToGet(*cb, _sender, 2, 2000U);
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3));
+}
+
+TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ api::Timestamp old_timestamp(1500);
+ reply_to_metadata_get(*cb, _sender, 0, old_timestamp);
+ reply_to_metadata_get(*cb, _sender, 1, old_timestamp);
+
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2));
+ replyToMessage(*cb, _sender, 2, old_timestamp);
+ replyToMessage(*cb, _sender, 3, old_timestamp);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 1500) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply(true));
+
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
+}
+
+TEST_F(ThreePhaseUpdateTest, fast_path_not_restarted_if_document_not_found_subset_of_replicas) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 0U);
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2)); // Not sending updates.
+}
+
+TEST_F(ThreePhaseUpdateTest, no_document_found_on_any_replicas_is_considered_consistent) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ api::Timestamp no_document_timestamp(0);
+ reply_to_metadata_get(*cb, _sender, 0, no_document_timestamp);
+ reply_to_metadata_get(*cb, _sender, 1, no_document_timestamp);
+
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2));
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
+}
+
+TEST_F(ThreePhaseUpdateTest, metadata_get_phase_fails_if_any_replicas_return_failure) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ reply_to_metadata_get(*cb, _sender, 0, 0U, api::ReturnCode::INTERNAL_FAILURE);
+ ASSERT_EQ("", _sender.getCommands(true, false, 2)); // No further requests sent.
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(ABORTED, One or more metadata Get operations failed; aborting Update)",
+ _sender.getLastReply(true));
+}
+
+TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_set_changed_after_metadata_gets) {
+ setupDistributor(3, 3, "storage:3 distributor:1");
+ getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true);
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more.
+ cb->start(_sender, framework::MilliSecTime(0));
+ // Add new replica to deterministic test bucket after gets have been sent
+ BucketId bucket(0x400000000000cac4); // Always the same in the test.
+ addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
+
+ Timestamp old_timestamp = 500;
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, old_timestamp);
+ reply_to_metadata_get(*cb, _sender, 1, old_timestamp);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, Replica sets changed between update phases, client must retry)",
+ _sender.getLastReply(true));
+}
+
+TEST_F(ThreePhaseUpdateTest, single_full_get_cannot_restart_in_fast_path) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true);
+ getConfig().set_update_fast_path_restart_enabled(true);
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 1000U);
+ reply_to_metadata_get(*cb, _sender, 1, 2000U);
+ ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2));
+ replyToGet(*cb, _sender, 2, 2000U);
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3));
+}
+
+/*
+ * We unify checking for changed replica sets and changed bucket ownership by only
+ * checking for changed replica sets, thereby avoiding a relatively costly ideal
+ * state recomputation that is otherwise redundant. Rationale for why this shall
+ * always be safe:
+ * - for metadata gets to be sent at all, there must be at least one replica
+ * under the target bucket subtree
+ * - if there are no replicas, the bucket is implicitly considered inconsistent,
+ * triggering safe path
+ * - since there were no replicas initially, the safe path will _not_ restart in
+ * fast path
+ * - the safe path will perform the update locally and start a PutOperation,
+ * implicitly creating new replicas
+ * - this happens in the same execution context as starting the update operation itself,
+ * consequently ownership in DB cannot have changed concurrently
+ * - when the a state transition happens where a distributor loses ownership of
+ * a bucket, it will always immediately purge it from its DB
+ * - this means that the replica set will inherently change
+ *
+ * It is technically possible to have an ABA situation where, in the course of
+ * an operation's lifetime, a distributor goes from owning a bucket to not
+ * owning it, back to owning it again. Although extremely unlikely to happen,
+ * it doesn't matter since the bucket info from the resulting mutations will
+ * be applied to the current state of the database anyway.
+ */
+TEST_F(ThreePhaseUpdateTest, update_aborted_if_ownership_changed_between_gets_and_fast_restart_update) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ // See do_test_ownership_changed_between_gets_and_second_phase() for more in-depth
+ // comments on why this particular cluster state is used.
+ enableDistributorClusterState("storage:2 distributor:1 .0.s:d");
+ getBucketDatabase().clear();
+ reply_to_metadata_get(*cb, _sender, 0, api::Timestamp(70));
+ reply_to_metadata_get(*cb, _sender, 1, api::Timestamp(71));
+
+ // As mentioned in the above comments, ownership changes trigger
+ // on the replicas changed test instead of an explicit ownership
+ // change test.
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, Replica sets changed between update phases, client must retry)",
+ _sender.getLastReply(true));
+}
+
+TEST_F(ThreePhaseUpdateTest, safe_mode_is_implicitly_triggered_if_no_replicas_exist) {
+ setupDistributor(1, 1, "storage:1 distributor:1");
+ getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true);
+ auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) "
+ "Reasons to start: => 0,"
+ "Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, "
+ "timestamp 200000000, size 60) => 0",
+ _sender.getCommands(true, true));
+}
+
+TEST_F(ThreePhaseUpdateTest, metadata_gets_propagate_mbus_trace_to_reply) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ reply_to_metadata_get(*cb, _sender, 0, 0U, api::ReturnCode::INTERNAL_FAILURE,
+ "'ello 'ello what's all this then?");
+ ASSERT_EQ("", _sender.getCommands(true, false, 2));
+ ASSERT_EQ("Update Reply", _sender.getLastReply(false));
+
+ std::string trace(_sender.replies().back()->getTrace().toString());
+ ASSERT_THAT(trace, HasSubstr("'ello 'ello what's all this then?"));
+}
+
+TEST_F(ThreePhaseUpdateTest, single_get_mbus_trace_is_propagated_to_reply) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 0U);
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2));
+ replyToGet(*cb, _sender, 2, 2000U, false, api::ReturnCode::INTERNAL_FAILURE,
+ "it is me, Leclerc! *lifts glasses*");
+ ASSERT_EQ("Update Reply", _sender.getLastReply(false));
+
+ std::string trace(_sender.replies().back()->getTrace().toString());
+ ASSERT_THAT(trace, HasSubstr("it is me, Leclerc! *lifts glasses*"));
+}
+
// XXX currently differs in behavior from content nodes in that updates for
// document IDs without explicit doctypes will _not_ be auto-failed on the
// distributor.
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 522561ee8a5..9f51d70ce60 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -42,6 +42,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_update_fast_path_restart_enabled(false),
_merge_operations_disabled(false),
_use_weak_internal_read_consistency_for_client_gets(false),
+ _enable_metadata_only_fetch_phase_for_inconsistent_updates(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{ }
@@ -155,6 +156,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_update_fast_path_restart_enabled = config.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent;
_merge_operations_disabled = config.mergeOperationsDisabled;
_use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets;
+ _enable_metadata_only_fetch_phase_for_inconsistent_updates = config.enableMetadataOnlyFetchPhaseForInconsistentUpdates;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 333d7073715..b8e99165d69 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -235,6 +235,13 @@ public:
return _use_weak_internal_read_consistency_for_client_gets;
}
+ void set_enable_metadata_only_fetch_phase_for_inconsistent_updates(bool enable) noexcept {
+ _enable_metadata_only_fetch_phase_for_inconsistent_updates = enable;
+ }
+ bool enable_metadata_only_fetch_phase_for_inconsistent_updates() const noexcept {
+ return _enable_metadata_only_fetch_phase_for_inconsistent_updates;
+ }
+
bool containsTimeStatement(const std::string& documentSelection) const;
private:
@@ -281,6 +288,7 @@ private:
bool _update_fast_path_restart_enabled;
bool _merge_operations_disabled;
bool _use_weak_internal_read_consistency_for_client_gets;
+ bool _enable_metadata_only_fetch_phase_for_inconsistent_updates;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index e4d3182ce7a..915b9b6b304 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -228,3 +228,12 @@ merge_operations_disabled bool default=false
## consistent view of fields across document versions.
## This is mostly useful in a system that is effectively read-only.
use_weak_internal_read_consistency_for_client_gets bool default=false
+
+## If true, adds an initial metadata-only fetch phase to updates that touch buckets
+## with inconsistent replicas. Metadata timestamps are compared and a single full Get
+## is sent _only_ to one node with the highest timestamp. Without a metadata phase,
+## full gets would be sent to _all_ nodes.
+## Setting this option to true always implicitly enables the fast update restart
+## feature, so it's not required to set that config to true, nor will setting it
+## to false actually disable the feature.
+enable_metadata_only_fetch_phase_for_inconsistent_updates bool default=false \ No newline at end of file
diff --git a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
index efef4e5e51d..998fc22b1d7 100644
--- a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
@@ -2,6 +2,7 @@
vespa_add_library(storage_distributoroperationexternal OBJECT
SOURCES
getoperation.cpp
+ newest_replica.cpp
putoperation.cpp
removelocationoperation.cpp
removeoperation.cpp
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 9e66c212ac6..f800166a647 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -56,11 +56,12 @@ GetOperation::GetOperation(DistributorComponent& manager,
_msg(std::move(msg)),
_returnCode(api::ReturnCode::OK),
_doc(),
- _lastModified(),
+ _newest_replica(),
_metric(metric),
_operationTimer(manager.getClock()),
_desired_read_consistency(desired_read_consistency),
- _has_replica_inconsistency(false)
+ _has_replica_inconsistency(false),
+ _any_replicas_failed(false)
{
assignTargetNodeGroups(*read_guard);
}
@@ -110,7 +111,9 @@ GetOperation::sendForChecksum(DistributorMessageSender& sender, const document::
LOG(spam, "Sending %s to node %d", command->toString(true).c_str(), res[best].copy.getNode());
- res[best].sent = sender.sendToNode(lib::NodeType::STORAGE, res[best].copy.getNode(), command);
+ const auto target_node = res[best].copy.getNode();
+ res[best].sent = sender.sendToNode(lib::NodeType::STORAGE, target_node, command);
+ res[best].to_node = target_node;
return true;
}
@@ -145,27 +148,31 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<
bool allDone = true;
for (auto& response : _responses) {
for (uint32_t i = 0; i < response.second.size(); i++) {
- if (response.second[i].sent == getreply->getMsgId()) {
+ const auto& bucket_id = response.first.getBucketId();
+ auto& send_state = response.second[i];
+ if (send_state.sent == getreply->getMsgId()) {
LOG(debug, "Get on %s returned %s",
_msg->getDocumentId().toString().c_str(),
getreply->getResult().toString().c_str());
- response.second[i].received = true;
- response.second[i].returnCode = getreply->getResult();
+ send_state.received = true;
+ send_state.returnCode = getreply->getResult();
if (getreply->getResult().success()) {
- if (_lastModified.has_value() && (getreply->getLastModifiedTimestamp() != *_lastModified)) {
+ if (_newest_replica.has_value() && (getreply->getLastModifiedTimestamp() != _newest_replica->timestamp)) {
// At least two document versions returned had different timestamps.
_has_replica_inconsistency = true; // This is a one-way toggle.
}
- if (!_lastModified.has_value() || getreply->getLastModifiedTimestamp() > *_lastModified) {
+ if (!_newest_replica.has_value() || getreply->getLastModifiedTimestamp() > _newest_replica->timestamp) {
_returnCode = getreply->getResult();
- _lastModified = getreply->getLastModifiedTimestamp();
+ assert(response.second[i].to_node != UINT16_MAX);
+ _newest_replica = NewestReplica::of(getreply->getLastModifiedTimestamp(), bucket_id, send_state.to_node);
_doc = getreply->getDocument();
}
} else {
- if (!_lastModified.has_value()) {
- _returnCode = getreply->getResult();
+ _any_replicas_failed = true;
+ if (!_newest_replica.has_value()) {
+ _returnCode = getreply->getResult(); // Don't overwrite if we have a good response.
}
if (!all_bucket_metadata_initially_consistent()) {
// If we're sending to more than a single group of replicas it means our replica set is
@@ -175,7 +182,7 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<
}
// Try to send to another node in this checksum group.
- bool sent = sendForChecksum(sender, response.first.getBucketId(), response.second);
+ bool sent = sendForChecksum(sender, bucket_id, response.second);
if (sent) {
allDone = false;
}
@@ -219,7 +226,8 @@ void
GetOperation::sendReply(DistributorMessageSender& sender)
{
if (_msg.get()) {
- auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified.value_or(0), !_has_replica_inconsistency);
+ const auto timestamp = _newest_replica.value_or(NewestReplica::make_empty()).timestamp;
+ auto repl = std::make_shared<api::GetReply>(*_msg, _doc, timestamp, !_has_replica_inconsistency);
repl->setResult(_returnCode);
update_internal_metrics();
sender.sendReply(repl);
@@ -250,9 +258,9 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard
_replicas_in_db.emplace_back(e.getBucketId(), copy.getNode());
if (!copy.valid()) {
- _responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].push_back(copy);
+ _responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].emplace_back(copy);
} else if (!copy.empty()) {
- _responses[GroupId(e.getBucketId(), copy.getChecksum(), -1)].push_back(copy);
+ _responses[GroupId(e.getBucketId(), copy.getChecksum(), -1)].emplace_back(copy);
}
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 1106968bcf7..57e878d9e40 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "newest_replica.h"
#include <vespa/storageapi/defs.h>
#include <vespa/storage/distributor/operations/operation.h>
#include <vespa/storage/bucketdb/bucketdatabase.h>
@@ -38,6 +39,9 @@ public:
std::string getStatus() const override { return ""; }
bool all_bucket_metadata_initially_consistent() const;
+ bool any_replicas_failed() const noexcept {
+ return _any_replicas_failed;
+ }
// Exposed for unit testing. TODO feels a bit dirty :I
const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; }
@@ -50,6 +54,13 @@ public:
return _desired_read_consistency;
}
+ // Note: in the case the document could not be found on any replicas, but
+ // at least one node returned a non-error response, the returned value will
+ // have a timestamp of zero and the most recently asked node as its node.
+ const std::optional<NewestReplica>& newest_replica() const noexcept {
+ return _newest_replica;
+ }
+
private:
class GroupId {
public:
@@ -66,20 +77,19 @@ private:
int _node;
};
- class BucketChecksumGroup {
- public:
- BucketChecksumGroup(const BucketCopy& c) :
- copy(c),
- sent(0), received(false), returnCode(api::ReturnCode::OK)
+ struct BucketChecksumGroup {
+ explicit BucketChecksumGroup(const BucketCopy& c)
+ : copy(c), sent(0), returnCode(api::ReturnCode::OK), to_node(UINT16_MAX), received(false)
{}
BucketCopy copy;
api::StorageMessage::Id sent;
- bool received;
api::ReturnCode returnCode;
+ uint16_t to_node;
+ bool received;
};
- typedef std::vector<BucketChecksumGroup> GroupVector;
+ using GroupVector = std::vector<BucketChecksumGroup>;
// Organize the different copies by bucket/checksum pairs. We should
// try to request GETs from each bucket and each different checksum
@@ -94,13 +104,14 @@ private:
api::ReturnCode _returnCode;
std::shared_ptr<document::Document> _doc;
- std::optional<api::Timestamp> _lastModified;
+ std::optional<NewestReplica> _newest_replica;
PersistenceOperationMetricSet& _metric;
framework::MilliSecTimer _operationTimer;
std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db;
api::InternalReadConsistency _desired_read_consistency;
bool _has_replica_inconsistency;
+ bool _any_replicas_failed;
void sendReply(DistributorMessageSender& sender);
bool sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res);
diff --git a/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp b/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp
new file mode 100644
index 00000000000..8ca3b9bf411
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp
@@ -0,0 +1,14 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "newest_replica.h"
+#include <ostream>
+
+namespace storage::distributor {
+
+std::ostream& operator<<(std::ostream& os, const NewestReplica& nr) {
+ os << "NewestReplica(timestamp " << nr.timestamp
+ << ", bucket_id " << nr.bucket_id
+ << ", node " << nr.node << ')';
+ return os;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/newest_replica.h b/storage/src/vespa/storage/distributor/operations/external/newest_replica.h
new file mode 100644
index 00000000000..9eb9c1b8bd0
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/external/newest_replica.h
@@ -0,0 +1,44 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/document/bucket/bucketid.h>
+#include <vespa/storageapi/defs.h>
+#include <iosfwd>
+#include <stddef.h>
+
+namespace storage::distributor {
+
+/*
+ * Tracks the information required to identify the location of the newest replica
+ * for any given document. Newest here means the replica containing the document
+ * version with the highest mutation timestamp.
+ */
+struct NewestReplica {
+ api::Timestamp timestamp {0};
+ document::BucketId bucket_id;
+ uint16_t node {UINT16_MAX};
+
+ static NewestReplica of(api::Timestamp timestamp,
+ const document::BucketId& bucket_id,
+ uint16_t node) noexcept {
+ return {timestamp, bucket_id, node};
+ }
+
+ static NewestReplica make_empty() {
+ return {api::Timestamp(0), document::BucketId(), 0};
+ }
+
+ bool operator==(const NewestReplica& rhs) const noexcept {
+ return ((timestamp == rhs.timestamp) &&
+ (bucket_id == rhs.bucket_id) &&
+ (node == rhs.node));
+ }
+ bool operator!=(const NewestReplica& rhs) const noexcept {
+ return !(*this == rhs);
+ }
+};
+
+
+std::ostream& operator<<(std::ostream&, const NewestReplica&);
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 47cacafee80..dd8f8b553fc 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -36,6 +36,8 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
_sendState(SendState::NONE_SENT),
_mode(Mode::FAST_PATH),
_fast_path_repair_source_node(0xffff),
+ _use_initial_cheap_metadata_fetch_phase(
+ _manager.getDistributor().getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates()),
_replySent(false)
{
document::BucketIdFactory idFactory;
@@ -92,10 +94,12 @@ const char*
TwoPhaseUpdateOperation::stateToString(SendState state)
{
switch (state) {
- case SendState::NONE_SENT: return "NONE_SENT";
- case SendState::UPDATES_SENT: return "UPDATES_SENT";
- case SendState::GETS_SENT: return "GETS_SENT";
- case SendState::PUTS_SENT: return "PUTS_SENT";
+ case SendState::NONE_SENT: return "NONE_SENT";
+ case SendState::UPDATES_SENT: return "UPDATES_SENT";
+ case SendState::METADATA_GETS_SENT: return "METADATA_GETS_SENT";
+ case SendState::SINGLE_GET_SENT: return "SINGLE_GET_SENT";
+ case SendState::FULL_GETS_SENT: return "FULL_GETS_SENT";
+ case SendState::PUTS_SENT: return "PUTS_SENT";
default:
assert(!"Unknown state");
return "";
@@ -159,7 +163,7 @@ void
TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender)
{
_mode = Mode::FAST_PATH;
- LOG(debug, "Update(%s) fast path: sending Update commands", _updateCmd->getDocumentId().toString().c_str());
+ LOG(debug, "Update(%s) fast path: sending Update commands", update_doc_id().c_str());
auto updateOperation = std::make_shared<UpdateOperation>(_manager, _bucketSpace, _updateCmd, _updateMetric);
UpdateOperation & op = *updateOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender);
@@ -174,26 +178,50 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender)
void
TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender)
{
- LOG(debug, "Update(%s) safe path: sending Get commands", _updateCmd->getDocumentId().toString().c_str());
-
_mode = Mode::SLOW_PATH;
- document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0));
- auto get = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(),"[all]");
- copyMessageSettings(*_updateCmd, *get);
- auto getOperation = std::make_shared<GetOperation>(
- _manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(), get, _getMetric);
- GetOperation & op = *getOperation;
- IntermediateMessageSender intermediate(_sentMessageMap, std::move(getOperation), sender);
+ auto get_operation = create_initial_safe_path_get_operation();
+ GetOperation& op = *get_operation;
+ IntermediateMessageSender intermediate(_sentMessageMap, std::move(get_operation), sender);
_replicas_at_get_send_time = op.replicas_in_db(); // Populated at construction time, not at start()-time
op.start(intermediate, _manager.getClock().getTimeInMillis());
- transitionTo(SendState::GETS_SENT);
+
+ transitionTo(_use_initial_cheap_metadata_fetch_phase
+ ? SendState::METADATA_GETS_SENT
+ : SendState::FULL_GETS_SENT);
if (intermediate._reply.get()) {
assert(intermediate._reply->getType() == api::MessageType::GET_REPLY);
+ // We always trigger the safe path Get reply handling here regardless of whether
+ // metadata-only or full Gets were sent. This is because we might get an early
+ // reply due to there being no replicas in existence at all for the target bucket.
+ // In this case, we rely on the safe path fallback to implicitly create the bucket
+ // by performing the update locally and sending CreateBucket+Put to the ideal nodes.
handleSafePathReceivedGet(sender, static_cast<api::GetReply&>(*intermediate._reply));
}
}
+std::shared_ptr<GetOperation>
+TwoPhaseUpdateOperation::create_initial_safe_path_get_operation() {
+ document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0));
+ const char* field_set = _use_initial_cheap_metadata_fetch_phase ? "[none]" : "[all]";
+ auto get = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), field_set);
+ copyMessageSettings(*_updateCmd, *get);
+ // Metadata-only Gets just look at the data in the meta-store, not any fields.
+ // The meta-store is always updated before any ACK is returned for a mutation,
+ // so all the information we need is guaranteed to be consistent even with a
+ // weak read. But since weak reads allow the Get operation to bypass commit
+ // queues, latency may be greatly reduced in contended situations.
+ auto read_consistency = (_use_initial_cheap_metadata_fetch_phase
+ ? api::InternalReadConsistency::Weak
+ : api::InternalReadConsistency::Strong);
+ LOG(debug, "Update(%s) safe path: sending Get commands with field set '%s' "
+ "and internal read consistency %s",
+ update_doc_id().c_str(), field_set, api::to_string(read_consistency));
+ return std::make_shared<GetOperation>(
+ _manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(),
+ get, _getMetric, read_consistency);
+}
+
void
TwoPhaseUpdateOperation::onStart(DistributorMessageSender& sender) {
if (isFastPathPossible()) {
@@ -247,7 +275,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen
transitionTo(SendState::PUTS_SENT);
LOG(debug, "Update(%s): sending Put commands with doc %s",
- _updateCmd->getDocumentId().toString().c_str(), doc->toString(true).c_str());
+ update_doc_id().c_str(), doc->toString(true).c_str());
if (intermediate._reply.get()) {
sendReplyWithResult(sender, intermediate._reply->getResult());
@@ -269,13 +297,12 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
const std::shared_ptr<api::StorageReply>& msg)
{
if (msg->getType() == api::MessageType::GET_REPLY) {
- assert(_sendState == SendState::GETS_SENT);
- api::GetReply& getReply = static_cast<api::GetReply&> (*msg);
+ assert(_sendState == SendState::FULL_GETS_SENT);
+ auto& getReply = static_cast<api::GetReply&>(*msg);
addTraceFromReply(getReply);
- LOG(debug, "Update(%s) Get reply had result: %s",
- _updateCmd->getDocumentId().toString().c_str(),
- getReply.getResult().toString().c_str());
+ LOG(debug, "Update(%s) fast path: Get reply had result %s",
+ update_doc_id().c_str(), getReply.getResult().toString().c_str());
if (!getReply.getResult().success()) {
sendReplyWithResult(sender, getReply.getResult());
@@ -310,7 +337,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
// Failed or was consistent
sendReply(sender, intermediate._reply);
} else {
- LOG(debug, "Update(%s) fast path: was inconsistent!", _updateCmd->getDocumentId().toString().c_str());
+ LOG(debug, "Update(%s) fast path: was inconsistent!", update_doc_id().c_str());
_updateReply = intermediate._reply;
_fast_path_repair_source_node = bestNode.second;
@@ -319,7 +346,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
copyMessageSettings(*_updateCmd, *cmd);
sender.sendToNode(lib::NodeType::STORAGE, _fast_path_repair_source_node, cmd);
- transitionTo(SendState::GETS_SENT);
+ transitionTo(SendState::FULL_GETS_SENT);
}
}
} else {
@@ -328,7 +355,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
addTraceFromReply(*intermediate._reply);
sendReplyWithResult(sender, intermediate._reply->getResult());
LOG(warning, "Forced convergence of '%s' using document from node %u",
- _updateCmd->getDocumentId().toString().c_str(), _fast_path_repair_source_node);
+ update_doc_id().c_str(), _fast_path_repair_source_node);
}
}
}
@@ -337,6 +364,15 @@ void
TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender,
const std::shared_ptr<api::StorageReply>& msg)
{
+ // No explicit operation is associated with the direct replica Get operation,
+ // so we handle its reply separately.
+ if (_sendState == SendState::SINGLE_GET_SENT) {
+ assert(msg->getType() == api::MessageType::GET_REPLY);
+ LOG(spam, "Received single full Get reply for '%s'", update_doc_id().c_str());
+ addTraceFromReply(*msg);
+ handleSafePathReceivedGet(sender, dynamic_cast<api::GetReply&>(*msg));
+ return;
+ }
std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId());
assert(callback.get());
Operation & callbackOp = *callback;
@@ -348,7 +384,12 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender,
return; // Not enough replies received yet or we're draining callbacks.
}
addTraceFromReply(*intermediate._reply);
- if (_sendState == SendState::GETS_SENT) {
+ if (_sendState == SendState::METADATA_GETS_SENT) {
+ assert(intermediate._reply->getType() == api::MessageType::GET_REPLY);
+ const auto& get_op = dynamic_cast<const GetOperation&>(*intermediate.callback);
+ handle_safe_path_received_metadata_get(sender, static_cast<api::GetReply&>(*intermediate._reply),
+ get_op.newest_replica(), get_op.any_replicas_failed());
+ } else if (_sendState == SendState::FULL_GETS_SENT) {
assert(intermediate._reply->getType() == api::MessageType::GET_REPLY);
handleSafePathReceivedGet(sender, static_cast<api::GetReply&>(*intermediate._reply));
} else if (_sendState == SendState::PUTS_SENT) {
@@ -359,6 +400,60 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender,
}
}
+void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get(
+ DistributorMessageSender& sender, api::GetReply& reply,
+ const std::optional<NewestReplica>& newest_replica,
+ bool any_replicas_failed)
+{
+ LOG(debug, "Update(%s): got (metadata only) Get reply with result %s",
+ update_doc_id().c_str(), reply.getResult().toString().c_str());
+
+ if (!reply.getResult().success()) {
+ sendReplyWithResult(sender, reply.getResult());
+ return;
+ }
+ // It's possible for a single replica to fail during processing without the entire
+ // Get operation failing. Although we know a priori if replicas are out of sync,
+ // we don't know which one has the highest timestamp (it might have been the one
+ // on the node that the metadata Get just failed towards). To err on the side of
+ // caution we abort the update if this happens. If a simple metadata Get fails, it
+ // is highly likely that a full partial update or put operation would fail as well.
+ if (any_replicas_failed) {
+ LOG(debug, "Update(%s): had failed replicas, aborting update", update_doc_id().c_str());
+ sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::Result::ABORTED,
+ "One or more metadata Get operations failed; aborting Update"));
+ return;
+ }
+ if (!replica_set_unchanged_after_get_operation()) {
+ // Use BUCKET_NOT_FOUND to trigger a silent retry.
+ LOG(debug, "Update(%s): replica set has changed after metadata get phase", update_doc_id().c_str());
+ sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::Result::BUCKET_NOT_FOUND,
+ "Replica sets changed between update phases, client must retry"));
+ return;
+ }
+ if (reply.had_consistent_replicas()) {
+ LOG(debug, "Update(%s): metadata Gets consistent; restarting in fast path", update_doc_id().c_str());
+ restart_with_fast_path_due_to_consistent_get_timestamps(sender);
+ return;
+ }
+ // If we've gotten here, we must have had no Get failures and replicas must
+ // be somehow inconsistent. Replicas can only be inconsistent if their timestamps
+ // mismatch, so we must have observed at least one non-zero timestamp.
+ assert(newest_replica.has_value() && (newest_replica->timestamp != api::Timestamp(0)));
+ // Timestamps were not in sync, so we have to fetch the document from the highest
+ // timestamped replica, apply the update to it and then explicitly Put the result
+ // to all replicas.
+ document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), newest_replica->bucket_id);
+ LOG(debug, "Update(%s): sending single payload Get to %s on node %u (had timestamp %" PRIu64 ")",
+ update_doc_id().c_str(), bucket.toString().c_str(),
+ newest_replica->node, newest_replica->timestamp);
+ auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), "[all]");
+ copyMessageSettings(*_updateCmd, *cmd);
+ sender.sendToNode(lib::NodeType::STORAGE, newest_replica->node, cmd);
+
+ transitionTo(SendState::SINGLE_GET_SENT);
+}
+
void
TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sender, api::GetReply& reply)
{
@@ -370,7 +465,9 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
sendReplyWithResult(sender, reply.getResult());
return;
}
- if (may_restart_with_fast_path(reply)) {
+ // Single Get could technically be considered consistent with itself, so make
+ // sure we never treat that as sufficient for restarting in the fast path.
+ if ((_sendState != SendState::SINGLE_GET_SENT) && may_restart_with_fast_path(reply)) {
restart_with_fast_path_due_to_consistent_get_timestamps(sender);
return;
}
@@ -395,7 +492,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
return;
} else if (shouldCreateIfNonExistent()) {
LOG(debug, "No existing documents found for %s, creating blank document to update",
- _updateCmd->getUpdate()->getId().toString().c_str());
+ update_doc_id().c_str());
docToUpdate = createBlankDocument();
setUpdatedForTimestamp(putTimestamp);
} else {
@@ -412,7 +509,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
bool TwoPhaseUpdateOperation::may_restart_with_fast_path(const api::GetReply& reply) {
return (_manager.getDistributor().getConfig().update_fast_path_restart_enabled() &&
- reply.wasFound() &&
+ !_replicas_at_get_send_time.empty() && // To ensure we send CreateBucket+Put if no replicas exist.
reply.had_consistent_replicas() &&
replica_set_unchanged_after_get_operation());
}
@@ -434,12 +531,15 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const
void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender) {
LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode",
- _updateCmd->getDocumentId().toString().c_str());
+ update_doc_id().c_str());
if (lostBucketOwnershipBetweenPhases()) {
sendLostOwnershipTransientErrorReply(sender);
return;
}
_updateMetric.fast_path_restarts.inc();
+ // Must not be any other messages in flight, or we might mis-interpret them when we
+ // have switched back to fast-path mode.
+ assert(_sentMessageMap.empty());
startFastPathUpdate(sender);
}
@@ -553,4 +653,9 @@ TwoPhaseUpdateOperation::onClose(DistributorMessageSender& sender) {
}
}
+vespalib::string TwoPhaseUpdateOperation::update_doc_id() const {
+ assert(_updateCmd.get() != nullptr);
+ return _updateCmd->getDocumentId().toString();
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index f9e32c31a9c..f714232b427 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -1,11 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <set>
+#include "newest_replica.h"
#include <vespa/storageapi/messageapi/returncode.h>
#include <vespa/storage/distributor/persistencemessagetracker.h>
#include <vespa/storage/distributor/operations/sequenced_operation.h>
#include <vespa/document/update/documentupdate.h>
+#include <set>
namespace document {
class Document;
@@ -23,6 +24,7 @@ class UpdateMetricSet;
namespace distributor {
class DistributorBucketSpace;
+class GetOperation;
/*
* General functional outline:
@@ -45,6 +47,7 @@ class DistributorBucketSpace;
*
* Note that the above case also implicitly handles the case in which a
* bucket does not exist.
+ *
*/
@@ -56,7 +59,7 @@ public:
const std::shared_ptr<api::UpdateCommand> & msg,
DistributorMetricSet& metrics,
SequencingHandle sequencingHandle = SequencingHandle());
- ~TwoPhaseUpdateOperation();
+ ~TwoPhaseUpdateOperation() override;
void onStart(DistributorMessageSender& sender) override;
@@ -69,13 +72,13 @@ public:
void onClose(DistributorMessageSender& sender) override;
- bool canSendHeaderOnly() const;
-
private:
enum class SendState {
NONE_SENT,
UPDATES_SENT,
- GETS_SENT,
+ METADATA_GETS_SENT,
+ SINGLE_GET_SENT,
+ FULL_GETS_SENT,
PUTS_SENT,
};
@@ -85,7 +88,7 @@ private:
};
void transitionTo(SendState newState);
- const char* stateToString(SendState);
+ static const char* stateToString(SendState);
void sendReply(DistributorMessageSender&,
std::shared_ptr<api::StorageReply>&);
@@ -108,10 +111,13 @@ private:
const std::shared_ptr<api::StorageReply>&);
void handleSafePathReceive(DistributorMessageSender&,
const std::shared_ptr<api::StorageReply>&);
- void handleSafePathReceivedGet(DistributorMessageSender&,
- api::GetReply&);
- void handleSafePathReceivedPut(DistributorMessageSender&,
- const api::PutReply&);
+ std::shared_ptr<GetOperation> create_initial_safe_path_get_operation();
+ void handle_safe_path_received_metadata_get(DistributorMessageSender&,
+ api::GetReply&,
+ const std::optional<NewestReplica>&,
+ bool any_replicas_failed);
+ void handleSafePathReceivedGet(DistributorMessageSender&, api::GetReply&);
+ void handleSafePathReceivedPut(DistributorMessageSender&, const api::PutReply&);
bool shouldCreateIfNonExistent() const;
bool processAndMatchTasCondition(
DistributorMessageSender& sender,
@@ -124,6 +130,8 @@ private:
bool may_restart_with_fast_path(const api::GetReply& reply);
bool replica_set_unchanged_after_get_operation() const;
void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender);
+ // Precondition: reply has not yet been sent.
+ vespalib::string update_doc_id() const;
UpdateMetricSet& _updateMetric;
PersistenceOperationMetricSet& _putMetric;
@@ -139,6 +147,7 @@ private:
document::BucketId _updateDocBucketId;
std::vector<std::pair<document::BucketId, uint16_t>> _replicas_at_get_send_time;
uint16_t _fast_path_repair_source_node;
+ bool _use_initial_cheap_metadata_fetch_phase;
bool _replySent;
};
diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h
index 3364c6606eb..405e59f7b31 100644
--- a/storage/src/vespa/storage/distributor/sentmessagemap.h
+++ b/storage/src/vespa/storage/distributor/sentmessagemap.h
@@ -16,26 +16,18 @@ class SentMessageMap
{
public:
SentMessageMap();
-
~SentMessageMap();
std::shared_ptr<Operation> pop(api::StorageMessage::Id id);
-
std::shared_ptr<Operation> pop();
void insert(api::StorageMessage::Id id, const std::shared_ptr<Operation> & msg);
-
void clear();
-
uint32_t size() const { return _map.size(); }
-
- uint32_t empty() const { return _map.empty(); }
-
+ [[nodiscard]] bool empty() const noexcept { return _map.empty(); }
std::string toString() const;
-
private:
- typedef std::map<api::StorageMessage::Id, std::shared_ptr<Operation> > Map;
-
+ using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>;
Map _map;
};