aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-11-15 12:31:55 +0000
committer Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-11-15 12:31:55 +0000
commit 2af589636d76af4ad3d26eaf199ddc4ce8cf011e (patch)
tree ed3b59977eb7181bd756a42c909e32d7fe207fca
parent bf057fb22f9c917d616031a0cd32597b315bb803 (diff)
Use fast updates when replica metadata is out of sync but document itself is in sync
When a bucket has replicas with mismatching metadata (i.e. they are out of sync), the distributor will initiate a write-repair for updates to avoid divergence of replica content. This is done by first sending a Get to all diverging replica sets, picking the highest timestamp and applying the update locally. The updated document is then sent out as a Put. This can be very expensive if document Put operations are disproportionately more expensive than partial updates, and also makes the distributor thread part of a contended critical path. This commit lets `TwoPhaseUpdateOperation` restart an update as a "fast path" update (partial updates sent directly to the nodes) if the initial read phase returns the same timestamp for the document across all replicas. It also removes an old (but now presumed unsafe) optimization where Get operations are only sent to replicas marked "trusted" even if others are out of sync with them. Since trustedness is a transient state that does not persist across restarts or bucket handoffs, it's not robust enough to be used for such purposes. Gets will now be sent to all out-of-sync replica groups regardless of trusted status.
-rw-r--r-- storage/src/tests/distributor/distributortest.cpp 17
-rw-r--r-- storage/src/tests/distributor/getoperationtest.cpp 47
-rw-r--r-- storage/src/tests/distributor/twophaseupdateoperationtest.cpp 85
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.cpp 2
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.h 8
-rw-r--r-- storage/src/vespa/storage/config/stor-distributormanager.def 9
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/getoperation.cpp 35
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/getoperation.h 3
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp 25
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h 2
-rw-r--r-- storage/src/vespa/storage/distributor/update_metric_set.cpp 5
-rw-r--r-- storage/src/vespa/storage/distributor/update_metric_set.h 1
-rw-r--r-- storageapi/src/vespa/storageapi/message/persistence.cpp 8
-rw-r--r-- storageapi/src/vespa/storageapi/message/persistence.h 6
14 files changed, 213 insertions, 40 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index e6c617a32e4..94d33e50047 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -174,6 +174,12 @@ struct DistributorTest : Test, DistributorTestUtil {
configureDistributor(builder);
}
+ void configure_update_fast_path_restart_enabled(bool enabled) {
+ ConfigBuilder builder;
+ builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled;
+ configureDistributor(builder);
+ }
+
void configureMaxClusterClockSkew(int seconds);
void sendDownClusterStateCommand();
void replyToSingleRequestBucketInfoCommandWith1Bucket();
@@ -1001,6 +1007,17 @@ TEST_F(DistributorTest, stale_reads_config_is_propagated_to_external_operation_h
EXPECT_FALSE(getExternalOperationHandler().concurrent_gets_enabled());
}
+TEST_F(DistributorTest, fast_path_on_consistent_gets_config_is_propagated_to_internal_config) {
+ createLinks(true);
+ setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_update_fast_path_restart_enabled(true);
+ EXPECT_TRUE(getConfig().update_fast_path_restart_enabled());
+
+ configure_update_fast_path_restart_enabled(false);
+ EXPECT_FALSE(getConfig().update_fast_path_restart_enabled());
+}
+
TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled) {
createLinks(false);
setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 99d7c12551d..72a124d45b4 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -117,6 +117,13 @@ struct GetOperationTest : Test, DistributorTestUtil {
}
}
+ bool last_reply_had_consistent_replicas() {
+ assert(!_sender.replies().empty());
+ auto& msg = *_sender.replies().back();
+ assert(msg.getType() == api::MessageType::GET_REPLY);
+ return dynamic_cast<api::GetReply&>(msg).had_consistent_replicas();
+ }
+
void setClusterState(const std::string& clusterState) {
enableDistributorClusterState(clusterState);
}
@@ -139,22 +146,25 @@ TEST_F(GetOperationTest, simple) {
EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
-TEST_F(GetOperationTest, ask_trusted_node_if_bucket_is_inconsistent) {
+TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted_replica_available) {
setClusterState("distributor:1 storage:4");
addNodesToBucketDB(bucketId, "0=100/3/10,1=200/4/12/t");
sendGet();
- ASSERT_EQ("Get => 1", _sender.getCommands(true));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
- ASSERT_NO_FATAL_FAILURE(replyWithDocument());
+ ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::OK, "newauthor", 2));
+ ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "oldauthor", 1));
EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
- "timestamp 100) ReturnCode(NONE)",
+ "timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
@@ -174,6 +184,7 @@ TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, send_to_all_invalid_copies) {
@@ -193,6 +204,7 @@ TEST_F(GetOperationTest, send_to_all_invalid_copies) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, send_to_all_invalid_nodes_when_inconsistent) {
@@ -235,6 +247,7 @@ TEST_F(GetOperationTest, inconsistent_split) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) {
@@ -252,6 +265,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) {
@@ -271,6 +285,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 3) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket) {
@@ -290,6 +305,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) {
@@ -312,6 +328,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) {
@@ -322,6 +339,7 @@ TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 0) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas()); // Nothing in the bucket, so nothing to be inconsistent with.
}
TEST_F(GetOperationTest, not_found) {
@@ -340,8 +358,9 @@ TEST_F(GetOperationTest, not_found) {
"timestamp 0) ReturnCode(NONE)",
_sender.getLastReply());
- EXPECT_EQ(1, getDistributor().getMetrics().gets[documentapi::LoadType::DEFAULT].
+ EXPECT_EQ(1, getDistributor().getMetrics().gets[documentapi::LoadType::DEFAULT].
failures.notfound.getValue());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, resend_on_storage_failure) {
@@ -366,6 +385,21 @@ TEST_F(GetOperationTest, resend_on_storage_failure) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ // Replica had read failure, but they're still in sync. An immutable Get won't change that fact.
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
+}
+
+TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_inconsistent) {
+ setClusterState("distributor:1 storage:3");
+ addNodesToBucketDB(bucketId, "1=100,2=200");
+ sendGet();
+ ASSERT_EQ("Get => 1,Get => 2", _sender.getCommands(true));
+ ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::TIMEOUT, "", 0));
+ ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "newestauthor", 3));
+ ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
+ "timestamp 3) ReturnCode(NONE)",
+ _sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) {
@@ -390,6 +424,7 @@ TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 0) ReturnCode(IO_FAILURE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas()); // Doesn't really matter since operation itself failed
}
TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) {
@@ -408,6 +443,7 @@ TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
@@ -435,6 +471,7 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
_sender.getLastReply());
EXPECT_EQ("newestauthor", getLastReplyAuthor());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) {
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index df9bf683326..67ef3374633 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -126,6 +126,10 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
void assertAbortedUpdateReplyWithContextPresent(
const DistributorMessageSenderStub& closeSender) const;
+ void do_test_ownership_changed_between_gets_and_second_phase(Timestamp lowest_get_timestamp,
+ Timestamp highest_get_timestamp,
+ Timestamp expected_response_timestamp);
+
};
TwoPhaseUpdateOperationTest::TwoPhaseUpdateOperationTest() = default;
@@ -142,7 +146,9 @@ TwoPhaseUpdateOperationTest::replyToMessage(
std::shared_ptr<api::StorageMessage> msg2 = sender.command(index);
auto& updatec = dynamic_cast<UpdateCommand&>(*msg2);
std::unique_ptr<api::StorageReply> reply(updatec.makeReply());
- static_cast<api::UpdateReply*>(reply.get())->setOldTimestamp(oldTimestamp);
+ auto& update_reply = dynamic_cast<api::UpdateReply&>(*reply);
+ update_reply.setOldTimestamp(oldTimestamp);
+ update_reply.setBucketInfo(api::BucketInfo(0x123, 1, 100)); // Dummy info to avoid invalid info being returned
reply->setResult(api::ReturnCode(result, ""));
callback.receive(sender,
@@ -193,7 +199,7 @@ TwoPhaseUpdateOperationTest::replyToGet(
api::ReturnCode::Result result,
const std::string& traceMsg)
{
- auto& get = static_cast<const api::GetCommand&>(*sender.command(index));
+ auto& get = dynamic_cast<const api::GetCommand&>(*sender.command(index));
std::shared_ptr<api::StorageReply> reply;
if (haveDocument) {
@@ -786,7 +792,11 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replie
ASSERT_THAT(trace, HasSubstr("baaa"));
}
-TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) {
+void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_second_phase(
+ Timestamp lowest_get_timestamp,
+ Timestamp highest_get_timestamp,
+ Timestamp expected_response_timestamp)
+{
setupDistributor(2, 2, "storage:2 distributor:1");
// Update towards inconsistent bucket invokes safe path.
@@ -803,21 +813,31 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge
// to a bucket we no longer own.
enableDistributorClusterState("storage:2 distributor:1 .0.s:d");
getBucketDatabase().clear();
- replyToGet(*cb, sender, 0, 70);
- replyToGet(*cb, sender, 1, 70);
-
+ replyToGet(*cb, sender, 0, lowest_get_timestamp);
+ replyToGet(*cb, sender, 1, highest_get_timestamp);
+
// BUCKET_NOT_FOUND is a transient error code which should cause the client
// to re-send the operation, presumably to the correct distributor the next
// time.
+ // Timestamp of updated doc varies depending on whether fast or safe path
+ // was triggered, as the reply is created via different paths.
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
- "timestamp 0, timestamp of updated doc: 70) "
+ "timestamp 0, timestamp of updated doc: " + std::to_string(expected_response_timestamp) + ") "
"ReturnCode(BUCKET_NOT_FOUND, Distributor lost "
- "ownership of bucket between executing the read "
- "and write phases of a two-phase update operation)",
+ "ownership of bucket between executing the read "
+ "and write phases of a two-phase update operation)",
sender.getLastReply(true));
}
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) {
+ do_test_ownership_changed_between_gets_and_second_phase(70, 71, 71);
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_restarted_fast_path_updates) {
+ do_test_ownership_changed_between_gets_and_second_phase(70, 70, 0); // Timestamps in sync -> Update restart
+}
+
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
std::shared_ptr<TwoPhaseUpdateOperation> cb(
@@ -977,6 +997,53 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) {
assertAbortedUpdateReplyWithContextPresent(closeSender);
}
+TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_restarts_with_fast_path_if_enabled) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_update_fast_path_restart_enabled(true);
+
+ std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
+ DistributorMessageSenderStub sender;
+ cb->start(sender, framework::MilliSecTime(0));
+
+ Timestamp old_timestamp = 500;
+ ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ replyToGet(*cb, sender, 0, old_timestamp);
+ replyToGet(*cb, sender, 1, old_timestamp);
+
+ ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true, false, 2));
+ replyToMessage(*cb, sender, 2, old_timestamp);
+ replyToMessage(*cb, sender, 3, old_timestamp);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 500) "
+ "ReturnCode(NONE)",
+ sender.getLastReply(true));
+
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_does_not_restart_with_fast_path_if_disabled) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_update_fast_path_restart_enabled(false);
+
+ std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
+ DistributorMessageSenderStub sender;
+ cb->start(sender, framework::MilliSecTime(0));
+
+ Timestamp old_timestamp = 500;
+ ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ replyToGet(*cb, sender, 0, old_timestamp);
+ replyToGet(*cb, sender, 1, old_timestamp);
+
+ // Should _not_ be restarted with fast path, as it has been config disabled
+ ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(0, metrics.fast_path_restarts.getValue());
+}
+
// XXX currently differs in behavior from content nodes in that updates for
// document IDs without explicit doctypes will _not_ be auto-failed on the
// distributor.
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 341f0f39d8f..e89b9e6b1aa 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -40,6 +40,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_disableBucketActivation(false),
_sequenceMutatingOperations(true),
_allowStaleReadsDuringClusterStateTransitions(false),
+ _update_fast_path_restart_enabled(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{ }
@@ -150,6 +151,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_disableBucketActivation = config.disableBucketActivation;
_sequenceMutatingOperations = config.sequenceMutatingOperations;
_allowStaleReadsDuringClusterStateTransitions = config.allowStaleReadsDuringClusterStateTransitions;
+ _update_fast_path_restart_enabled = config.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 28a219dc3f6..3cb84943508 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -250,6 +250,13 @@ public:
_allowStaleReadsDuringClusterStateTransitions = allow;
}
+ bool update_fast_path_restart_enabled() const noexcept {
+ return _update_fast_path_restart_enabled;
+ }
+ void set_update_fast_path_restart_enabled(bool enabled) noexcept {
+ _update_fast_path_restart_enabled = enabled;
+ }
+
bool containsTimeStatement(const std::string& documentSelection) const;
private:
@@ -293,6 +300,7 @@ private:
bool _disableBucketActivation;
bool _sequenceMutatingOperations;
bool _allowStaleReadsDuringClusterStateTransitions;
+ bool _update_fast_path_restart_enabled;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index e789f03bb14..f292b60a70e 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -205,3 +205,12 @@ simulated_db_merging_latency_msec int default=0
## read only operations, as the B-tree database is thread safe for concurrent reads.
use_btree_database bool default=false restart
+## If a bucket is inconsistent and an Update operation is received, a two-phase
+## write-repair path is triggered in which a Get is sent to all diverging replicas.
+## Once received, the update is applied on the distributor and pushed out to the
+## content nodes as Puts.
+## Iff this config is set to true AND all Gets return the same timestamp from all
+## content nodes, the two-phase update path reverts back to the regular fast path.
+## Since all replicas of the document were in sync, applying the update in-place
+## shall be considered safe.
+restart_with_fast_update_path_if_all_get_timestamps_are_consistent bool default=true
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index ad2e5cf6478..7d7cd37e848 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -57,7 +57,8 @@ GetOperation::GetOperation(DistributorComponent& manager,
_doc(),
_lastModified(0),
_metric(metric),
- _operationTimer(manager.getClock())
+ _operationTimer(manager.getClock()),
+ _has_replica_inconsistency(false)
{
assignTargetNodeGroups(*read_guard);
}
@@ -150,6 +151,10 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<
response.second[i].returnCode = getreply->getResult();
if (getreply->getResult().success()) {
+ if ((_lastModified != 0) && (getreply->getLastModifiedTimestamp() != _lastModified)) {
+ // At least two document versions returned had different timestamps.
+ _has_replica_inconsistency = true; // This is a one-way toggle.
+ }
if (getreply->getLastModifiedTimestamp() > _lastModified) {
_returnCode = getreply->getResult();
_lastModified = getreply->getLastModifiedTimestamp();
@@ -159,6 +164,12 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<
if (_lastModified == 0) {
_returnCode = getreply->getResult();
}
+ if (!all_bucket_metadata_initially_consistent()) {
+ // If we're sending to more than a single group of replicas it means our replica set is
+ // out of sync. Since we are unable to verify the timestamp of at least one replicated
+ // document, we fail safe by marking the entire operation as inconsistent.
+ _has_replica_inconsistency = true;
+ }
// Try to send to another node in this checksum group.
bool sent = sendForChecksum(sender, response.first.getBucketId(), response.second);
@@ -205,7 +216,7 @@ void
GetOperation::sendReply(DistributorMessageSender& sender)
{
if (_msg.get()) {
- auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified);
+ auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified, !_has_replica_inconsistency);
repl->setResult(_returnCode);
update_internal_metrics();
sender.sendReply(repl);
@@ -229,23 +240,6 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard
LOG(spam, "Entry for %s: %s", e.getBucketId().toString().c_str(),
e->toString().c_str());
- bool haveTrusted = false;
- for (uint32_t i = 0; i < e->getNodeCount(); i++) {
- const BucketCopy& c = e->getNodeRef(i);
-
- if (!c.trusted()) {
- continue;
- }
-
- _responses[GroupId(e.getBucketId(), c.getChecksum(), -1)].push_back(c);
- haveTrusted = true;
- break;
- }
-
- if (haveTrusted) {
- continue;
- }
-
for (uint32_t i = 0; i < e->getNodeCount(); i++) {
const BucketCopy& copy = e->getNodeRef(i);
@@ -259,8 +253,9 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard
}
bool
-GetOperation::hasConsistentCopies() const
+GetOperation::all_bucket_metadata_initially_consistent() const
{
+ // TODO rename, calling this "responses" is confusing as it's populated before sending anything.
return _responses.size() == 1;
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 4f2d7c3d963..a30a9a721bd 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -35,7 +35,7 @@ public:
const char* getName() const override { return "get"; }
std::string getStatus() const override { return ""; }
- bool hasConsistentCopies() const;
+ bool all_bucket_metadata_initially_consistent() const;
// Exposed for unit testing. TODO feels a bit dirty :I
const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; }
@@ -88,6 +88,7 @@ private:
PersistenceOperationMetricSet& _metric;
framework::MilliSecTimer _operationTimer;
+ bool _has_replica_inconsistency;
void sendReply(DistributorMessageSender& sender);
bool sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res);
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index b3326a43be2..4283386a1bc 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -158,6 +158,7 @@ void
TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender)
{
_mode = Mode::FAST_PATH;
+ LOG(debug, "Update(%s) fast path: sending Update commands", _updateCmd->getDocumentId().toString().c_str());
auto updateOperation = std::make_shared<UpdateOperation>(_manager, _bucketSpace, _updateCmd, _updateMetric);
UpdateOperation & op = *updateOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender);
@@ -364,6 +365,11 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
sendReplyWithResult(sender, reply.getResult());
return;
}
+ if (may_restart_with_fast_path(reply)) {
+ restart_with_fast_path_due_to_consistent_get_timestamps(sender);
+ return;
+ }
+
document::Document::SP docToUpdate;
api::Timestamp putTimestamp = _manager.getUniqueTimestamp();
@@ -399,6 +405,23 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
}
}
+bool TwoPhaseUpdateOperation::may_restart_with_fast_path(const api::GetReply& reply) {
+ return (_manager.getDistributor().getConfig().update_fast_path_restart_enabled() &&
+ reply.wasFound() &&
+ reply.had_consistent_replicas());
+}
+
+void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender) {
+ LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode",
+ _updateCmd->getDocumentId().toString().c_str());
+ if (lostBucketOwnershipBetweenPhases()) {
+ sendLostOwnershipTransientErrorReply(sender);
+ return;
+ }
+ _updateMetric.fast_path_restarts.inc();
+ startFastPathUpdate(sender);
+}
+
bool
TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorMessageSender& sender,
const document::Document& candidateDoc)
@@ -487,7 +510,7 @@ TwoPhaseUpdateOperation::onClose(DistributorMessageSender& sender) {
std::shared_ptr<Operation> cb = _sentMessageMap.pop();
if (cb) {
- IntermediateMessageSender intermediate(_sentMessageMap, std::shared_ptr<Operation > (), sender);
+ IntermediateMessageSender intermediate(_sentMessageMap, std::shared_ptr<Operation>(), sender);
cb->onClose(intermediate);
// We will _only_ forward UpdateReply instances up, since those
// are created by UpdateOperation and are bound to the original
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index f9787141a19..84bcf2beff8 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -121,6 +121,8 @@ private:
bool hasTasCondition() const noexcept;
void replyWithTasFailure(DistributorMessageSender& sender,
vespalib::stringref message);
+ bool may_restart_with_fast_path(const api::GetReply& reply);
+ void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender);
UpdateMetricSet& _updateMetric;
PersistenceOperationMetricSet& _putMetric;
diff --git a/storage/src/vespa/storage/distributor/update_metric_set.cpp b/storage/src/vespa/storage/distributor/update_metric_set.cpp
index 2505003d2ba..1edde270517 100644
--- a/storage/src/vespa/storage/distributor/update_metric_set.cpp
+++ b/storage/src/vespa/storage/distributor/update_metric_set.cpp
@@ -12,7 +12,10 @@ UpdateMetricSet::UpdateMetricSet(MetricSet* owner)
: PersistenceOperationMetricSet("updates", owner),
diverging_timestamp_updates("diverging_timestamp_updates", {},
"Number of updates that report they were performed against "
- "divergent version timestamps on different replicas", this)
+ "divergent version timestamps on different replicas", this),
+ fast_path_restarts("fast_path_restarts", {}, "Number of safe path (write repair) updates "
+ "that were restarted as fast path updates because all replicas returned "
+ "documents with the same timestamp in the initial read phase", this)
{
}
diff --git a/storage/src/vespa/storage/distributor/update_metric_set.h b/storage/src/vespa/storage/distributor/update_metric_set.h
index de8474c949c..8a88aa6b9a9 100644
--- a/storage/src/vespa/storage/distributor/update_metric_set.h
+++ b/storage/src/vespa/storage/distributor/update_metric_set.h
@@ -10,6 +10,7 @@ namespace storage {
class UpdateMetricSet : public PersistenceOperationMetricSet {
public:
metrics::LongCountMetric diverging_timestamp_updates;
+ metrics::LongCountMetric fast_path_restarts;
explicit UpdateMetricSet(MetricSet* owner = nullptr);
~UpdateMetricSet() override;
diff --git a/storageapi/src/vespa/storageapi/message/persistence.cpp b/storageapi/src/vespa/storageapi/message/persistence.cpp
index 0b641eea253..8bd4f648f30 100644
--- a/storageapi/src/vespa/storageapi/message/persistence.cpp
+++ b/storageapi/src/vespa/storageapi/message/persistence.cpp
@@ -207,13 +207,17 @@ GetCommand::print(std::ostream& out, bool verbose, const std::string& indent) co
}
}
-GetReply::GetReply(const GetCommand& cmd, const DocumentSP& doc, Timestamp lastModified)
+GetReply::GetReply(const GetCommand& cmd,
+ const DocumentSP& doc,
+ Timestamp lastModified,
+ bool had_consistent_replicas)
: BucketInfoReply(cmd),
_docId(cmd.getDocumentId()),
_fieldSet(cmd.getFieldSet()),
_doc(doc),
_beforeTimestamp(cmd.getBeforeTimestamp()),
- _lastModifiedTime(lastModified)
+ _lastModifiedTime(lastModified),
+ _had_consistent_replicas(had_consistent_replicas)
{
}
diff --git a/storageapi/src/vespa/storageapi/message/persistence.h b/storageapi/src/vespa/storageapi/message/persistence.h
index 1fdd5c369bf..f2161421feb 100644
--- a/storageapi/src/vespa/storageapi/message/persistence.h
+++ b/storageapi/src/vespa/storageapi/message/persistence.h
@@ -217,11 +217,13 @@ class GetReply : public BucketInfoReply {
DocumentSP _doc; // Null pointer if not found
Timestamp _beforeTimestamp;
Timestamp _lastModifiedTime;
+ bool _had_consistent_replicas;
public:
GetReply(const GetCommand& cmd,
const DocumentSP& doc = DocumentSP(),
- Timestamp lastModified = 0);
+ Timestamp lastModified = 0,
+ bool had_consistent_replicas = false);
~GetReply() override;
const DocumentSP& getDocument() const { return _doc; }
@@ -231,6 +233,8 @@ public:
Timestamp getLastModifiedTimestamp() const { return _lastModifiedTime; }
Timestamp getBeforeTimestamp() const { return _beforeTimestamp; }
+ bool had_consistent_replicas() const noexcept { return _had_consistent_replicas; }
+
bool wasFound() const { return (_doc.get() != 0); }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGEREPLY(GetReply, onGetReply)