about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-11-15 14:20:49 +0100
committer: GitHub <noreply@github.com> 2019-11-15 14:20:49 +0100
commit: 4059e8d8e9ba7825d398a9397f2cd0dbfdb68278 (patch)
tree: b1802084a5944b4e2b9424298996e00e44053bd3
parent: e390ccd415cc6ecb3187f559eb95abf0738d8745 (diff)
parent: 2af589636d76af4ad3d26eaf199ddc4ce8cf011e (diff)
Merge pull request #11319 from vespa-engine/vekterli/restart-two-phase-updates-in-fast-path-if-docs-in-sync
Use fast updates when replica metadata is out of sync but document itself is in sync
-rw-r--r-- storage/src/tests/distributor/distributortest.cpp | 17
-rw-r--r-- storage/src/tests/distributor/getoperationtest.cpp | 47
-rw-r--r-- storage/src/tests/distributor/twophaseupdateoperationtest.cpp | 85
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.cpp | 2
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.h | 8
-rw-r--r-- storage/src/vespa/storage/config/stor-distributormanager.def | 9
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/getoperation.cpp | 35
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/getoperation.h | 3
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp | 25
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h | 2
-rw-r--r-- storage/src/vespa/storage/distributor/update_metric_set.cpp | 5
-rw-r--r-- storage/src/vespa/storage/distributor/update_metric_set.h | 1
-rw-r--r-- storageapi/src/vespa/storageapi/message/persistence.cpp | 8
-rw-r--r-- storageapi/src/vespa/storageapi/message/persistence.h | 6
14 files changed, 213 insertions, 40 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index e6c617a32e4..94d33e50047 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -174,6 +174,12 @@ struct DistributorTest : Test, DistributorTestUtil {
configureDistributor(builder);
}
+ void configure_update_fast_path_restart_enabled(bool enabled) {
+ ConfigBuilder builder;
+ builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled;
+ configureDistributor(builder);
+ }
+
void configureMaxClusterClockSkew(int seconds);
void sendDownClusterStateCommand();
void replyToSingleRequestBucketInfoCommandWith1Bucket();
@@ -1001,6 +1007,17 @@ TEST_F(DistributorTest, stale_reads_config_is_propagated_to_external_operation_h
EXPECT_FALSE(getExternalOperationHandler().concurrent_gets_enabled());
}
+TEST_F(DistributorTest, fast_path_on_consistent_gets_config_is_propagated_to_internal_config) {
+ createLinks(true);
+ setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
+
+ configure_update_fast_path_restart_enabled(true);
+ EXPECT_TRUE(getConfig().update_fast_path_restart_enabled());
+
+ configure_update_fast_path_restart_enabled(false);
+ EXPECT_FALSE(getConfig().update_fast_path_restart_enabled());
+}
+
TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled) {
createLinks(false);
setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1");
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 99d7c12551d..72a124d45b4 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -117,6 +117,13 @@ struct GetOperationTest : Test, DistributorTestUtil {
}
}
+ bool last_reply_had_consistent_replicas() {
+ assert(!_sender.replies().empty());
+ auto& msg = *_sender.replies().back();
+ assert(msg.getType() == api::MessageType::GET_REPLY);
+ return dynamic_cast<api::GetReply&>(msg).had_consistent_replicas();
+ }
+
void setClusterState(const std::string& clusterState) {
enableDistributorClusterState(clusterState);
}
@@ -139,22 +146,25 @@ TEST_F(GetOperationTest, simple) {
EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
-TEST_F(GetOperationTest, ask_trusted_node_if_bucket_is_inconsistent) {
+TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted_replica_available) {
setClusterState("distributor:1 storage:4");
addNodesToBucketDB(bucketId, "0=100/3/10,1=200/4/12/t");
sendGet();
- ASSERT_EQ("Get => 1", _sender.getCommands(true));
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
- ASSERT_NO_FATAL_FAILURE(replyWithDocument());
+ ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::OK, "newauthor", 2));
+ ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "oldauthor", 1));
EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
- "timestamp 100) ReturnCode(NONE)",
+ "timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
@@ -174,6 +184,7 @@ TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, send_to_all_invalid_copies) {
@@ -193,6 +204,7 @@ TEST_F(GetOperationTest, send_to_all_invalid_copies) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, send_to_all_invalid_nodes_when_inconsistent) {
@@ -235,6 +247,7 @@ TEST_F(GetOperationTest, inconsistent_split) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) {
@@ -252,6 +265,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 2) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) {
@@ -271,6 +285,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 3) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket) {
@@ -290,6 +305,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket) {
_sender.getLastReply());
EXPECT_EQ("newauthor", getLastReplyAuthor());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) {
@@ -312,6 +328,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) {
@@ -322,6 +339,7 @@ TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 0) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas()); // Nothing in the bucket, so nothing to be inconsistent with.
}
TEST_F(GetOperationTest, not_found) {
@@ -340,8 +358,9 @@ TEST_F(GetOperationTest, not_found) {
"timestamp 0) ReturnCode(NONE)",
_sender.getLastReply());
- EXPECT_EQ(1, getDistributor().getMetrics().gets[documentapi::LoadType::DEFAULT].
+ EXPECT_EQ(1, getDistributor().getMetrics().gets[documentapi::LoadType::DEFAULT].
failures.notfound.getValue());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, resend_on_storage_failure) {
@@ -366,6 +385,21 @@ TEST_F(GetOperationTest, resend_on_storage_failure) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ // Replica had read failure, but they're still in sync. An immutable Get won't change that fact.
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
+}
+
+TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_inconsistent) {
+ setClusterState("distributor:1 storage:3");
+ addNodesToBucketDB(bucketId, "1=100,2=200");
+ sendGet();
+ ASSERT_EQ("Get => 1,Get => 2", _sender.getCommands(true));
+ ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::TIMEOUT, "", 0));
+ ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "newestauthor", 3));
+ ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
+ "timestamp 3) ReturnCode(NONE)",
+ _sender.getLastReply());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) {
@@ -390,6 +424,7 @@ TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 0) ReturnCode(IO_FAILURE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas()); // Doesn't really matter since operation itself failed
}
TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) {
@@ -408,6 +443,7 @@ TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) {
ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, "
"timestamp 100) ReturnCode(NONE)",
_sender.getLastReply());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
@@ -435,6 +471,7 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
_sender.getLastReply());
EXPECT_EQ("newestauthor", getLastReplyAuthor());
+ EXPECT_TRUE(last_reply_had_consistent_replicas());
}
TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) {
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index df9bf683326..67ef3374633 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -126,6 +126,10 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil {
void assertAbortedUpdateReplyWithContextPresent(
const DistributorMessageSenderStub& closeSender) const;
+ void do_test_ownership_changed_between_gets_and_second_phase(Timestamp lowest_get_timestamp,
+ Timestamp highest_get_timestamp,
+ Timestamp expected_response_timestamp);
+
};
TwoPhaseUpdateOperationTest::TwoPhaseUpdateOperationTest() = default;
@@ -142,7 +146,9 @@ TwoPhaseUpdateOperationTest::replyToMessage(
std::shared_ptr<api::StorageMessage> msg2 = sender.command(index);
auto& updatec = dynamic_cast<UpdateCommand&>(*msg2);
std::unique_ptr<api::StorageReply> reply(updatec.makeReply());
- static_cast<api::UpdateReply*>(reply.get())->setOldTimestamp(oldTimestamp);
+ auto& update_reply = dynamic_cast<api::UpdateReply&>(*reply);
+ update_reply.setOldTimestamp(oldTimestamp);
+ update_reply.setBucketInfo(api::BucketInfo(0x123, 1, 100)); // Dummy info to avoid invalid info being returned
reply->setResult(api::ReturnCode(result, ""));
callback.receive(sender,
@@ -193,7 +199,7 @@ TwoPhaseUpdateOperationTest::replyToGet(
api::ReturnCode::Result result,
const std::string& traceMsg)
{
- auto& get = static_cast<const api::GetCommand&>(*sender.command(index));
+ auto& get = dynamic_cast<const api::GetCommand&>(*sender.command(index));
std::shared_ptr<api::StorageReply> reply;
if (haveDocument) {
@@ -786,7 +792,11 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replie
ASSERT_THAT(trace, HasSubstr("baaa"));
}
-TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) {
+void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_second_phase(
+ Timestamp lowest_get_timestamp,
+ Timestamp highest_get_timestamp,
+ Timestamp expected_response_timestamp)
+{
setupDistributor(2, 2, "storage:2 distributor:1");
// Update towards inconsistent bucket invokes safe path.
@@ -803,21 +813,31 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge
// to a bucket we no longer own.
enableDistributorClusterState("storage:2 distributor:1 .0.s:d");
getBucketDatabase().clear();
- replyToGet(*cb, sender, 0, 70);
- replyToGet(*cb, sender, 1, 70);
-
+ replyToGet(*cb, sender, 0, lowest_get_timestamp);
+ replyToGet(*cb, sender, 1, highest_get_timestamp);
+
// BUCKET_NOT_FOUND is a transient error code which should cause the client
// to re-send the operation, presumably to the correct distributor the next
// time.
+ // Timestamp of updated doc varies depending on whether fast or safe path
+ // was triggered, as the reply is created via different paths.
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
"BucketId(0x0000000000000000), "
- "timestamp 0, timestamp of updated doc: 70) "
+ "timestamp 0, timestamp of updated doc: " + std::to_string(expected_response_timestamp) + ") "
"ReturnCode(BUCKET_NOT_FOUND, Distributor lost "
- "ownership of bucket between executing the read "
- "and write phases of a two-phase update operation)",
+ "ownership of bucket between executing the read "
+ "and write phases of a two-phase update operation)",
sender.getLastReply(true));
}
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) {
+ do_test_ownership_changed_between_gets_and_second_phase(70, 71, 71);
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_restarted_fast_path_updates) {
+ do_test_ownership_changed_between_gets_and_second_phase(70, 70, 0); // Timestamps in sync -> Update restart
+}
+
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) {
setupDistributor(2, 2, "storage:2 distributor:1");
std::shared_ptr<TwoPhaseUpdateOperation> cb(
@@ -977,6 +997,53 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) {
assertAbortedUpdateReplyWithContextPresent(closeSender);
}
+TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_restarts_with_fast_path_if_enabled) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_update_fast_path_restart_enabled(true);
+
+ std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
+ DistributorMessageSenderStub sender;
+ cb->start(sender, framework::MilliSecTime(0));
+
+ Timestamp old_timestamp = 500;
+ ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ replyToGet(*cb, sender, 0, old_timestamp);
+ replyToGet(*cb, sender, 1, old_timestamp);
+
+ ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true, false, 2));
+ replyToMessage(*cb, sender, 2, old_timestamp);
+ replyToMessage(*cb, sender, 3, old_timestamp);
+
+ EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 0, timestamp of updated doc: 500) "
+ "ReturnCode(NONE)",
+ sender.getLastReply(true));
+
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(1, metrics.fast_path_restarts.getValue());
+}
+
+TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_does_not_restart_with_fast_path_if_disabled) {
+ setupDistributor(2, 2, "storage:2 distributor:1");
+ getConfig().set_update_fast_path_restart_enabled(false);
+
+ std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
+ DistributorMessageSenderStub sender;
+ cb->start(sender, framework::MilliSecTime(0));
+
+ Timestamp old_timestamp = 500;
+ ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
+ replyToGet(*cb, sender, 0, old_timestamp);
+ replyToGet(*cb, sender, 1, old_timestamp);
+
+ // Should _not_ be restarted with fast path, as it has been config disabled
+ ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2));
+
+ auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(0, metrics.fast_path_restarts.getValue());
+}
+
// XXX currently differs in behavior from content nodes in that updates for
// document IDs without explicit doctypes will _not_ be auto-failed on the
// distributor.
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index 341f0f39d8f..e89b9e6b1aa 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -40,6 +40,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_disableBucketActivation(false),
_sequenceMutatingOperations(true),
_allowStaleReadsDuringClusterStateTransitions(false),
+ _update_fast_path_restart_enabled(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{ }
@@ -150,6 +151,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_disableBucketActivation = config.disableBucketActivation;
_sequenceMutatingOperations = config.sequenceMutatingOperations;
_allowStaleReadsDuringClusterStateTransitions = config.allowStaleReadsDuringClusterStateTransitions;
+ _update_fast_path_restart_enabled = config.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 28a219dc3f6..3cb84943508 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -250,6 +250,13 @@ public:
_allowStaleReadsDuringClusterStateTransitions = allow;
}
+ bool update_fast_path_restart_enabled() const noexcept {
+ return _update_fast_path_restart_enabled;
+ }
+ void set_update_fast_path_restart_enabled(bool enabled) noexcept {
+ _update_fast_path_restart_enabled = enabled;
+ }
+
bool containsTimeStatement(const std::string& documentSelection) const;
private:
@@ -293,6 +300,7 @@ private:
bool _disableBucketActivation;
bool _sequenceMutatingOperations;
bool _allowStaleReadsDuringClusterStateTransitions;
+ bool _update_fast_path_restart_enabled;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def
index e789f03bb14..f292b60a70e 100644
--- a/storage/src/vespa/storage/config/stor-distributormanager.def
+++ b/storage/src/vespa/storage/config/stor-distributormanager.def
@@ -205,3 +205,12 @@ simulated_db_merging_latency_msec int default=0
## read only operations, as the B-tree database is thread safe for concurrent reads.
use_btree_database bool default=false restart
+## If a bucket is inconsistent and an Update operation is received, a two-phase
+## write-repair path is triggered in which a Get is sent to all diverging replicas.
+## Once the Get replies are received, the update is applied on the distributor and
+## pushed out to the content nodes as Puts.
+## Iff this config is set to true AND all Gets return the same timestamp from all
+## content nodes, the two-phase update path reverts to the regular fast path.
+## Since all replicas of the document were in sync, applying the update in-place
+## is considered safe.
+restart_with_fast_update_path_if_all_get_timestamps_are_consistent bool default=true
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index ad2e5cf6478..7d7cd37e848 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -57,7 +57,8 @@ GetOperation::GetOperation(DistributorComponent& manager,
_doc(),
_lastModified(0),
_metric(metric),
- _operationTimer(manager.getClock())
+ _operationTimer(manager.getClock()),
+ _has_replica_inconsistency(false)
{
assignTargetNodeGroups(*read_guard);
}
@@ -150,6 +151,10 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<
response.second[i].returnCode = getreply->getResult();
if (getreply->getResult().success()) {
+ if ((_lastModified != 0) && (getreply->getLastModifiedTimestamp() != _lastModified)) {
+ // At least two document versions returned had different timestamps.
+ _has_replica_inconsistency = true; // This is a one-way toggle.
+ }
if (getreply->getLastModifiedTimestamp() > _lastModified) {
_returnCode = getreply->getResult();
_lastModified = getreply->getLastModifiedTimestamp();
@@ -159,6 +164,12 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<
if (_lastModified == 0) {
_returnCode = getreply->getResult();
}
+ if (!all_bucket_metadata_initially_consistent()) {
+ // If we're sending to more than a single group of replicas it means our replica set is
+ // out of sync. Since we are unable to verify the timestamp of at least one replicated
+ // document, we fail safe by marking the entire operation as inconsistent.
+ _has_replica_inconsistency = true;
+ }
// Try to send to another node in this checksum group.
bool sent = sendForChecksum(sender, response.first.getBucketId(), response.second);
@@ -205,7 +216,7 @@ void
GetOperation::sendReply(DistributorMessageSender& sender)
{
if (_msg.get()) {
- auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified);
+ auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified, !_has_replica_inconsistency);
repl->setResult(_returnCode);
update_internal_metrics();
sender.sendReply(repl);
@@ -229,23 +240,6 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard
LOG(spam, "Entry for %s: %s", e.getBucketId().toString().c_str(),
e->toString().c_str());
- bool haveTrusted = false;
- for (uint32_t i = 0; i < e->getNodeCount(); i++) {
- const BucketCopy& c = e->getNodeRef(i);
-
- if (!c.trusted()) {
- continue;
- }
-
- _responses[GroupId(e.getBucketId(), c.getChecksum(), -1)].push_back(c);
- haveTrusted = true;
- break;
- }
-
- if (haveTrusted) {
- continue;
- }
-
for (uint32_t i = 0; i < e->getNodeCount(); i++) {
const BucketCopy& copy = e->getNodeRef(i);
@@ -259,8 +253,9 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard
}
bool
-GetOperation::hasConsistentCopies() const
+GetOperation::all_bucket_metadata_initially_consistent() const
{
+ // TODO rename, calling this "responses" is confusing as it's populated before sending anything.
return _responses.size() == 1;
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 4f2d7c3d963..a30a9a721bd 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -35,7 +35,7 @@ public:
const char* getName() const override { return "get"; }
std::string getStatus() const override { return ""; }
- bool hasConsistentCopies() const;
+ bool all_bucket_metadata_initially_consistent() const;
// Exposed for unit testing. TODO feels a bit dirty :I
const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; }
@@ -88,6 +88,7 @@ private:
PersistenceOperationMetricSet& _metric;
framework::MilliSecTimer _operationTimer;
+ bool _has_replica_inconsistency;
void sendReply(DistributorMessageSender& sender);
bool sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res);
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index b3326a43be2..4283386a1bc 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -158,6 +158,7 @@ void
TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender)
{
_mode = Mode::FAST_PATH;
+ LOG(debug, "Update(%s) fast path: sending Update commands", _updateCmd->getDocumentId().toString().c_str());
auto updateOperation = std::make_shared<UpdateOperation>(_manager, _bucketSpace, _updateCmd, _updateMetric);
UpdateOperation & op = *updateOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender);
@@ -364,6 +365,11 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
sendReplyWithResult(sender, reply.getResult());
return;
}
+ if (may_restart_with_fast_path(reply)) {
+ restart_with_fast_path_due_to_consistent_get_timestamps(sender);
+ return;
+ }
+
document::Document::SP docToUpdate;
api::Timestamp putTimestamp = _manager.getUniqueTimestamp();
@@ -399,6 +405,23 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
}
}
+bool TwoPhaseUpdateOperation::may_restart_with_fast_path(const api::GetReply& reply) {
+ return (_manager.getDistributor().getConfig().update_fast_path_restart_enabled() &&
+ reply.wasFound() &&
+ reply.had_consistent_replicas());
+}
+
+void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender) {
+ LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode",
+ _updateCmd->getDocumentId().toString().c_str());
+ if (lostBucketOwnershipBetweenPhases()) {
+ sendLostOwnershipTransientErrorReply(sender);
+ return;
+ }
+ _updateMetric.fast_path_restarts.inc();
+ startFastPathUpdate(sender);
+}
+
bool
TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorMessageSender& sender,
const document::Document& candidateDoc)
@@ -487,7 +510,7 @@ TwoPhaseUpdateOperation::onClose(DistributorMessageSender& sender) {
std::shared_ptr<Operation> cb = _sentMessageMap.pop();
if (cb) {
- IntermediateMessageSender intermediate(_sentMessageMap, std::shared_ptr<Operation > (), sender);
+ IntermediateMessageSender intermediate(_sentMessageMap, std::shared_ptr<Operation>(), sender);
cb->onClose(intermediate);
// We will _only_ forward UpdateReply instances up, since those
// are created by UpdateOperation and are bound to the original
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index f9787141a19..84bcf2beff8 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -121,6 +121,8 @@ private:
bool hasTasCondition() const noexcept;
void replyWithTasFailure(DistributorMessageSender& sender,
vespalib::stringref message);
+ bool may_restart_with_fast_path(const api::GetReply& reply);
+ void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender);
UpdateMetricSet& _updateMetric;
PersistenceOperationMetricSet& _putMetric;
diff --git a/storage/src/vespa/storage/distributor/update_metric_set.cpp b/storage/src/vespa/storage/distributor/update_metric_set.cpp
index 2505003d2ba..1edde270517 100644
--- a/storage/src/vespa/storage/distributor/update_metric_set.cpp
+++ b/storage/src/vespa/storage/distributor/update_metric_set.cpp
@@ -12,7 +12,10 @@ UpdateMetricSet::UpdateMetricSet(MetricSet* owner)
: PersistenceOperationMetricSet("updates", owner),
diverging_timestamp_updates("diverging_timestamp_updates", {},
"Number of updates that report they were performed against "
- "divergent version timestamps on different replicas", this)
+ "divergent version timestamps on different replicas", this),
+ fast_path_restarts("fast_path_restarts", {}, "Number of safe path (write repair) updates "
+ "that were restarted as fast path updates because all replicas returned "
+ "documents with the same timestamp in the initial read phase", this)
{
}
diff --git a/storage/src/vespa/storage/distributor/update_metric_set.h b/storage/src/vespa/storage/distributor/update_metric_set.h
index de8474c949c..8a88aa6b9a9 100644
--- a/storage/src/vespa/storage/distributor/update_metric_set.h
+++ b/storage/src/vespa/storage/distributor/update_metric_set.h
@@ -10,6 +10,7 @@ namespace storage {
class UpdateMetricSet : public PersistenceOperationMetricSet {
public:
metrics::LongCountMetric diverging_timestamp_updates;
+ metrics::LongCountMetric fast_path_restarts;
explicit UpdateMetricSet(MetricSet* owner = nullptr);
~UpdateMetricSet() override;
diff --git a/storageapi/src/vespa/storageapi/message/persistence.cpp b/storageapi/src/vespa/storageapi/message/persistence.cpp
index 0b641eea253..8bd4f648f30 100644
--- a/storageapi/src/vespa/storageapi/message/persistence.cpp
+++ b/storageapi/src/vespa/storageapi/message/persistence.cpp
@@ -207,13 +207,17 @@ GetCommand::print(std::ostream& out, bool verbose, const std::string& indent) co
}
}
-GetReply::GetReply(const GetCommand& cmd, const DocumentSP& doc, Timestamp lastModified)
+GetReply::GetReply(const GetCommand& cmd,
+ const DocumentSP& doc,
+ Timestamp lastModified,
+ bool had_consistent_replicas)
: BucketInfoReply(cmd),
_docId(cmd.getDocumentId()),
_fieldSet(cmd.getFieldSet()),
_doc(doc),
_beforeTimestamp(cmd.getBeforeTimestamp()),
- _lastModifiedTime(lastModified)
+ _lastModifiedTime(lastModified),
+ _had_consistent_replicas(had_consistent_replicas)
{
}
diff --git a/storageapi/src/vespa/storageapi/message/persistence.h b/storageapi/src/vespa/storageapi/message/persistence.h
index 1fdd5c369bf..f2161421feb 100644
--- a/storageapi/src/vespa/storageapi/message/persistence.h
+++ b/storageapi/src/vespa/storageapi/message/persistence.h
@@ -217,11 +217,13 @@ class GetReply : public BucketInfoReply {
DocumentSP _doc; // Null pointer if not found
Timestamp _beforeTimestamp;
Timestamp _lastModifiedTime;
+ bool _had_consistent_replicas;
public:
GetReply(const GetCommand& cmd,
const DocumentSP& doc = DocumentSP(),
- Timestamp lastModified = 0);
+ Timestamp lastModified = 0,
+ bool had_consistent_replicas = false);
~GetReply() override;
const DocumentSP& getDocument() const { return _doc; }
@@ -231,6 +233,8 @@ public:
Timestamp getLastModifiedTimestamp() const { return _lastModifiedTime; }
Timestamp getBeforeTimestamp() const { return _beforeTimestamp; }
+ bool had_consistent_replicas() const noexcept { return _had_consistent_replicas; }
+
bool wasFound() const { return (_doc.get() != 0); }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGEREPLY(GetReply, onGetReply)