summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@verizonmedia.com> 2020-03-12 16:09:08 +0000
committer: Tor Brede Vekterli <vekterli@verizonmedia.com> 2020-03-16 15:14:03 +0000
commit: eb28962cb8edfdc3ad75fc9298a71bf31ab96af2 (patch)
tree: d5410917f69566acffbe494623b36d608ed6b0a5 /storage
parent: 7ec5934200cdffb5e17a083b6f56e9b4ae37246f (diff)
Add initial metadata-only phase to inconsistent update handling
If bucket replicas are inconsistent, the common case is that only a small subset of documents contained in the buckets are actually mutually out of sync. The added metadata phase optimizes for such a case by initially sending Get requests to all divergent replicas that only ask for the timestamps (and no fields). This is a very cheap and fast operation. If all returned timestamps are in sync, the update can be restarted in the fast path. Otherwise, a full Get will _only_ be sent to the newest replica, and its result will be used for performing the update on the distributor itself, before pushing out the result as Puts. This is in contrast to today's behavior where full Gets are sent to all replicas. For users with large documents this can be very expensive. In addition, the metadata Get operations are sent with weak internal read consistency (as they do not need to read any previously written, possibly in-flight fields). This lets them bypass the main commit queue entirely.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/distributortest.cpp 17
-rw-r--r-- storage/src/tests/distributor/getoperationtest.cpp 85
-rw-r--r-- storage/src/tests/distributor/twophaseupdateoperationtest.cpp 853
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.cpp 2
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.h 8
-rw-r--r-- storage/src/vespa/storage/config/stor-distributormanager.def 9
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt 1
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/getoperation.cpp 38
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/getoperation.h 27
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp 13
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/newest_replica.h 38
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp 159
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h 29
-rw-r--r-- storage/src/vespa/storage/distributor/sentmessagemap.h 12
14 files changed, 869 insertions, 422 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index afc3b254c64..559784121a6 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -193,6 +193,12 @@ struct DistributorTest : Test, DistributorTestUtil {
configureDistributor(builder);
}
+ void configure_metadata_update_phase_enabled(bool enabled) {
+ ConfigBuilder builder;
+ builder.enableMetadataOnlyFetchPhaseForInconsistentUpdates = enabled;
+ configureDistributor(builder);
+ }
+
void configureMaxClusterClockSkew(int seconds);
void sendDownClusterStateCommand();
void replyToSingleRequestBucketInfoCommandWith1Bucket();
@@ -1044,6 +1050,17 @@ TEST_F(DistributorTest, merge_disabling_config_is_propagated_to_internal_config)
EXPECT_FALSE(getConfig().merge_operations_disabled());
}
+TEST_F(DistributorTest, metadata_update_phase_config_is_propagated_to_internal_config) {
+ createLinks(true);
+ setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_metadata_update_phase_enabled(true);
+ EXPECT_TRUE(getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates());
+
+ configure_metadata_update_phase_enabled(false);
+ EXPECT_FALSE(getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates());
+}
+
TEST_F(DistributorTest, weak_internal_read_consistency_config_is_propagated_to_internal_configs) {
createLinks(true);
setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 0dbec8444cc..f14b78094d1 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -21,6 +21,7 @@ using config::ConfigGetter;
using document::DocumenttypesConfig;
using config::FileSpec;
using document::test::makeDocumentBucket;
+using document::BucketId;
using namespace ::testing;
namespace storage::distributor {
@@ -29,8 +30,8 @@ struct GetOperationTest : Test, DistributorTestUtil {
std::shared_ptr<const document::DocumentTypeRepo> _repo;
document::DocumentId docId;
- document::BucketId bucketId;
- std::unique_ptr<Operation> op;
+ BucketId bucketId;
+ std::unique_ptr<GetOperation> op;
GetOperationTest();
~GetOperationTest() override;
@@ -52,7 +53,7 @@ struct GetOperationTest : Test, DistributorTestUtil {
}
void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) {
- auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId(0)), docId, "[all]");
+ auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(BucketId(0)), docId, "[all]");
op = std::make_unique<GetOperation>(
getExternalOperationHandler(), getDistributorBucketSpace(),
getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(),
@@ -135,6 +136,14 @@ struct GetOperationTest : Test, DistributorTestUtil {
GetOperationTest::GetOperationTest() = default;
GetOperationTest::~GetOperationTest() = default;
+namespace {
+
+NewestReplica replica_of(api::Timestamp ts, const document::BucketId& bucket_id, uint16_t node) {
+ return NewestReplica::of(ts, bucket_id, node);
+}
+
+}
+
TEST_F(GetOperationTest, simple) {
setClusterState("distributor:1 storage:2");
@@ -149,7 +158,10 @@ TEST_F(GetOperationTest, simple) {
EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_TRUE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 0), *op->newest_replica());
}
TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted_replica_available) {
@@ -167,7 +179,10 @@ TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted
EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 0), *op->newest_replica());
}
TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
@@ -179,15 +194,18 @@ TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
- ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::OK, "newauthor", 2));
- ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "oldauthor", 1));
+ ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "newauthor", 2));
+ ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::OK, "oldauthor", 1));
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
-
EXPECT_EQ("newauthor", getLastReplyAuthor());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 1), *op->newest_replica());
}
TEST_F(GetOperationTest, send_to_all_invalid_copies) {
@@ -205,8 +223,9 @@ TEST_F(GetOperationTest, send_to_all_invalid_copies) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
-
EXPECT_EQ("newauthor", getLastReplyAuthor());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
}
@@ -235,8 +254,8 @@ TEST_F(GetOperationTest, send_to_all_invalid_nodes_when_inconsistent) {
TEST_F(GetOperationTest, inconsistent_split) {
setClusterState("distributor:1 storage:4");
- addNodesToBucketDB(document::BucketId(16, 0x0593), "0=100");
- addNodesToBucketDB(document::BucketId(17, 0x10593), "1=200");
+ addNodesToBucketDB(BucketId(16, 0x0593), "0=100");
+ addNodesToBucketDB(BucketId(17, 0x10593), "1=200");
sendGet();
@@ -248,9 +267,13 @@ TEST_F(GetOperationTest, inconsistent_split) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
-
EXPECT_EQ("newauthor", getLastReplyAuthor());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ // Bucket with highest timestamp should be returned. In this case it's the one on node 0.
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(2), BucketId(16, 0x0593), 0), *op->newest_replica());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) {
@@ -268,6 +291,8 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
}
@@ -288,7 +313,11 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 3) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 1), *op->newest_replica());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket) {
@@ -308,6 +337,8 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
}
@@ -331,7 +362,12 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_TRUE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ // First send to node 2 fails, second is to node 3 which returned the highest timestamp
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 3), *op->newest_replica());
}
TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) {
@@ -342,6 +378,8 @@ TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 0) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_FALSE(op->any_replicas_failed());
EXPECT_TRUE(last_reply_had_consistent_replicas()); // Nothing in the bucket, so nothing to be inconsistent with.
}
@@ -363,7 +401,14 @@ TEST_F(GetOperationTest, not_found) {
EXPECT_EQ(1, getDistributor().getMetrics().gets[documentapi::LoadType::DEFAULT].
failures.notfound.getValue());
+ EXPECT_FALSE(op->any_replicas_failed()); // "Not found" is not a failure.
EXPECT_TRUE(last_reply_had_consistent_replicas());
+ EXPECT_TRUE(op->newest_replica().has_value());
+ // "Not found" is still a success with a timestamp of 0. This is because
+ // the caller may want to perform special logic if all replicas are in sync
+ // but are missing the document.
+ // FIXME make sure all callers are aware of this!
+ EXPECT_EQ(replica_of(api::Timestamp(0), bucketId, 0), *op->newest_replica());
}
TEST_F(GetOperationTest, not_found_on_subset_of_replicas_marks_get_as_inconsistent) {
@@ -403,8 +448,12 @@ TEST_F(GetOperationTest, resend_on_storage_failure) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_TRUE(op->any_replicas_failed());
// Replica had read failure, but they're still in sync. An immutable Get won't change that fact.
EXPECT_TRUE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 2), *op->newest_replica());
}
TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_inconsistent) {
@@ -417,7 +466,11 @@ TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_in
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 3) ReturnCode(NONE)",
_sender.getLastReply());
+
+ EXPECT_TRUE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 2), *op->newest_replica());
}
TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) {
@@ -442,7 +495,10 @@ TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 0) ReturnCode(IO_FAILURE)",
_sender.getLastReply());
+
+ EXPECT_TRUE(op->any_replicas_failed());
EXPECT_TRUE(last_reply_had_consistent_replicas()); // Doesn't really matter since operation itself failed
+ EXPECT_FALSE(op->newest_replica().has_value());
}
TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) {
@@ -462,6 +518,8 @@ TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) {
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
EXPECT_TRUE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 1), *op->newest_replica());
}
TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
@@ -469,7 +527,7 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
// Node 0 is local copy to distributor 0 and will be preferred when
// sending initially.
- addNodesToBucketDB(document::BucketId(16, 0x0593), "2=100,0=100");
+ addNodesToBucketDB(BucketId(16, 0x0593), "2=100,0=100");
sendGet();
@@ -487,9 +545,12 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 3) ReturnCode(NONE)",
_sender.getLastReply());
-
EXPECT_EQ("newestauthor", getLastReplyAuthor());
+
+ EXPECT_TRUE(op->any_replicas_failed());
EXPECT_TRUE(last_reply_had_consistent_replicas());
+ ASSERT_TRUE(op->newest_replica().has_value());
+ EXPECT_EQ(replica_of(api::Timestamp(3), BucketId(16, 0x0593), 2), *op->newest_replica());
}
TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) {
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index f1900c40d56..82466ec5ed7 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -32,9 +32,10 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
document::TestDocRepo _testRepo;
std::shared_ptr<const DocumentTypeRepo> _repo;
const DocumentType* _doc_type;
+ DistributorMessageSenderStub _sender;
TwoPhaseUpdateOperationTest();
- ~TwoPhaseUpdateOperationTest();
+ ~TwoPhaseUpdateOperationTest() override;
void checkMessageSettingsPropagatedTo(
const api::StorageCommand::SP& msg) const;
@@ -81,6 +82,14 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
api::ReturnCode::Result result = api::ReturnCode::OK,
const std::string& traceMsg = "");
+ void reply_to_metadata_get(
+ Operation& callback,
+ DistributorMessageSenderStub& sender,
+ uint32_t index,
+ uint64_t old_timestamp,
+ api::ReturnCode::Result result = api::ReturnCode::OK,
+ const std::string& trace_msg = "");
+
struct UpdateOptions {
bool _makeInconsistentSplit;
bool _createIfNonExistent;
@@ -130,6 +139,14 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
Timestamp highest_get_timestamp,
Timestamp expected_response_timestamp);
+ std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase);
+ std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
+ return cb;
+ }
+
};
TwoPhaseUpdateOperationTest::TwoPhaseUpdateOperationTest() = default;
@@ -218,6 +235,24 @@ TwoPhaseUpdateOperationTest::replyToGet(
callback.receive(sender, reply);
}
+void
+TwoPhaseUpdateOperationTest::reply_to_metadata_get(
+ Operation& callback,
+ DistributorMessageSenderStub& sender,
+ uint32_t index,
+ uint64_t old_timestamp,
+ api::ReturnCode::Result result,
+ const std::string& trace_msg)
+{
+ auto& get = dynamic_cast<const api::GetCommand&>(*sender.command(index));
+ auto reply = std::make_shared<api::GetReply>(get, std::shared_ptr<Document>(), old_timestamp);
+ reply->setResult(api::ReturnCode(result, ""));
+ if (!trace_msg.empty()) {
+ MBUS_TRACE(reply->getTrace(), 1, trace_msg);
+ }
+ callback.receive(sender, reply);
+}
+
namespace {
struct DummyTransportContext : api::TransportContext {
@@ -281,234 +316,212 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
TEST_F(TwoPhaseUpdateOperationTest, simple) {
setupDistributor(1, 1, "storage:1 distributor:1");
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
+ replyToMessage(*cb, _sender, 0, 90);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 90) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, non_existing) {
setupDistributor(1, 1, "storage:1 distributor:1");
-
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate(""));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("");
+ cb->start(_sender, framework::MilliSecTime(0));
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_failed) {
setupDistributor(1, 1, "storage:1 distributor:1");
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
- ASSERT_EQ("Update => 0", sender.getCommands(true));
-
- replyToMessage(*cb, sender, 0, 90, api::ReturnCode::INTERNAL_FAILURE);
+ replyToMessage(*cb, _sender, 0, 90, api::ReturnCode::INTERNAL_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(INTERNAL_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
- ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", sender.getLastCommand(true));
+ ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", _sender.getLastCommand(true));
- replyToGet(*cb, sender, 2, 110);
+ replyToGet(*cb, _sender, 2, 110);
- ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0", _sender.getCommands(true));
- ASSERT_TRUE(sender.replies().empty());
+ ASSERT_TRUE(_sender.replies().empty());
- replyToPut(*cb, sender, 3);
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 3);
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_not_found) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
- ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", sender.getLastCommand(true));
- ASSERT_TRUE(sender.replies().empty());
+ ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1", _sender.getLastCommand(true));
+ ASSERT_TRUE(_sender.replies().empty());
- replyToGet(*cb, sender, 2, 110, false);
+ replyToGet(*cb, _sender, 2, 110, false);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(INTERNAL_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_update_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
-
- replyToMessage(*cb, sender, 0, 90);
- ASSERT_TRUE(sender.replies().empty());
- replyToMessage(*cb, sender, 1, 110, api::ReturnCode::IO_FAILURE);
+ replyToMessage(*cb, _sender, 0, 90);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToMessage(*cb, _sender, 1, 110, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 90) "
"ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_get_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1",
- sender.getLastCommand(true));
+ _sender.getLastCommand(true));
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 2, 110, false, api::ReturnCode::IO_FAILURE);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 2, 110, false, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1",
- sender.getLastCommand(true));
+ _sender.getLastCommand(true));
- replyToGet(*cb, sender, 2, 110);
+ replyToGet(*cb, _sender, 2, 110);
ASSERT_EQ("Update => 0,Update => 1,Get => 1,Put => 1,Put => 0",
- sender.getCommands(true));
+ _sender.getCommands(true));
- replyToPut(*cb, sender, 3, api::ReturnCode::IO_FAILURE);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 3, api::ReturnCode::IO_FAILURE);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_put_not_started) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
-
- replyToMessage(*cb, sender, 0, 90);
- replyToMessage(*cb, sender, 1, 110);
+ replyToMessage(*cb, _sender, 0, 90);
+ replyToMessage(*cb, _sender, 1, 110);
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 1",
- sender.getLastCommand(true));
- checkMessageSettingsPropagatedTo(sender.commands().back());
+ _sender.getLastCommand(true));
+ checkMessageSettingsPropagatedTo(_sender.commands().back());
enableDistributorClusterState("storage:0 distributor:1");
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 2, 110);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 2, 110);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 110 Was inconsistent "
"(best node 1)) ReturnCode(NOT_CONNECTED, "
"Can't store document: No storage nodes available)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsistent_split) {
setupDistributor(2, 2, "storage:2 distributor:1");
-
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3",
- UpdateOptions().makeInconsistentSplit(true)));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3", UpdateOptions().makeInconsistentSplit(true));
+ cb->start(_sender, framework::MilliSecTime(0));
std::string wanted("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0,"
"Get(BucketId(0x440000000000cac4), id:ns:testdoctype1::1) => 0");
- std::string text = sender.getCommands(true, true);
+ std::string text = _sender.getCommands(true, true);
ASSERT_EQ(wanted, text);
- replyToGet(*cb, sender, 0, 90);
- replyToGet(*cb, sender, 1, 120);
+ replyToGet(*cb, _sender, 0, 90);
+ replyToGet(*cb, _sender, 1, 120);
ASSERT_EQ("Put(BucketId(0x440000000000cac4), id:ns:testdoctype1::1, "
"timestamp 200000000, size 60) => 1,"
"Put(BucketId(0x440000000000cac4), id:ns:testdoctype1::1, "
"timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true, 2));
+ _sender.getCommands(true, true, 2));
- replyToPut(*cb, sender, 2);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 3);
+ replyToPut(*cb, _sender, 2);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 3);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 120) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
void
@@ -523,33 +536,30 @@ TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo(
TEST_F(TwoPhaseUpdateOperationTest, fast_path_propagates_message_settings_to_update) {
setupDistributor(1, 1, "storage:1 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Update => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
- StorageCommand::SP msg(sender.commands().back());
+ StorageCommand::SP msg(_sender.commands().back());
checkMessageSettingsPropagatedTo(msg);
}
TEST_F(TwoPhaseUpdateOperationTest, n_of_m) {
setupDistributor(2, 2, "storage:2 distributor:1", 1);
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true));
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true));
-
- ASSERT_TRUE(sender.replies().empty());
- replyToMessage(*cb, sender, 0, 90);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToMessage(*cb, _sender, 0, 90);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 90) ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
- replyToMessage(*cb, sender, 1, 123);
+ replyToMessage(*cb, _sender, 1, 123);
}
std::string
@@ -565,132 +575,114 @@ TwoPhaseUpdateOperationTest::getUpdatedValueFromLastPut(
TEST_F(TwoPhaseUpdateOperationTest, safe_path_updates_newest_received_document) {
setupDistributor(3, 3, "storage:3 distributor:1");
// 0,1 in sync. 2 out of sync.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 0,"
"Get(BucketId(0x400000000000cac4), id:ns:testdoctype1::1) => 2",
- sender.getCommands(true, true));
- replyToGet(*cb, sender, 0, 50);
- replyToGet(*cb, sender, 1, 70);
+ _sender.getCommands(true, true));
+ replyToGet(*cb, _sender, 0, 50);
+ replyToGet(*cb, _sender, 1, 70);
ASSERT_EQ("Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 1,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 2,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true, 2));
+ _sender.getCommands(true, true, 2));
// Make sure Put contains an updated document (+10 arith. update on field
// whose value equals gotten timestamp). In this case we want 70 -> 80.
- ASSERT_EQ("80", getUpdatedValueFromLastPut(sender));
+ ASSERT_EQ("80", getUpdatedValueFromLastPut(_sender));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 70) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, create_if_non_existent_creates_document_if_all_empty_gets) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4",
- UpdateOptions().createIfNonExistent(true)));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 0, false);
- replyToGet(*cb, sender, 1, 0, false);
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 0, false);
+ replyToGet(*cb, _sender, 1, 0, false);
// Since create-if-non-existent is set, distributor should create doc from
// scratch.
ASSERT_EQ("Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 1,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 2,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true, 2));
+ _sender.getCommands(true, true, 2));
- ASSERT_EQ("10", getUpdatedValueFromLastPut(sender));
+ ASSERT_EQ("10", getUpdatedValueFromLastPut(_sender));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 200000000) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_has_failed_put) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4",
- UpdateOptions().createIfNonExistent(true)));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 0, false);
- replyToGet(*cb, sender, 1, 0, false);
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 0, false);
+ replyToGet(*cb, _sender, 1, 0, false);
// Since create-if-non-existent is set, distributor should create doc from
// scratch.
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4, api::ReturnCode::IO_FAILURE);
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 200000000) "
"ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4",
- UpdateOptions().createIfNonExistent(true)));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 0, false, api::ReturnCode::IO_FAILURE);
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 1, 0, false, api::ReturnCode::IO_FAILURE);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 0, false, api::ReturnCode::IO_FAILURE);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 1, 0, false, api::ReturnCode::IO_FAILURE);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(IO_FAILURE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
setupDistributor(2, 2, "storage:2 distributor:1");
// Create update for wrong doctype which will fail the update.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().withError()));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().withError());
+ cb->start(_sender, framework::MilliSecTime(0));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 50);
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 1, 70);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 50);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 1, 70);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
@@ -698,95 +690,88 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) {
"ReturnCode(INTERNAL_FAILURE, Can not apply a "
"\"testdoctype2\" document update to a "
"\"testdoctype1\" document.)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, non_existing_with_auto_create) {
setupDistributor(1, 1, "storage:1 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("", UpdateOptions().createIfNonExistent(true)));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) "
"Reasons to start: => 0,"
"Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, "
"timestamp 200000000, size 60) => 0",
- sender.getCommands(true, true));
+ _sender.getCommands(true, true));
- ASSERT_EQ("10", getUpdatedValueFromLastPut(sender));
+ ASSERT_EQ("10", getUpdatedValueFromLastPut(_sender));
- replyToCreateBucket(*cb, sender, 0);
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 1);
+ replyToCreateBucket(*cb, _sender, 0);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 1);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 200000000) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_fails_update_when_mismatching_timestamp_constraint) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4",
- UpdateOptions().timestampToUpdate(1234)));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().timestampToUpdate(1234));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 100);
- ASSERT_TRUE(sender.replies().empty());
- replyToGet(*cb, sender, 1, 110);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 100);
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToGet(*cb, _sender, 1, 110);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(NONE, No document with requested "
"timestamp found)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_update_propagates_message_settings_to_gets_and_puts) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- checkMessageSettingsPropagatedTo(sender.command(0));
- checkMessageSettingsPropagatedTo(sender.command(1));
- replyToGet(*cb, sender, 0, 50);
- replyToGet(*cb, sender, 1, 70);
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
- checkMessageSettingsPropagatedTo(sender.command(2));
- checkMessageSettingsPropagatedTo(sender.command(3));
- checkMessageSettingsPropagatedTo(sender.command(4));
- replyToPut(*cb, sender, 2);
- replyToPut(*cb, sender, 3);
- replyToPut(*cb, sender, 4);
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ checkMessageSettingsPropagatedTo(_sender.command(0));
+ checkMessageSettingsPropagatedTo(_sender.command(1));
+ replyToGet(*cb, _sender, 0, 50);
+ replyToGet(*cb, _sender, 1, 70);
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
+ checkMessageSettingsPropagatedTo(_sender.command(2));
+ checkMessageSettingsPropagatedTo(_sender.command(3));
+ checkMessageSettingsPropagatedTo(_sender.command(4));
+ replyToPut(*cb, _sender, 2);
+ replyToPut(*cb, _sender, 3);
+ replyToPut(*cb, _sender, 4);
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replies) {
setupDistributor(3, 3, "storage:3 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
-
- ASSERT_EQ("Get => 0,Get => 2", sender.getCommands(true));
- replyToGet(*cb, sender, 0, 50, true, api::ReturnCode::OK, "hello earthlings");
- replyToGet(*cb, sender, 1, 70);
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
- replyToPut(*cb, sender, 2, api::ReturnCode::OK, "fooo");
- replyToPut(*cb, sender, 3, api::ReturnCode::OK, "baaa");
- ASSERT_TRUE(sender.replies().empty());
- replyToPut(*cb, sender, 4);
+ auto cb = sendUpdate("0=1/2/3,1=1/2/3,2=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("Get => 0,Get => 2", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, 50, true, api::ReturnCode::OK, "hello earthlings");
+ replyToGet(*cb, _sender, 1, 70);
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
+ replyToPut(*cb, _sender, 2, api::ReturnCode::OK, "fooo");
+ replyToPut(*cb, _sender, 3, api::ReturnCode::OK, "baaa");
+ ASSERT_TRUE(_sender.replies().empty());
+ replyToPut(*cb, _sender, 4);
- ASSERT_EQ("Update Reply", sender.getLastReply(false));
+ ASSERT_EQ("Update Reply", _sender.getLastReply(false));
- std::string trace(sender.replies().back()->getTrace().toString());
+ std::string trace(_sender.replies().back()->getTrace().toString());
ASSERT_THAT(trace, HasSubstr("hello earthlings"));
ASSERT_THAT(trace, HasSubstr("fooo"));
ASSERT_THAT(trace, HasSubstr("baaa"));
@@ -798,13 +783,11 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec
Timestamp expected_response_timestamp)
{
setupDistributor(2, 2, "storage:2 distributor:1");
-
// Update towards inconsistent bucket invokes safe path.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
// Alter cluster state so that distributor is now down (technically the
// entire cluster is down in this state, but this should not matter). In
@@ -813,8 +796,8 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec
// to a bucket we no longer own.
enableDistributorClusterState("storage:2 distributor:1 .0.s:d");
getBucketDatabase().clear();
- replyToGet(*cb, sender, 0, lowest_get_timestamp);
- replyToGet(*cb, sender, 1, highest_get_timestamp);
+ replyToGet(*cb, _sender, 0, lowest_get_timestamp);
+ replyToGet(*cb, _sender, 1, highest_get_timestamp);
// BUCKET_NOT_FOUND is a transient error code which should cause the client
// to re-send the operation, presumably to the correct distributor the next
@@ -827,7 +810,7 @@ void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_sec
"ReturnCode(BUCKET_NOT_FOUND, Distributor lost "
"ownership of bucket between executing the read "
"and write phases of a two-phase update operation)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) {
@@ -843,46 +826,37 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.headerval==120")));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(_sender, framework::MilliSecTime(0));
// Newest doc has headerval==110, not 120.
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(TEST_AND_SET_CONDITION_FAILED, "
"Condition did not match document)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_match_sends_puts_with_updated_doc) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.headerval==110")));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==110"));
+
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_parse_failure_fails_with_illegal_params_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.san==fran...cisco")));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.san==fran...cisco"));
+
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
// NOTE: parsing of the condition is currently not attempted until the Gets
// have been replied to. This may change in the future.
// XXX reliance on parser/exception error message is very fragile.
@@ -893,19 +867,16 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_parse_failure_fails_with
"Failed to parse test and set condition: "
"syntax error, unexpected . at column 24 when "
"parsing selection 'testdoctype1.san==fran...cisco')",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_unknown_doc_type_fails_with_illegal_params_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "langbein.headerval=1234")));
-
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100);
- replyToGet(*cb, sender, 1, 110);
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("langbein.headerval=1234"));
+
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100);
+ replyToGet(*cb, _sender, 1, 110);
// NOTE: parsing of the condition is currently not attempted until the Gets
// have been replied to. This may change in the future.
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
@@ -915,40 +886,35 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_unknown_doc_type_fails_w
"Failed to parse test and set condition: "
"Document type 'langbein' not found at column 1 "
"when parsing selection 'langbein.headerval=1234')",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_no_auto_create_fails_with_tas_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition(
- "testdoctype1.headerval==120")));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().condition("testdoctype1.headerval==120"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ cb->start(_sender, framework::MilliSecTime(0));
// Both Gets return nothing at all, nothing at all.
- replyToGet(*cb, sender, 0, 100, false);
- replyToGet(*cb, sender, 1, 110, false);
+ replyToGet(*cb, _sender, 0, 100, false);
+ replyToGet(*cb, _sender, 1, 110, false);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) "
"ReturnCode(TEST_AND_SET_CONDITION_FAILED, "
"Document did not exist)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_auto_create_sends_puts) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(
- sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions()
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions()
.condition("testdoctype1.headerval==120")
- .createIfNonExistent(true)));
+ .createIfNonExistent(true));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
- replyToGet(*cb, sender, 0, 100, false);
- replyToGet(*cb, sender, 1, 110, false);
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ cb->start(_sender, framework::MilliSecTime(0));
+ replyToGet(*cb, _sender, 0, 100, false);
+ replyToGet(*cb, _sender, 1, 110, false);
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
}
void
@@ -966,11 +932,10 @@ TwoPhaseUpdateOperationTest::assertAbortedUpdateReplyWithContextPresent(
TEST_F(TwoPhaseUpdateOperationTest, fast_path_close_edge_sends_correct_reply) {
setupDistributor(1, 1, "storage:1 distributor:1");
// Only 1 replica; consistent with itself by definition.
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3"));
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3");
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Update => 0", sender.getCommands(true));
+ ASSERT_EQ("Update => 0", _sender.getCommands(true));
// Close the operation. This should generate a single reply that is
// bound to the original command. We can identify rogue replies by these
// not having a transport context, as these are unique_ptrs that are
@@ -985,11 +950,10 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_close_edge_sends_correct_reply) {
TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) {
setupDistributor(2, 2, "storage:2 distributor:1");
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
// Closing the operation should now only return an ABORTED reply for
// the UpdateCommand, _not_ from the nested, pending Get operation (which
// will implicitly generate an ABORTED reply for the synthesized Get
@@ -1004,24 +968,23 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_re
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
Timestamp old_timestamp = 500;
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, old_timestamp);
- replyToGet(*cb, sender, 1, old_timestamp);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, old_timestamp);
+ replyToGet(*cb, _sender, 1, old_timestamp);
- ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true, false, 2));
- replyToMessage(*cb, sender, 2, old_timestamp);
- replyToMessage(*cb, sender, 3, old_timestamp);
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2));
+ replyToMessage(*cb, _sender, 2, old_timestamp);
+ replyToMessage(*cb, _sender, 3, old_timestamp);
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 500) "
"ReturnCode(NONE)",
- sender.getLastReply(true));
+ _sender.getLastReply(true));
auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
@@ -1031,17 +994,16 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_do
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_update_fast_path_restart_enabled(false);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
Timestamp old_timestamp = 500;
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, old_timestamp);
- replyToGet(*cb, sender, 1, old_timestamp);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, old_timestamp);
+ replyToGet(*cb, _sender, 1, old_timestamp);
// Should _not_ be restarted with fast path, as it has been config disabled
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
EXPECT_EQ(0, metrics.fast_path_restarts.getValue());
@@ -1051,9 +1013,8 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_alter
setupDistributor(3, 3, "storage:3 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
// Replica set changes between time of Get requests sent and
// responses received. This may happen e.g. if concurrent mutations
@@ -1065,27 +1026,38 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_alter
addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
Timestamp old_timestamp = 500;
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, old_timestamp);
- replyToGet(*cb, sender, 1, old_timestamp);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, old_timestamp);
+ replyToGet(*cb, _sender, 1, old_timestamp);
- ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true, false, 2));
}
TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_document_not_found_on_a_replica_node) {
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- replyToGet(*cb, sender, 0, Timestamp(0), false);
- replyToGet(*cb, sender, 1, Timestamp(500));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ replyToGet(*cb, _sender, 0, Timestamp(0), false);
+ replyToGet(*cb, _sender, 1, Timestamp(500));
// Should _not_ send Update operations!
- ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
+}
+
+// Buckets must be created from scratch by Put operations; updates alone cannot do this.
+TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_no_initial_replicas_exist) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_update_fast_path_restart_enabled(true); // restart is enabled, yet must not kick in below
+
+ // No replicas: technically consistent, but the fast path cannot be used.
+ auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0",
+ _sender.getCommands(true)); // only bucket creation and Puts are expected; no Update commands
}
// The weak consistency config _only_ applies to Get operations initiated directly
@@ -1095,15 +1067,218 @@ TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency
setupDistributor(2, 2, "storage:2 distributor:1");
getConfig().set_use_weak_internal_read_consistency_for_client_gets(true);
- std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
- DistributorMessageSenderStub sender;
- cb->start(sender, framework::MilliSecTime(0));
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas.
+ cb->start(_sender, framework::MilliSecTime(0));
- ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
- auto& get_cmd = dynamic_cast<const api::GetCommand&>(*sender.command(0));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0));
EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong);
}
+struct ThreePhaseUpdateTest : TwoPhaseUpdateOperationTest {};
+
+TEST_F(ThreePhaseUpdateTest, metadata_only_gets_are_sent_if_3phase_update_enabled) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ {
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0));
+ EXPECT_EQ("[none]", get_cmd.getFieldSet());
+ EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Weak);
+ checkMessageSettingsPropagatedTo(_sender.command(0));
+ }
+ {
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(1));
+ EXPECT_EQ("[none]", get_cmd.getFieldSet());
+ EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Weak);
+ checkMessageSettingsPropagatedTo(_sender.command(1));
+ }
+}
+
+TEST_F(ThreePhaseUpdateTest, full_document_get_sent_to_replica_with_highest_timestamp) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 1000U);
+ reply_to_metadata_get(*cb, _sender, 1, 2000U);
+ // Node 1 has newest document version at ts=2000
+ ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2));
+ {
+ auto& get_cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(2));
+ EXPECT_EQ("[all]", get_cmd.getFieldSet());
+ EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong);
+ }
+}
+
+TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 2000U);
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
+ replyToGet(*cb, _sender, 2, 2000U);
+ ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3));
+}
+
+TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ api::Timestamp old_timestamp(1500);
+ reply_to_metadata_get(*cb, _sender, 0, old_timestamp);
+ reply_to_metadata_get(*cb, _sender, 1, old_timestamp);
+
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2));
+ replyToMessage(*cb, _sender, 2, old_timestamp);
+ replyToMessage(*cb, _sender, 3, old_timestamp);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 1500) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply(true));
+
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
+}
+
+TEST_F(ThreePhaseUpdateTest, fast_path_not_restarted_if_document_not_found_subset_of_replicas) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 0U);
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2)); // Not sending updates.
+}
+
+TEST_F(ThreePhaseUpdateTest, no_document_found_on_any_replicas_is_considered_consistent) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ api::Timestamp no_document_timestamp(0);
+ reply_to_metadata_get(*cb, _sender, 0, no_document_timestamp);
+ reply_to_metadata_get(*cb, _sender, 1, no_document_timestamp);
+
+ ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true, false, 2));
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
+}
+
+TEST_F(ThreePhaseUpdateTest, metadata_get_phase_fails_if_any_replicas_return_failure) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ reply_to_metadata_get(*cb, _sender, 0, 0U, api::ReturnCode::INTERNAL_FAILURE);
+ ASSERT_EQ("", _sender.getCommands(true, false, 2)); // No further requests sent.
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(ABORTED, One or more metadata Get operations failed; aborting Update)",
+ _sender.getLastReply(true));
+}
+
+TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_set_changed_after_metadata_gets) {
+ setupDistributor(3, 3, "storage:3 distributor:1");
+ getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true);
+ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more.
+ cb->start(_sender, framework::MilliSecTime(0));
+ // Add new replica to deterministic test bucket after gets have been sent
+ BucketId bucket(0x400000000000cac4); // Always the same in the test.
+ addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");
+
+ Timestamp old_timestamp = 500;
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, old_timestamp);
+ reply_to_metadata_get(*cb, _sender, 1, old_timestamp);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, Replica sets changed between update phases, client must retry)",
+ _sender.getLastReply(true));
+}
+
+/*
+ * We unify checking for changed replica sets and changed bucket ownership by only
+ * checking for changed replica sets, thereby avoiding a relatively costly ideal
+ * state recomputation that is otherwise redundant. Rationale for why this shall
+ * always be safe:
+ * - for metadata gets to be sent at all, there must be at least one replica
+ * under the target bucket subtree
+ * - if there are no replicas, the bucket is implicitly considered inconsistent,
+ * triggering safe path
+ * - since there were no replicas initially, the safe path will _not_ restart in
+ * fast path
+ * - the safe path will perform the update locally and start a PutOperation,
+ * implicitly creating new replicas
+ * - this happens in the same execution context as starting the update operation itself,
+ * consequently ownership in DB cannot have changed concurrently
+ * - when a state transition happens where a distributor loses ownership of
+ * a bucket, it will always immediately purge it from its DB
+ * - this means that the replica set will inherently change
+ *
+ * It is technically possible to have an ABA situation where, in the course of
+ * an operation's lifetime, a distributor goes from owning a bucket to not
+ * owning it, back to owning it again. Although extremely unlikely to happen,
+ * it doesn't matter since the bucket info from the resulting mutations will
+ * be applied to the current state of the database anyway.
+ */
+TEST_F(ThreePhaseUpdateTest, update_aborted_if_ownership_changed_between_gets_and_fast_restart_update) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ // See do_test_ownership_changed_between_gets_and_second_phase() for more in-depth
+ // comments on why this particular cluster state is used.
+ enableDistributorClusterState("storage:2 distributor:1 .0.s:d");
+ getBucketDatabase().clear();
+ reply_to_metadata_get(*cb, _sender, 0, api::Timestamp(70));
+ reply_to_metadata_get(*cb, _sender, 1, api::Timestamp(71));
+
+ // As mentioned in the above comments, ownership changes are caught
+ // by the replica-set-changed check rather than by an explicit
+ // ownership-change check.
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 0) "
+ "ReturnCode(BUCKET_NOT_FOUND, Replica sets changed between update phases, client must retry)",
+ _sender.getLastReply(true));
+}
+
+TEST_F(ThreePhaseUpdateTest, safe_mode_is_implicitly_triggered_if_no_replicas_exist) {
+ setupDistributor(1, 1, "storage:1 distributor:1");
+ getConfig().set_enable_metadata_only_fetch_phase_for_inconsistent_updates(true);
+ auto cb = sendUpdate("", UpdateOptions().createIfNonExistent(true));
+ cb->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ("CreateBucketCommand(BucketId(0x400000000000cac4), active) "
+ "Reasons to start: => 0,"
+ "Put(BucketId(0x400000000000cac4), id:ns:testdoctype1::1, "
+ "timestamp 200000000, size 60) => 0",
+ _sender.getCommands(true, true));
+}
+
+TEST_F(ThreePhaseUpdateTest, metadata_gets_propagate_mbus_trace_to_reply) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ reply_to_metadata_get(*cb, _sender, 0, 0U, api::ReturnCode::INTERNAL_FAILURE,
+ "'ello 'ello what's all this then?");
+ ASSERT_EQ("", _sender.getCommands(true, false, 2));
+ ASSERT_EQ("Update Reply", _sender.getLastReply(false));
+
+ std::string trace(_sender.replies().back()->getTrace().toString());
+ ASSERT_THAT(trace, HasSubstr("'ello 'ello what's all this then?"));
+}
+
+TEST_F(ThreePhaseUpdateTest, single_get_mbus_trace_is_propagated_to_reply) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 0U);
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2));
+ replyToGet(*cb, _sender, 2, 2000U, false, api::ReturnCode::INTERNAL_FAILURE,
+ "it is me, Leclerc! *lifts glasses*");
+ ASSERT_EQ("Update Reply", _sender.getLastReply(false));
+
+ std::string trace(_sender.replies().back()->getTrace().toString());
+ ASSERT_THAT(trace, HasSubstr("it is me, Leclerc! *lifts glasses*"));
+}
+
// XXX currently differs in behavior from content nodes in that updates for
// document IDs without explicit doctypes will _not_ be auto-failed on the
// distributor.
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 522561ee8a5..9f51d70ce60 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -42,6 +42,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_update_fast_path_restart_enabled(false),
_merge_operations_disabled(false),
_use_weak_internal_read_consistency_for_client_gets(false),
+ _enable_metadata_only_fetch_phase_for_inconsistent_updates(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{ }
@@ -155,6 +156,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_update_fast_path_restart_enabled = config.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent;
_merge_operations_disabled = config.mergeOperationsDisabled;
_use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets;
+ _enable_metadata_only_fetch_phase_for_inconsistent_updates = config.enableMetadataOnlyFetchPhaseForInconsistentUpdates;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 333d7073715..b8e99165d69 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -235,6 +235,13 @@ public:
return _use_weak_internal_read_consistency_for_client_gets;
}
+ void set_enable_metadata_only_fetch_phase_for_inconsistent_updates(bool enable) noexcept {
+ _enable_metadata_only_fetch_phase_for_inconsistent_updates = enable;
+ }
+ bool enable_metadata_only_fetch_phase_for_inconsistent_updates() const noexcept {
+ return _enable_metadata_only_fetch_phase_for_inconsistent_updates;
+ }
+
bool containsTimeStatement(const std::string& documentSelection) const;
private:
@@ -281,6 +288,7 @@ private:
bool _update_fast_path_restart_enabled;
bool _merge_operations_disabled;
bool _use_weak_internal_read_consistency_for_client_gets;
+ bool _enable_metadata_only_fetch_phase_for_inconsistent_updates;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index e4d3182ce7a..915b9b6b304 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -228,3 +228,12 @@ merge_operations_disabled bool default=false
## consistent view of fields across document versions.
## This is mostly useful in a system that is effectively read-only.
use_weak_internal_read_consistency_for_client_gets bool default=false
+
+## If true, adds an initial metadata-only fetch phase to updates that touch buckets
+## with inconsistent replicas. Metadata timestamps are compared and a single full Get
+## is sent _only_ to one node with the highest timestamp. Without a metadata phase,
+## full gets would be sent to _all_ nodes.
+## Setting this option to true always implicitly enables the fast update restart
+## feature, so it's not required to set that config to true, nor will setting it
+## to false actually disable the feature.
+enable_metadata_only_fetch_phase_for_inconsistent_updates bool default=false \ No newline at end of file
diff --git a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
index efef4e5e51d..998fc22b1d7 100644
--- a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
@@ -2,6 +2,7 @@
vespa_add_library(storage_distributoroperationexternal OBJECT
SOURCES
getoperation.cpp
+ newest_replica.cpp
putoperation.cpp
removelocationoperation.cpp
removeoperation.cpp
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 9e66c212ac6..f800166a647 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -56,11 +56,12 @@ GetOperation::GetOperation(DistributorComponent& manager,
_msg(std::move(msg)),
_returnCode(api::ReturnCode::OK),
_doc(),
- _lastModified(),
+ _newest_replica(),
_metric(metric),
_operationTimer(manager.getClock()),
_desired_read_consistency(desired_read_consistency),
- _has_replica_inconsistency(false)
+ _has_replica_inconsistency(false),
+ _any_replicas_failed(false)
{
assignTargetNodeGroups(*read_guard);
}
@@ -110,7 +111,9 @@ GetOperation::sendForChecksum(DistributorMessageSender& sender, const document::
LOG(spam, "Sending %s to node %d", command->toString(true).c_str(), res[best].copy.getNode());
- res[best].sent = sender.sendToNode(lib::NodeType::STORAGE, res[best].copy.getNode(), command);
+ const auto target_node = res[best].copy.getNode();
+ res[best].sent = sender.sendToNode(lib::NodeType::STORAGE, target_node, command);
+ res[best].to_node = target_node;
return true;
}
@@ -145,27 +148,31 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<
bool allDone = true;
for (auto& response : _responses) {
for (uint32_t i = 0; i < response.second.size(); i++) {
- if (response.second[i].sent == getreply->getMsgId()) {
+ const auto& bucket_id = response.first.getBucketId();
+ auto& send_state = response.second[i];
+ if (send_state.sent == getreply->getMsgId()) {
LOG(debug, "Get on %s returned %s",
_msg->getDocumentId().toString().c_str(),
getreply->getResult().toString().c_str());
- response.second[i].received = true;
- response.second[i].returnCode = getreply->getResult();
+ send_state.received = true;
+ send_state.returnCode = getreply->getResult();
if (getreply->getResult().success()) {
- if (_lastModified.has_value() && (getreply->getLastModifiedTimestamp() != *_lastModified)) {
+ if (_newest_replica.has_value() && (getreply->getLastModifiedTimestamp() != _newest_replica->timestamp)) {
// At least two document versions returned had different timestamps.
_has_replica_inconsistency = true; // This is a one-way toggle.
}
- if (!_lastModified.has_value() || getreply->getLastModifiedTimestamp() > *_lastModified) {
+ if (!_newest_replica.has_value() || getreply->getLastModifiedTimestamp() > _newest_replica->timestamp) {
_returnCode = getreply->getResult();
- _lastModified = getreply->getLastModifiedTimestamp();
+ assert(response.second[i].to_node != UINT16_MAX);
+ _newest_replica = NewestReplica::of(getreply->getLastModifiedTimestamp(), bucket_id, send_state.to_node);
_doc = getreply->getDocument();
}
} else {
- if (!_lastModified.has_value()) {
- _returnCode = getreply->getResult();
+ _any_replicas_failed = true;
+ if (!_newest_replica.has_value()) {
+ _returnCode = getreply->getResult(); // Don't overwrite if we have a good response.
}
if (!all_bucket_metadata_initially_consistent()) {
// If we're sending to more than a single group of replicas it means our replica set is
@@ -175,7 +182,7 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<
}
// Try to send to another node in this checksum group.
- bool sent = sendForChecksum(sender, response.first.getBucketId(), response.second);
+ bool sent = sendForChecksum(sender, bucket_id, response.second);
if (sent) {
allDone = false;
}
@@ -219,7 +226,8 @@ void
GetOperation::sendReply(DistributorMessageSender& sender)
{
if (_msg.get()) {
- auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified.value_or(0), !_has_replica_inconsistency);
+ const auto timestamp = _newest_replica.value_or(NewestReplica::make_empty()).timestamp;
+ auto repl = std::make_shared<api::GetReply>(*_msg, _doc, timestamp, !_has_replica_inconsistency);
repl->setResult(_returnCode);
update_internal_metrics();
sender.sendReply(repl);
@@ -250,9 +258,9 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard
_replicas_in_db.emplace_back(e.getBucketId(), copy.getNode());
if (!copy.valid()) {
- _responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].push_back(copy);
+ _responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].emplace_back(copy);
} else if (!copy.empty()) {
- _responses[GroupId(e.getBucketId(), copy.getChecksum(), -1)].push_back(copy);
+ _responses[GroupId(e.getBucketId(), copy.getChecksum(), -1)].emplace_back(copy);
}
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 1106968bcf7..57e878d9e40 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "newest_replica.h"
#include <vespa/storageapi/defs.h>
#include <vespa/storage/distributor/operations/operation.h>
#include <vespa/storage/bucketdb/bucketdatabase.h>
@@ -38,6 +39,9 @@ public:
std::string getStatus() const override { return ""; }
bool all_bucket_metadata_initially_consistent() const;
+ bool any_replicas_failed() const noexcept {
+ return _any_replicas_failed;
+ }
// Exposed for unit testing. TODO feels a bit dirty :I
const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; }
@@ -50,6 +54,13 @@ public:
return _desired_read_consistency;
}
+ // Note: in the case the document could not be found on any replicas, but
+ // at least one node returned a non-error response, the returned value will
+ // have a timestamp of zero and the first successfully replying node as its node.
+ const std::optional<NewestReplica>& newest_replica() const noexcept {
+ return _newest_replica;
+ }
+
private:
class GroupId {
public:
@@ -66,20 +77,19 @@ private:
int _node;
};
- class BucketChecksumGroup {
- public:
- BucketChecksumGroup(const BucketCopy& c) :
- copy(c),
- sent(0), received(false), returnCode(api::ReturnCode::OK)
+ struct BucketChecksumGroup {
+ explicit BucketChecksumGroup(const BucketCopy& c)
+ : copy(c), sent(0), returnCode(api::ReturnCode::OK), to_node(UINT16_MAX), received(false)
{}
BucketCopy copy;
api::StorageMessage::Id sent;
- bool received;
api::ReturnCode returnCode;
+ uint16_t to_node;
+ bool received;
};
- typedef std::vector<BucketChecksumGroup> GroupVector;
+ using GroupVector = std::vector<BucketChecksumGroup>;
// Organize the different copies by bucket/checksum pairs. We should
// try to request GETs from each bucket and each different checksum
@@ -94,13 +104,14 @@ private:
api::ReturnCode _returnCode;
std::shared_ptr<document::Document> _doc;
- std::optional<api::Timestamp> _lastModified;
+ std::optional<NewestReplica> _newest_replica;
PersistenceOperationMetricSet& _metric;
framework::MilliSecTimer _operationTimer;
std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db;
api::InternalReadConsistency _desired_read_consistency;
bool _has_replica_inconsistency;
+ bool _any_replicas_failed;
void sendReply(DistributorMessageSender& sender);
bool sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res);
diff --git a/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp b/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp
new file mode 100644
index 00000000000..d615095ff63
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp
@@ -0,0 +1,13 @@
+#include "newest_replica.h"
+#include <ostream>
+
+namespace storage::distributor {
+
+std::ostream& operator<<(std::ostream& os, const NewestReplica& nr) {
+ os << "NewestReplica(timestamp " << nr.timestamp
+ << ", bucket_id " << nr.bucket_id
+ << ", node " << nr.node << ')';
+ return os;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/newest_replica.h b/storage/src/vespa/storage/distributor/operations/external/newest_replica.h
new file mode 100644
index 00000000000..86d5bee67d2
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/external/newest_replica.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <vespa/document/bucket/bucketid.h>
+#include <vespa/storageapi/defs.h>
+#include <iosfwd>
+#include <stddef.h>
+
+namespace storage::distributor {
+
+struct NewestReplica {
+ api::Timestamp timestamp {0};
+ document::BucketId bucket_id;
+ uint16_t node {UINT16_MAX};
+
+ static NewestReplica of(api::Timestamp timestamp,
+ const document::BucketId& bucket_id,
+ uint16_t node) noexcept {
+ return {timestamp, bucket_id, node};
+ }
+
+ static NewestReplica make_empty() {
+ return {api::Timestamp(0), document::BucketId(), 0};
+ }
+
+ bool operator==(const NewestReplica& rhs) const noexcept {
+ return ((timestamp == rhs.timestamp) &&
+ (bucket_id == rhs.bucket_id) &&
+ (node == rhs.node));
+ }
+ bool operator!=(const NewestReplica& rhs) const noexcept {
+ return !(*this == rhs);
+ }
+};
+
+
+std::ostream& operator<<(std::ostream&, const NewestReplica&);
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 47cacafee80..42ccc3d31f3 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -36,6 +36,8 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
_sendState(SendState::NONE_SENT),
_mode(Mode::FAST_PATH),
_fast_path_repair_source_node(0xffff),
+ _use_initial_cheap_metadata_fetch_phase(
+ _manager.getDistributor().getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates()),
_replySent(false)
{
document::BucketIdFactory idFactory;
@@ -92,10 +94,12 @@ const char*
TwoPhaseUpdateOperation::stateToString(SendState state)
{
switch (state) {
- case SendState::NONE_SENT: return "NONE_SENT";
- case SendState::UPDATES_SENT: return "UPDATES_SENT";
- case SendState::GETS_SENT: return "GETS_SENT";
- case SendState::PUTS_SENT: return "PUTS_SENT";
+ case SendState::NONE_SENT: return "NONE_SENT";
+ case SendState::UPDATES_SENT: return "UPDATES_SENT";
+ case SendState::METADATA_GETS_SENT: return "METADATA_GETS_SENT";
+ case SendState::SINGLE_GET_SENT: return "SINGLE_GET_SENT";
+ case SendState::FULL_GETS_SENT: return "FULL_GETS_SENT";
+ case SendState::PUTS_SENT: return "PUTS_SENT";
default:
assert(!"Unknown state");
return "";
@@ -159,7 +163,7 @@ void
TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender)
{
_mode = Mode::FAST_PATH;
- LOG(debug, "Update(%s) fast path: sending Update commands", _updateCmd->getDocumentId().toString().c_str());
+ LOG(debug, "Update(%s) fast path: sending Update commands", update_doc_id().c_str());
auto updateOperation = std::make_shared<UpdateOperation>(_manager, _bucketSpace, _updateCmd, _updateMetric);
UpdateOperation & op = *updateOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender);
@@ -174,26 +178,50 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender)
void
TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender)
{
- LOG(debug, "Update(%s) safe path: sending Get commands", _updateCmd->getDocumentId().toString().c_str());
-
_mode = Mode::SLOW_PATH;
- document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0));
- auto get = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(),"[all]");
- copyMessageSettings(*_updateCmd, *get);
- auto getOperation = std::make_shared<GetOperation>(
- _manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(), get, _getMetric);
- GetOperation & op = *getOperation;
- IntermediateMessageSender intermediate(_sentMessageMap, std::move(getOperation), sender);
+ auto get_operation = create_initial_safe_path_get_operation();
+ GetOperation& op = *get_operation;
+ IntermediateMessageSender intermediate(_sentMessageMap, std::move(get_operation), sender);
_replicas_at_get_send_time = op.replicas_in_db(); // Populated at construction time, not at start()-time
op.start(intermediate, _manager.getClock().getTimeInMillis());
- transitionTo(SendState::GETS_SENT);
+
+ transitionTo(_use_initial_cheap_metadata_fetch_phase
+ ? SendState::METADATA_GETS_SENT
+ : SendState::FULL_GETS_SENT);
if (intermediate._reply.get()) {
assert(intermediate._reply->getType() == api::MessageType::GET_REPLY);
+ // We always trigger the safe path Get reply handling here regardless of whether
+ // metadata-only or full Gets were sent. This is because we might get an early
+ // reply due to there being no replicas in existence at all for the target bucket.
+ // In this case, we rely on the safe path fallback to implicitly create the bucket
+ // by performing the update locally and sending CreateBucket+Put to the ideal nodes.
handleSafePathReceivedGet(sender, static_cast<api::GetReply&>(*intermediate._reply));
}
}
+std::shared_ptr<GetOperation>
+TwoPhaseUpdateOperation::create_initial_safe_path_get_operation() {
+ document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0));
+ const char* field_set = _use_initial_cheap_metadata_fetch_phase ? "[none]" : "[all]";
+ auto get = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), field_set);
+ copyMessageSettings(*_updateCmd, *get);
+ // Metadata-only Gets just look at the data in the meta-store, not any fields.
+ // The meta-store is always updated before any ACK is returned for a mutation,
+ // so all the information we need is guaranteed to be consistent even with a
+ // weak read. But since weak reads allow the Get operation to bypass commit
+ // queues, latency may be greatly reduced in contended situations.
+ auto read_consistency = (_use_initial_cheap_metadata_fetch_phase
+ ? api::InternalReadConsistency::Weak
+ : api::InternalReadConsistency::Strong);
+ LOG(debug, "Update(%s) safe path: sending Get commands with field set '%s' "
+ "and internal read consistency %s",
+ update_doc_id().c_str(), field_set, api::to_string(read_consistency));
+ return std::make_shared<GetOperation>(
+ _manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(),
+ get, _getMetric, read_consistency);
+}
+
void
TwoPhaseUpdateOperation::onStart(DistributorMessageSender& sender) {
if (isFastPathPossible()) {
@@ -247,7 +275,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen
transitionTo(SendState::PUTS_SENT);
LOG(debug, "Update(%s): sending Put commands with doc %s",
- _updateCmd->getDocumentId().toString().c_str(), doc->toString(true).c_str());
+ update_doc_id().c_str(), doc->toString(true).c_str());
if (intermediate._reply.get()) {
sendReplyWithResult(sender, intermediate._reply->getResult());
@@ -269,13 +297,12 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
const std::shared_ptr<api::StorageReply>& msg)
{
if (msg->getType() == api::MessageType::GET_REPLY) {
- assert(_sendState == SendState::GETS_SENT);
- api::GetReply& getReply = static_cast<api::GetReply&> (*msg);
+ assert(_sendState == SendState::FULL_GETS_SENT);
+ auto& getReply = static_cast<api::GetReply&>(*msg);
addTraceFromReply(getReply);
- LOG(debug, "Update(%s) Get reply had result: %s",
- _updateCmd->getDocumentId().toString().c_str(),
- getReply.getResult().toString().c_str());
+ LOG(debug, "Update(%s) fast path: Get reply had result %s",
+ update_doc_id().c_str(), getReply.getResult().toString().c_str());
if (!getReply.getResult().success()) {
sendReplyWithResult(sender, getReply.getResult());
@@ -310,7 +337,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
// Failed or was consistent
sendReply(sender, intermediate._reply);
} else {
- LOG(debug, "Update(%s) fast path: was inconsistent!", _updateCmd->getDocumentId().toString().c_str());
+ LOG(debug, "Update(%s) fast path: was inconsistent!", update_doc_id().c_str());
_updateReply = intermediate._reply;
_fast_path_repair_source_node = bestNode.second;
@@ -319,7 +346,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
copyMessageSettings(*_updateCmd, *cmd);
sender.sendToNode(lib::NodeType::STORAGE, _fast_path_repair_source_node, cmd);
- transitionTo(SendState::GETS_SENT);
+ transitionTo(SendState::FULL_GETS_SENT);
}
}
} else {
@@ -328,7 +355,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
addTraceFromReply(*intermediate._reply);
sendReplyWithResult(sender, intermediate._reply->getResult());
LOG(warning, "Forced convergence of '%s' using document from node %u",
- _updateCmd->getDocumentId().toString().c_str(), _fast_path_repair_source_node);
+ update_doc_id().c_str(), _fast_path_repair_source_node);
}
}
}
@@ -337,6 +364,15 @@ void
TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender,
const std::shared_ptr<api::StorageReply>& msg)
{
+ // No explicit operation is associated with the direct replica Get operation,
+ // so we handle its reply separately.
+ if (_sendState == SendState::SINGLE_GET_SENT) {
+ assert(msg->getType() == api::MessageType::GET_REPLY);
+ LOG(spam, "Received single full Get reply for '%s'", update_doc_id().c_str());
+ addTraceFromReply(*msg);
+ handleSafePathReceivedGet(sender, dynamic_cast<api::GetReply&>(*msg));
+ return;
+ }
std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId());
assert(callback.get());
Operation & callbackOp = *callback;
@@ -348,7 +384,12 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender,
return; // Not enough replies received yet or we're draining callbacks.
}
addTraceFromReply(*intermediate._reply);
- if (_sendState == SendState::GETS_SENT) {
+ if (_sendState == SendState::METADATA_GETS_SENT) {
+ assert(intermediate._reply->getType() == api::MessageType::GET_REPLY);
+ const auto& get_op = dynamic_cast<const GetOperation&>(*intermediate.callback);
+ handle_safe_path_received_metadata_get(sender, static_cast<api::GetReply&>(*intermediate._reply),
+ get_op.newest_replica(), get_op.any_replicas_failed());
+ } else if (_sendState == SendState::FULL_GETS_SENT) {
assert(intermediate._reply->getType() == api::MessageType::GET_REPLY);
handleSafePathReceivedGet(sender, static_cast<api::GetReply&>(*intermediate._reply));
} else if (_sendState == SendState::PUTS_SENT) {
@@ -359,6 +400,60 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender,
}
}
+void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get( // Handles the reply from the safe path's metadata-only Get phase.
+ DistributorMessageSender& sender, api::GetReply& reply,
+ const std::optional<NewestReplica>& newest_replica, // Only dereferenced on the inconsistent-replicas path; asserted there to hold a non-zero timestamp.
+ bool any_replicas_failed) // Only consulted once the reply itself reports success.
+{
+ LOG(debug, "Update(%s): got (metadata only) Get reply with result %s",
+ update_doc_id().c_str(), reply.getResult().toString().c_str());
+
+ if (!reply.getResult().success()) { // The Get as a whole failed: answer the update with that same result.
+ sendReplyWithResult(sender, reply.getResult());
+ return;
+ }
+ // It's possible for a single replica to fail during processing without the entire
+ // Get operation failing. Although we know a priori if replicas are out of sync,
+ // we don't know which one has the highest timestamp (it might have been the one
+ // on the node that the metadata Get just failed towards). To err on the side of
+ // caution we abort the update if this happens. If a simple metadata Get fails, it
+ // is highly likely that a full partial update or put operation would fail as well.
+ if (any_replicas_failed) {
+ LOG(debug, "Update(%s): had failed replicas, aborting update", update_doc_id().c_str());
+ sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::Result::ABORTED,
+ "One or more metadata Get operations failed; aborting Update"));
+ return;
+ }
+ if (!replica_set_unchanged_after_get_operation()) {
+ // Use BUCKET_NOT_FOUND to trigger a silent retry.
+ LOG(debug, "Update(%s): replica set has changed after metadata get phase", update_doc_id().c_str());
+ sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::Result::BUCKET_NOT_FOUND,
+ "Replica sets changed between update phases, client must retry"));
+ return;
+ }
+ if (reply.had_consistent_replicas()) { // Nothing to repair: no full Get is sent, the update restarts in the fast path instead.
+ LOG(debug, "Update(%s): metadata Gets consistent; restarting in fast path", update_doc_id().c_str());
+ restart_with_fast_path_due_to_consistent_get_timestamps(sender);
+ return;
+ }
+ // If we've gotten here, we must have had no Get failures and replicas must
+ // be somehow inconsistent. Replicas can only be inconsistent if their timestamps
+ // mismatch, so we must have observed at least one non-zero timestamp.
+ assert(newest_replica.has_value() && (newest_replica->timestamp != api::Timestamp(0)));
+ // Timestamps were not in sync, so we have to fetch the document from the highest
+ // timestamped replica, apply the update to it and then explicitly Put the result
+ // to all replicas.
+ document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), newest_replica->bucket_id); // Bucket space from the update command, bucket id from the newest replica.
+ LOG(debug, "Update(%s): sending single payload Get to %s on node %u (had timestamp %" PRIu64 ")",
+ update_doc_id().c_str(), bucket.toString().c_str(),
+ newest_replica->node, newest_replica->timestamp);
+ auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), "[all]"); // NOTE(review): "[all]" presumably requests the complete document - confirm against GetCommand.
+ copyMessageSettings(*_updateCmd, *cmd);
+ sender.sendToNode(lib::NodeType::STORAGE, newest_replica->node, cmd); // Sent directly to the node; the reply is picked up by the SINGLE_GET_SENT branch of handleSafePathReceive.
+
+ transitionTo(SendState::SINGLE_GET_SENT);
+}
+
void
TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sender, api::GetReply& reply)
{
@@ -395,7 +490,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
return;
} else if (shouldCreateIfNonExistent()) {
LOG(debug, "No existing documents found for %s, creating blank document to update",
- _updateCmd->getUpdate()->getId().toString().c_str());
+ update_doc_id().c_str());
docToUpdate = createBlankDocument();
setUpdatedForTimestamp(putTimestamp);
} else {
@@ -412,7 +507,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
bool TwoPhaseUpdateOperation::may_restart_with_fast_path(const api::GetReply& reply) {
return (_manager.getDistributor().getConfig().update_fast_path_restart_enabled() &&
- reply.wasFound() &&
+ !_replicas_at_get_send_time.empty() && // To ensure we send CreateBucket+Put if no replicas exist.
reply.had_consistent_replicas() &&
replica_set_unchanged_after_get_operation());
}
@@ -434,12 +529,15 @@ bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const
void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender) {
LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode",
- _updateCmd->getDocumentId().toString().c_str());
+ update_doc_id().c_str());
if (lostBucketOwnershipBetweenPhases()) {
sendLostOwnershipTransientErrorReply(sender);
return;
}
_updateMetric.fast_path_restarts.inc();
+ // Must not be any other messages in flight, or we might mis-interpret them when we
+ // have switched back to fast-path mode.
+ assert(_sentMessageMap.empty());
startFastPathUpdate(sender);
}
@@ -553,4 +651,9 @@ TwoPhaseUpdateOperation::onClose(DistributorMessageSender& sender) {
}
}
+vespalib::string TwoPhaseUpdateOperation::update_doc_id() const { // Returns the update command's document ID rendered as a string (by value); call sites in this file use it for log output.
+ assert(_updateCmd.get() != nullptr); // Requires that _updateCmd is still set when this is called.
+ return _updateCmd->getDocumentId().toString();
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index f9e32c31a9c..f714232b427 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -1,11 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <set>
+#include "newest_replica.h"
#include <vespa/storageapi/messageapi/returncode.h>
#include <vespa/storage/distributor/persistencemessagetracker.h>
#include <vespa/storage/distributor/operations/sequenced_operation.h>
#include <vespa/document/update/documentupdate.h>
+#include <set>
namespace document {
class Document;
@@ -23,6 +24,7 @@ class UpdateMetricSet;
namespace distributor {
class DistributorBucketSpace;
+class GetOperation;
/*
* General functional outline:
@@ -45,6 +47,7 @@ class DistributorBucketSpace;
*
* Note that the above case also implicitly handles the case in which a
* bucket does not exist.
+ *
*/
@@ -56,7 +59,7 @@ public:
const std::shared_ptr<api::UpdateCommand> & msg,
DistributorMetricSet& metrics,
SequencingHandle sequencingHandle = SequencingHandle());
- ~TwoPhaseUpdateOperation();
+ ~TwoPhaseUpdateOperation() override;
void onStart(DistributorMessageSender& sender) override;
@@ -69,13 +72,13 @@ public:
void onClose(DistributorMessageSender& sender) override;
- bool canSendHeaderOnly() const;
-
private:
enum class SendState {
NONE_SENT,
UPDATES_SENT,
- GETS_SENT,
+ METADATA_GETS_SENT, // Safe path: waiting for the metadata-only Get phase to complete.
+ SINGLE_GET_SENT, // Safe path: waiting for the one full Get sent directly to the newest replica's node.
+ FULL_GETS_SENT, // Waiting for full Get replies; entered from the safe path and from fast path repair.
PUTS_SENT,
};
@@ -85,7 +88,7 @@ private:
};
void transitionTo(SendState newState);
- const char* stateToString(SendState);
+ static const char* stateToString(SendState);
void sendReply(DistributorMessageSender&,
std::shared_ptr<api::StorageReply>&);
@@ -108,10 +111,13 @@ private:
const std::shared_ptr<api::StorageReply>&);
void handleSafePathReceive(DistributorMessageSender&,
const std::shared_ptr<api::StorageReply>&);
- void handleSafePathReceivedGet(DistributorMessageSender&,
- api::GetReply&);
- void handleSafePathReceivedPut(DistributorMessageSender&,
- const api::PutReply&);
+ std::shared_ptr<GetOperation> create_initial_safe_path_get_operation();
+ void handle_safe_path_received_metadata_get(DistributorMessageSender&,
+ api::GetReply&,
+ const std::optional<NewestReplica>&,
+ bool any_replicas_failed);
+ void handleSafePathReceivedGet(DistributorMessageSender&, api::GetReply&);
+ void handleSafePathReceivedPut(DistributorMessageSender&, const api::PutReply&);
bool shouldCreateIfNonExistent() const;
bool processAndMatchTasCondition(
DistributorMessageSender& sender,
@@ -124,6 +130,8 @@ private:
bool may_restart_with_fast_path(const api::GetReply& reply);
bool replica_set_unchanged_after_get_operation() const;
void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender);
+ // Precondition: reply has not yet been sent.
+ vespalib::string update_doc_id() const;
UpdateMetricSet& _updateMetric;
PersistenceOperationMetricSet& _putMetric;
@@ -139,6 +147,7 @@ private:
document::BucketId _updateDocBucketId;
std::vector<std::pair<document::BucketId, uint16_t>> _replicas_at_get_send_time;
uint16_t _fast_path_repair_source_node;
+ bool _use_initial_cheap_metadata_fetch_phase;
bool _replySent;
};
diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h
index 3364c6606eb..405e59f7b31 100644
--- a/storage/src/vespa/storage/distributor/sentmessagemap.h
+++ b/storage/src/vespa/storage/distributor/sentmessagemap.h
@@ -16,26 +16,18 @@ class SentMessageMap
{
public:
SentMessageMap();
-
~SentMessageMap();
std::shared_ptr<Operation> pop(api::StorageMessage::Id id);
-
std::shared_ptr<Operation> pop();
void insert(api::StorageMessage::Id id, const std::shared_ptr<Operation> & msg);
-
void clear();
-
uint32_t size() const { return _map.size(); }
-
- uint32_t empty() const { return _map.empty(); }
-
+ [[nodiscard]] bool empty() const noexcept { return _map.empty(); }
std::string toString() const;
-
private:
- typedef std::map<api::StorageMessage::Id, std::shared_ptr<Operation> > Map;
-
+ using Map = std::map<api::StorageMessage::Id, std::shared_ptr<Operation>>;
Map _map;
};