diff options
14 files changed, 213 insertions, 40 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index e6c617a32e4..94d33e50047 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -174,6 +174,12 @@ struct DistributorTest : Test, DistributorTestUtil { configureDistributor(builder); } + void configure_update_fast_path_restart_enabled(bool enabled) { + ConfigBuilder builder; + builder.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent = enabled; + configureDistributor(builder); + } + void configureMaxClusterClockSkew(int seconds); void sendDownClusterStateCommand(); void replyToSingleRequestBucketInfoCommandWith1Bucket(); @@ -1001,6 +1007,17 @@ TEST_F(DistributorTest, stale_reads_config_is_propagated_to_external_operation_h EXPECT_FALSE(getExternalOperationHandler().concurrent_gets_enabled()); } +TEST_F(DistributorTest, fast_path_on_consistent_gets_config_is_propagated_to_internal_config) { + createLinks(true); + setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + configure_update_fast_path_restart_enabled(true); + EXPECT_TRUE(getConfig().update_fast_path_restart_enabled()); + + configure_update_fast_path_restart_enabled(false); + EXPECT_FALSE(getConfig().update_fast_path_restart_enabled()); +} + TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled) { createLinks(false); setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index 99d7c12551d..72a124d45b4 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -117,6 +117,13 @@ struct GetOperationTest : Test, DistributorTestUtil { } } + bool last_reply_had_consistent_replicas() { + assert(!_sender.replies().empty()); + auto& msg = *_sender.replies().back(); + assert(msg.getType() == api::MessageType::GET_REPLY); + return dynamic_cast<api::GetReply&>(msg).had_consistent_replicas(); + } + void setClusterState(const std::string& clusterState) { enableDistributorClusterState(clusterState); } @@ -139,22 +146,25 @@ TEST_F(GetOperationTest, simple) { EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 100) ReturnCode(NONE)", _sender.getLastReply()); + EXPECT_TRUE(last_reply_had_consistent_replicas()); } -TEST_F(GetOperationTest, ask_trusted_node_if_bucket_is_inconsistent) { +TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted_replica_available) { setClusterState("distributor:1 storage:4"); addNodesToBucketDB(bucketId, "0=100/3/10,1=200/4/12/t"); sendGet(); - ASSERT_EQ("Get => 1", _sender.getCommands(true)); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); - ASSERT_NO_FATAL_FAILURE(replyWithDocument()); + ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::OK, "newauthor", 2)); + ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "oldauthor", 1)); EXPECT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " - "timestamp 100) ReturnCode(NONE)", + "timestamp 2) ReturnCode(NONE)", _sender.getLastReply()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) { @@ -174,6 +184,7 @@ TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) { _sender.getLastReply()); EXPECT_EQ("newauthor", getLastReplyAuthor()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, send_to_all_invalid_copies) { @@ -193,6 +204,7 @@ TEST_F(GetOperationTest, send_to_all_invalid_copies) { _sender.getLastReply()); EXPECT_EQ("newauthor", getLastReplyAuthor()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, send_to_all_invalid_nodes_when_inconsistent) { @@ -235,6 +247,7 @@ TEST_F(GetOperationTest, inconsistent_split) { _sender.getLastReply()); EXPECT_EQ("newauthor", getLastReplyAuthor()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) { @@ -252,6 +265,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 2) ReturnCode(NONE)", _sender.getLastReply()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) { @@ -271,6 +285,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 3) ReturnCode(NONE)", _sender.getLastReply()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, multi_inconsistent_bucket) { @@ -290,6 +305,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket) { _sender.getLastReply()); EXPECT_EQ("newauthor", getLastReplyAuthor()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) { @@ -312,6 +328,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 100) ReturnCode(NONE)", _sender.getLastReply()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) { @@ -322,6 +339,7 @@ TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 0) ReturnCode(NONE)", _sender.getLastReply()); + EXPECT_TRUE(last_reply_had_consistent_replicas()); // Nothing in the bucket, so nothing to be inconsistent with. } TEST_F(GetOperationTest, not_found) { @@ -340,8 +358,9 @@ TEST_F(GetOperationTest, not_found) { "timestamp 0) ReturnCode(NONE)", _sender.getLastReply()); - EXPECT_EQ(1, getDistributor().getMetrics().gets[documentapi::LoadType::DEFAULT]. + EXPECT_EQ(1, getDistributor().getMetrics().gets[documentapi::LoadType::DEFAULT]. failures.notfound.getValue()); + EXPECT_TRUE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, resend_on_storage_failure) { @@ -366,6 +385,21 @@ TEST_F(GetOperationTest, resend_on_storage_failure) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 100) ReturnCode(NONE)", _sender.getLastReply()); + // Replica had read failure, but they're still in sync. An immutable Get won't change that fact. + EXPECT_TRUE(last_reply_had_consistent_replicas()); +} + +TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_inconsistent) { + setClusterState("distributor:1 storage:3"); + addNodesToBucketDB(bucketId, "1=100,2=200"); + sendGet(); + ASSERT_EQ("Get => 1,Get => 2", _sender.getCommands(true)); + ASSERT_NO_FATAL_FAILURE(sendReply(0, api::ReturnCode::TIMEOUT, "", 0)); + ASSERT_NO_FATAL_FAILURE(sendReply(1, api::ReturnCode::OK, "newestauthor", 3)); + ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " + "timestamp 3) ReturnCode(NONE)", + _sender.getLastReply()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) { @@ -390,6 +424,7 @@ TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 0) ReturnCode(IO_FAILURE)", _sender.getLastReply()); + EXPECT_TRUE(last_reply_had_consistent_replicas()); // Doesn't really matter since operation itself failed } TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) { @@ -408,6 +443,7 @@ TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) { ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, " "timestamp 100) ReturnCode(NONE)", _sender.getLastReply()); + EXPECT_TRUE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) { @@ -435,6 +471,7 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) { _sender.getLastReply()); EXPECT_EQ("newestauthor", getLastReplyAuthor()); + EXPECT_TRUE(last_reply_had_consistent_replicas()); } TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) { diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index df9bf683326..67ef3374633 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -126,6 +126,10 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorTestUtil { void assertAbortedUpdateReplyWithContextPresent( const DistributorMessageSenderStub& closeSender) const; + void do_test_ownership_changed_between_gets_and_second_phase(Timestamp lowest_get_timestamp, + Timestamp highest_get_timestamp, + Timestamp expected_response_timestamp); + }; TwoPhaseUpdateOperationTest::TwoPhaseUpdateOperationTest() = default; @@ -142,7 +146,9 @@ TwoPhaseUpdateOperationTest::replyToMessage( std::shared_ptr<api::StorageMessage> msg2 = sender.command(index); auto& updatec = dynamic_cast<UpdateCommand&>(*msg2); std::unique_ptr<api::StorageReply> reply(updatec.makeReply()); - static_cast<api::UpdateReply*>(reply.get())->setOldTimestamp(oldTimestamp); + auto& update_reply = dynamic_cast<api::UpdateReply&>(*reply); + update_reply.setOldTimestamp(oldTimestamp); + update_reply.setBucketInfo(api::BucketInfo(0x123, 1, 100)); // Dummy info to avoid invalid info being returned reply->setResult(api::ReturnCode(result, "")); callback.receive(sender, @@ -193,7 +199,7 @@ TwoPhaseUpdateOperationTest::replyToGet( api::ReturnCode::Result result, const std::string& traceMsg) { - auto& get = static_cast<const api::GetCommand&>(*sender.command(index)); + auto& get = dynamic_cast<const api::GetCommand&>(*sender.command(index)); std::shared_ptr<api::StorageReply> reply; if (haveDocument) { @@ -786,7 +792,11 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_propagates_mbus_traces_from_replie ASSERT_THAT(trace, HasSubstr("baaa")); } -TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) { +void TwoPhaseUpdateOperationTest::do_test_ownership_changed_between_gets_and_second_phase( + Timestamp lowest_get_timestamp, + Timestamp highest_get_timestamp, + Timestamp expected_response_timestamp) +{ setupDistributor(2, 2, "storage:2 distributor:1"); // Update towards inconsistent bucket invokes safe path. @@ -803,21 +813,31 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge // to a bucket we no longer own. enableDistributorClusterState("storage:2 distributor:1 .0.s:d"); getBucketDatabase().clear(); - replyToGet(*cb, sender, 0, 70); - replyToGet(*cb, sender, 1, 70); - + replyToGet(*cb, sender, 0, lowest_get_timestamp); + replyToGet(*cb, sender, 1, highest_get_timestamp); + // BUCKET_NOT_FOUND is a transient error code which should cause the client // to re-send the operation, presumably to the correct distributor the next // time. + // Timestamp of updated doc varies depending on whether fast or safe path + // was triggered, as the reply is created via different paths. EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " "BucketId(0x0000000000000000), " - "timestamp 0, timestamp of updated doc: 70) " + "timestamp 0, timestamp of updated doc: " + std::to_string(expected_response_timestamp) + ") " "ReturnCode(BUCKET_NOT_FOUND, Distributor lost " - "ownership of bucket between executing the read " - "and write phases of a two-phase update operation)", + "ownership of bucket between executing the read " + "and write phases of a two-phase update operation)", sender.getLastReply(true)); } +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_put) { + do_test_ownership_changed_between_gets_and_second_phase(70, 71, 71); +} + +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_restarted_fast_path_updates) { + do_test_ownership_changed_between_gets_and_second_phase(70, 70, 0); // Timestamps in sync -> Update restart +} + TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) { setupDistributor(2, 2, "storage:2 distributor:1"); std::shared_ptr<TwoPhaseUpdateOperation> cb( @@ -977,6 +997,53 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_close_edge_sends_correct_reply) { assertAbortedUpdateReplyWithContextPresent(closeSender); } +TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_restarts_with_fast_path_if_enabled) { + setupDistributor(2, 2, "storage:2 distributor:1"); + getConfig().set_update_fast_path_restart_enabled(true); + + std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. + DistributorMessageSenderStub sender; + cb->start(sender, framework::MilliSecTime(0)); + + Timestamp old_timestamp = 500; + ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); + replyToGet(*cb, sender, 0, old_timestamp); + replyToGet(*cb, sender, 1, old_timestamp); + + ASSERT_EQ("Update => 0,Update => 1", sender.getCommands(true, false, 2)); + replyToMessage(*cb, sender, 2, old_timestamp); + replyToMessage(*cb, sender, 3, old_timestamp); + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 500) " + "ReturnCode(NONE)", + sender.getLastReply(true)); + + auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT]; + EXPECT_EQ(1, metrics.fast_path_restarts.getValue()); +} + +TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_does_not_restart_with_fast_path_if_disabled) { + setupDistributor(2, 2, "storage:2 distributor:1"); + getConfig().set_update_fast_path_restart_enabled(false); + + std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. + DistributorMessageSenderStub sender; + cb->start(sender, framework::MilliSecTime(0)); + + Timestamp old_timestamp = 500; + ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); + replyToGet(*cb, sender, 0, old_timestamp); + replyToGet(*cb, sender, 1, old_timestamp); + + // Should _not_ be restarted with fast path, as it has been config disabled + ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2)); + + auto& metrics = getDistributor().getMetrics().updates[documentapi::LoadType::DEFAULT]; + EXPECT_EQ(0, metrics.fast_path_restarts.getValue()); +} + // XXX currently differs in behavior from content nodes in that updates for // document IDs without explicit doctypes will _not_ be auto-failed on the // distributor. diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index 341f0f39d8f..e89b9e6b1aa 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -40,6 +40,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _disableBucketActivation(false), _sequenceMutatingOperations(true), _allowStaleReadsDuringClusterStateTransitions(false), + _update_fast_path_restart_enabled(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -150,6 +151,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _disableBucketActivation = config.disableBucketActivation; _sequenceMutatingOperations = config.sequenceMutatingOperations; _allowStaleReadsDuringClusterStateTransitions = config.allowStaleReadsDuringClusterStateTransitions; + _update_fast_path_restart_enabled = config.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 28a219dc3f6..3cb84943508 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -250,6 +250,13 @@ public: _allowStaleReadsDuringClusterStateTransitions = allow; } + bool update_fast_path_restart_enabled() const noexcept { + return _update_fast_path_restart_enabled; + } + void set_update_fast_path_restart_enabled(bool enabled) noexcept { + _update_fast_path_restart_enabled = enabled; + } + bool containsTimeStatement(const std::string& documentSelection) const; private: @@ -293,6 +300,7 @@ private: bool _disableBucketActivation; bool _sequenceMutatingOperations; bool _allowStaleReadsDuringClusterStateTransitions; + bool _update_fast_path_restart_enabled; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index e789f03bb14..f292b60a70e 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -205,3 +205,12 @@ simulated_db_merging_latency_msec int default=0 ## read only operations, as the B-tree database is thread safe for concurrent reads. use_btree_database bool default=false restart +## If a bucket is inconsistent and an Update operation is received, a two-phase +## write-repair path is triggered in which a Get is sent to all diverging replicas. +## Once received, the update is applied on the distributor and pushed out to the +## content nodes as Puts. +## Iff this config is set to true AND all Gets return the same timestamp from all +## content nodes, the two-phase update path reverts back to the regular fast path. +## Since all replicas of the document were in sync, applying the update in-place +## shall be considered safe. +restart_with_fast_update_path_if_all_get_timestamps_are_consistent bool default=true diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index ad2e5cf6478..7d7cd37e848 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -57,7 +57,8 @@ GetOperation::GetOperation(DistributorComponent& manager, _doc(), _lastModified(0), _metric(metric), - _operationTimer(manager.getClock()) + _operationTimer(manager.getClock()), + _has_replica_inconsistency(false) { assignTargetNodeGroups(*read_guard); } @@ -150,6 +151,10 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr< response.second[i].returnCode = getreply->getResult(); if (getreply->getResult().success()) { + if ((_lastModified != 0) && (getreply->getLastModifiedTimestamp() != _lastModified)) { + // At least two document versions returned had different timestamps. + _has_replica_inconsistency = true; // This is a one-way toggle. + } if (getreply->getLastModifiedTimestamp() > _lastModified) { _returnCode = getreply->getResult(); _lastModified = getreply->getLastModifiedTimestamp(); @@ -159,6 +164,12 @@ GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr< if (_lastModified == 0) { _returnCode = getreply->getResult(); } + if (!all_bucket_metadata_initially_consistent()) { + // If we're sending to more than a single group of replicas it means our replica set is + // out of sync. Since we are unable to verify the timestamp of at least one replicated + // document, we fail safe by marking the entire operation as inconsistent. + _has_replica_inconsistency = true; + } // Try to send to another node in this checksum group. bool sent = sendForChecksum(sender, response.first.getBucketId(), response.second); @@ -205,7 +216,7 @@ void GetOperation::sendReply(DistributorMessageSender& sender) { if (_msg.get()) { - auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified); + auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified, !_has_replica_inconsistency); repl->setResult(_returnCode); update_internal_metrics(); sender.sendReply(repl); @@ -229,23 +240,6 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard LOG(spam, "Entry for %s: %s", e.getBucketId().toString().c_str(), e->toString().c_str()); - bool haveTrusted = false; - for (uint32_t i = 0; i < e->getNodeCount(); i++) { - const BucketCopy& c = e->getNodeRef(i); - - if (!c.trusted()) { - continue; - } - - _responses[GroupId(e.getBucketId(), c.getChecksum(), -1)].push_back(c); - haveTrusted = true; - break; - } - - if (haveTrusted) { - continue; - } - for (uint32_t i = 0; i < e->getNodeCount(); i++) { const BucketCopy& copy = e->getNodeRef(i); @@ -259,8 +253,9 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard } bool -GetOperation::hasConsistentCopies() const +GetOperation::all_bucket_metadata_initially_consistent() const { + // TODO rename, calling this "responses" is confusing as it's populated before sending anything. return _responses.size() == 1; } diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 4f2d7c3d963..a30a9a721bd 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -35,7 +35,7 @@ public: const char* getName() const override { return "get"; } std::string getStatus() const override { return ""; } - bool hasConsistentCopies() const; + bool all_bucket_metadata_initially_consistent() const; // Exposed for unit testing. TODO feels a bit dirty :I const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; } @@ -88,6 +88,7 @@ private: PersistenceOperationMetricSet& _metric; framework::MilliSecTimer _operationTimer; + bool _has_replica_inconsistency; void sendReply(DistributorMessageSender& sender); bool sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res); diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index b3326a43be2..4283386a1bc 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -158,6 +158,7 @@ void TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender) { _mode = Mode::FAST_PATH; + LOG(debug, "Update(%s) fast path: sending Update commands", _updateCmd->getDocumentId().toString().c_str()); auto updateOperation = std::make_shared<UpdateOperation>(_manager, _bucketSpace, _updateCmd, _updateMetric); UpdateOperation & op = *updateOperation; IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender); @@ -364,6 +365,11 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen sendReplyWithResult(sender, reply.getResult()); return; } + if (may_restart_with_fast_path(reply)) { + restart_with_fast_path_due_to_consistent_get_timestamps(sender); + return; + } + document::Document::SP docToUpdate; api::Timestamp putTimestamp = _manager.getUniqueTimestamp(); @@ -399,6 +405,23 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen } } +bool TwoPhaseUpdateOperation::may_restart_with_fast_path(const api::GetReply& reply) { + return (_manager.getDistributor().getConfig().update_fast_path_restart_enabled() && + reply.wasFound() && + reply.had_consistent_replicas()); +} + +void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender) { + LOG(debug, "Update(%s): all Gets returned in initial safe path were consistent, restarting in fast path mode", + _updateCmd->getDocumentId().toString().c_str()); + if (lostBucketOwnershipBetweenPhases()) { + sendLostOwnershipTransientErrorReply(sender); + return; + } + _updateMetric.fast_path_restarts.inc(); + startFastPathUpdate(sender); +} + bool TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorMessageSender& sender, const document::Document& candidateDoc) @@ -487,7 +510,7 @@ TwoPhaseUpdateOperation::onClose(DistributorMessageSender& sender) { std::shared_ptr<Operation> cb = _sentMessageMap.pop(); if (cb) { - IntermediateMessageSender intermediate(_sentMessageMap, std::shared_ptr<Operation > (), sender); + IntermediateMessageSender intermediate(_sentMessageMap, std::shared_ptr<Operation>(), sender); cb->onClose(intermediate); // We will _only_ forward UpdateReply instances up, since those // are created by UpdateOperation and are bound to the original diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index f9787141a19..84bcf2beff8 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -121,6 +121,8 @@ private: bool hasTasCondition() const noexcept; void replyWithTasFailure(DistributorMessageSender& sender, vespalib::stringref message); + bool may_restart_with_fast_path(const api::GetReply& reply); + void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender); UpdateMetricSet& _updateMetric; PersistenceOperationMetricSet& _putMetric; diff --git a/storage/src/vespa/storage/distributor/update_metric_set.cpp b/storage/src/vespa/storage/distributor/update_metric_set.cpp index 2505003d2ba..1edde270517 100644 --- a/storage/src/vespa/storage/distributor/update_metric_set.cpp +++ b/storage/src/vespa/storage/distributor/update_metric_set.cpp @@ -12,7 +12,10 @@ UpdateMetricSet::UpdateMetricSet(MetricSet* owner) : PersistenceOperationMetricSet("updates", owner), diverging_timestamp_updates("diverging_timestamp_updates", {}, "Number of updates that report they were performed against " - "divergent version timestamps on different replicas", this) + "divergent version timestamps on different replicas", this), + fast_path_restarts("fast_path_restarts", {}, "Number of safe path (write repair) updates " + "that were restarted as fast path updates because all replicas returned " + "documents with the same timestamp in the initial read phase", this) { } diff --git a/storage/src/vespa/storage/distributor/update_metric_set.h b/storage/src/vespa/storage/distributor/update_metric_set.h index de8474c949c..8a88aa6b9a9 100644 --- a/storage/src/vespa/storage/distributor/update_metric_set.h +++ b/storage/src/vespa/storage/distributor/update_metric_set.h @@ -10,6 +10,7 @@ namespace storage { class UpdateMetricSet : public PersistenceOperationMetricSet { public: metrics::LongCountMetric diverging_timestamp_updates; + metrics::LongCountMetric fast_path_restarts; explicit UpdateMetricSet(MetricSet* owner = nullptr); ~UpdateMetricSet() override; diff --git a/storageapi/src/vespa/storageapi/message/persistence.cpp b/storageapi/src/vespa/storageapi/message/persistence.cpp index 0b641eea253..8bd4f648f30 100644 --- a/storageapi/src/vespa/storageapi/message/persistence.cpp +++ b/storageapi/src/vespa/storageapi/message/persistence.cpp @@ -207,13 +207,17 @@ GetCommand::print(std::ostream& out, bool verbose, const std::string& indent) co } } -GetReply::GetReply(const GetCommand& cmd, const DocumentSP& doc, Timestamp lastModified) +GetReply::GetReply(const GetCommand& cmd, + const DocumentSP& doc, + Timestamp lastModified, + bool had_consistent_replicas) : BucketInfoReply(cmd), _docId(cmd.getDocumentId()), _fieldSet(cmd.getFieldSet()), _doc(doc), _beforeTimestamp(cmd.getBeforeTimestamp()), - _lastModifiedTime(lastModified) + _lastModifiedTime(lastModified), + _had_consistent_replicas(had_consistent_replicas) { } diff --git a/storageapi/src/vespa/storageapi/message/persistence.h b/storageapi/src/vespa/storageapi/message/persistence.h index 1fdd5c369bf..f2161421feb 100644 --- a/storageapi/src/vespa/storageapi/message/persistence.h +++ b/storageapi/src/vespa/storageapi/message/persistence.h @@ -217,11 +217,13 @@ class GetReply : public BucketInfoReply { DocumentSP _doc; // Null pointer if not found Timestamp _beforeTimestamp; Timestamp _lastModifiedTime; + bool _had_consistent_replicas; public: GetReply(const GetCommand& cmd, const DocumentSP& doc = DocumentSP(), - Timestamp lastModified = 0); + Timestamp lastModified = 0, + bool had_consistent_replicas = false); ~GetReply() override; const DocumentSP& getDocument() const { return _doc; } @@ -231,6 +233,8 @@ public: Timestamp getLastModifiedTimestamp() const { return _lastModifiedTime; } Timestamp getBeforeTimestamp() const { return _beforeTimestamp; } + bool had_consistent_replicas() const noexcept { return _had_consistent_replicas; } + bool wasFound() const { return (_doc.get() != 0); } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGEREPLY(GetReply, onGetReply) |