From 72cec61c615bfe25628c375eade4c8d8832cd2da Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Fri, 17 Jan 2020 09:53:05 +0000 Subject: Add configurable support for weakly consistent client Gets If configured, Get operations initiated by the client are flagged with weak internal consistency. This allows the backend to bypass certain internal synchronization mechanisms, which minimizes latency at the cost of possibly not observing a consistent view of the document. This config should only be used in a very restricted set of cases where the document set is effectively read-only, or cross-field consistency or freshness does not matter. To enable the weak consistency, use an explicit config override: ``` <config name="vespa.config.content.core.stor-distributormanager"> <use_weak_internal_read_consistency_for_client_gets>true</use_weak_internal_read_consistency_for_client_gets> </config> ``` This closes #11811 --- storage/src/tests/distributor/distributortest.cpp | 19 +++++++++++++++ .../distributor/externaloperationhandlertest.cpp | 27 ++++++++++++++++++++++ storage/src/tests/distributor/getoperationtest.cpp | 26 +++++++++++++++++++-- .../distributor/twophaseupdateoperationtest.cpp | 16 +++++++++++++ .../storage/config/distributorconfiguration.cpp | 2 ++ .../storage/config/distributorconfiguration.h | 8 +++++++ .../storage/config/stor-distributormanager.def | 9 ++++++++ .../src/vespa/storage/distributor/distributor.cpp | 2 ++ .../distributor/externaloperationhandler.cpp | 12 ++++++++-- .../storage/distributor/externaloperationhandler.h | 11 +++++++++ .../operations/external/getoperation.cpp | 5 +++- .../distributor/operations/external/getoperation.h | 8 ++++++- .../storage/persistence/persistencethread.cpp | 14 +++++++++++ 13 files changed, 153 insertions(+), 6 deletions(-) diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 83ef7891630..afc3b254c64 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -187,6 +187,12 @@ struct DistributorTest : Test, DistributorTestUtil { configureDistributor(builder); } 
+ void configure_use_weak_internal_read_consistency(bool use_weak) { + ConfigBuilder builder; + builder.useWeakInternalReadConsistencyForClientGets = use_weak; + configureDistributor(builder); + } + void configureMaxClusterClockSkew(int seconds); void sendDownClusterStateCommand(); void replyToSingleRequestBucketInfoCommandWith1Bucket(); @@ -1038,6 +1044,19 @@ TEST_F(DistributorTest, merge_disabling_config_is_propagated_to_internal_config) EXPECT_FALSE(getConfig().merge_operations_disabled()); } +TEST_F(DistributorTest, weak_internal_read_consistency_config_is_propagated_to_internal_configs) { + createLinks(true); + setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + configure_use_weak_internal_read_consistency(true); + EXPECT_TRUE(getConfig().use_weak_internal_read_consistency_for_client_gets()); + EXPECT_TRUE(getExternalOperationHandler().use_weak_internal_read_consistency_for_gets()); + + configure_use_weak_internal_read_consistency(false); + EXPECT_FALSE(getConfig().use_weak_internal_read_consistency_for_client_gets()); + EXPECT_FALSE(getExternalOperationHandler().use_weak_internal_read_consistency_for_gets()); +} + TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled) { createLinks(false); setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index f2f949a6d56..03ba148277e 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -83,6 +83,7 @@ struct ExternalOperationHandlerTest : Test, DistributorTestUtil { close(); } + void do_test_get_weak_consistency_is_propagated(bool use_weak); }; TEST_F(ExternalOperationHandlerTest, bucket_split_mask) { @@ -534,6 +535,32 @@ TEST_F(ExternalOperationHandlerTest, gets_are_busy_bounced_during_transition_per 
_sender.reply(0)->getResult().toString()); } +void ExternalOperationHandlerTest::do_test_get_weak_consistency_is_propagated(bool use_weak) { + createLinks(); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); + // Explicitly only touch config in the case weak consistency is enabled to ensure the + // default is strong. + if (use_weak) { + getExternalOperationHandler().set_use_weak_internal_read_consistency_for_gets(true); + } + document::BucketId b(16, 1234); + Operation::SP op; + ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected( + makeGetCommandForUser(b.withoutCountBits()), op)); + auto& get_op = dynamic_cast(*op); + EXPECT_EQ(get_op.desired_read_consistency(), + (use_weak ? api::InternalReadConsistency::Weak + : api::InternalReadConsistency::Strong)); +} + +TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_strong_consistency_by_default) { + do_test_get_weak_consistency_is_propagated(false); +} + +TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_weak_consistency_if_config_enabled) { + do_test_get_weak_consistency_is_propagated(true); +} + // TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with // the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket // sub tree under a given location, while the sequencer works on individual GIDs. 
Mapping the diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index db790639869..0dbec8444cc 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -51,12 +51,13 @@ struct GetOperationTest : Test, DistributorTestUtil { op.reset(); } - void sendGet() { + void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) { auto msg = std::make_shared(makeDocumentBucket(document::BucketId(0)), docId, "[all]"); op = std::make_unique( getExternalOperationHandler(), getDistributorBucketSpace(), getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(), - msg, getDistributor().getMetrics(). gets[msg->getLoadType()]); + msg, getDistributor().getMetrics(). gets[msg->getLoadType()], + consistency); op->start(_sender, framework::MilliSecTime(0)); } @@ -127,6 +128,8 @@ struct GetOperationTest : Test, DistributorTestUtil { void setClusterState(const std::string& clusterState) { enableDistributorClusterState(clusterState); } + + void do_test_read_consistency_is_propagated(api::InternalReadConsistency consistency); }; GetOperationTest::GetOperationTest() = default; @@ -497,4 +500,23 @@ TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) { EXPECT_EQ("Get => 0", _sender.getCommands(true)); } +void GetOperationTest::do_test_read_consistency_is_propagated(api::InternalReadConsistency consistency) { + setClusterState("distributor:1 storage:1"); + addNodesToBucketDB(bucketId, "0=4"); + sendGet(consistency); + ASSERT_TRUE(op); + EXPECT_EQ(dynamic_cast(*op).desired_read_consistency(), consistency); + ASSERT_EQ("Get => 0", _sender.getCommands(true)); + auto& cmd = dynamic_cast(*_sender.command(0)); + EXPECT_EQ(cmd.internal_read_consistency(), consistency); +} + +TEST_F(GetOperationTest, can_send_gets_with_strong_internal_read_consistency) { + 
do_test_read_consistency_is_propagated(api::InternalReadConsistency::Strong); +} + +TEST_F(GetOperationTest, can_send_gets_with_weak_internal_read_consistency) { + do_test_read_consistency_is_propagated(api::InternalReadConsistency::Weak); +} + } diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index c8bfcd754f7..f1900c40d56 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -1088,6 +1088,22 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_document_not_foun ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2)); } +// The weak consistency config _only_ applies to Get operations initiated directly +// by the client, not those indirectly initiated by the distributor in order to +// fulfill update write-repairs. +TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency_even_if_weak_consistency_configured) { + setupDistributor(2, 2, "storage:2 distributor:1"); + getConfig().set_use_weak_internal_read_consistency_for_client_gets(true); + + std::shared_ptr cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. + DistributorMessageSenderStub sender; + cb->start(sender, framework::MilliSecTime(0)); + + ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); + auto& get_cmd = dynamic_cast(*sender.command(0)); + EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong); +} + // XXX currently differs in behavior from content nodes in that updates for // document IDs without explicit doctypes will _not_ be auto-failed on the // distributor. 
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index ed14e227fc1..522561ee8a5 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -41,6 +41,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _allowStaleReadsDuringClusterStateTransitions(false), _update_fast_path_restart_enabled(false), _merge_operations_disabled(false), + _use_weak_internal_read_consistency_for_client_gets(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -153,6 +154,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _allowStaleReadsDuringClusterStateTransitions = config.allowStaleReadsDuringClusterStateTransitions; _update_fast_path_restart_enabled = config.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent; _merge_operations_disabled = config.mergeOperationsDisabled; + _use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 5b01e37992b..333d7073715 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -228,6 +228,13 @@ public: _merge_operations_disabled = disabled; } + void set_use_weak_internal_read_consistency_for_client_gets(bool use_weak) noexcept { + _use_weak_internal_read_consistency_for_client_gets = use_weak; + } + bool use_weak_internal_read_consistency_for_client_gets() const noexcept { + return _use_weak_internal_read_consistency_for_client_gets; + } + bool containsTimeStatement(const std::string& documentSelection) const; private: @@ -273,6 +280,7 @@ private: bool 
_allowStaleReadsDuringClusterStateTransitions; bool _update_fast_path_restart_enabled; bool _merge_operations_disabled; + bool _use_weak_internal_read_consistency_for_client_gets; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 4587e7b3ebe..e4d3182ce7a 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -219,3 +219,12 @@ restart_with_fast_update_path_if_all_get_timestamps_are_consistent bool default= ## This is ONLY intended for system testing of certain transient edge cases and ## MUST NOT be set to true in a production environment. merge_operations_disabled bool default=false + +## If set, Get operations that are initiated by the client (i.e. _not_ Get operations +## that are initiated by the distributor) will be forwarded to the backend with +## a flag signalling that weak read consistency may be used. This allows the +## backend to minimize internal locking. The downside is that it's not guaranteed +## to observe the most recent writes to the document, nor to observe an atomically +## consistent view of fields across document versions. +## This is mostly useful in a system that is effectively read-only. +use_weak_internal_read_consistency_for_client_gets bool default=false diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 5c6773ea5bf..988af43a7be 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -849,6 +849,8 @@ Distributor::enableNextConfig() // Concurrent reads are only safe if the B-tree DB implementation is used. 
_externalOperationHandler.set_concurrent_gets_enabled( _use_btree_database && getConfig().allowStaleReadsDuringClusterStateTransitions()); + _externalOperationHandler.set_use_weak_internal_read_consistency_for_gets( + getConfig().use_weak_internal_read_consistency_for_client_gets()); } void diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index adeab7ba132..b946b788391 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -62,7 +62,8 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, _rejectFeedBeforeTimeReached(), // At epoch _non_main_thread_ops_mutex(), _non_main_thread_ops_owner(*_direct_dispatch_sender, getClock()), - _concurrent_gets_enabled(false) + _concurrent_gets_enabled(false), + _use_weak_internal_read_consistency_for_gets(false) { } @@ -323,6 +324,12 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation) return true; } +api::InternalReadConsistency ExternalOperationHandler::desired_get_read_consistency() const noexcept { + return (use_weak_internal_read_consistency_for_gets() + ? 
api::InternalReadConsistency::Weak + : api::InternalReadConsistency::Strong); +} + std::shared_ptr ExternalOperationHandler::try_generate_get_operation(const std::shared_ptr& cmd) { document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId())); auto& metrics = getMetrics().gets[cmd->getLoadType()]; @@ -342,7 +349,8 @@ std::shared_ptr ExternalOperationHandler::try_generate_get_operation( const auto* space_repo = snapshot.bucket_space_repo(); assert(space_repo != nullptr); return std::make_shared(*this, space_repo->get(bucket.getBucketSpace()), - snapshot.steal_read_guard(), cmd, metrics); + snapshot.steal_read_guard(), cmd, metrics, + desired_get_read_consistency()); } IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index e38f6792717..96875a3644a 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -66,6 +66,14 @@ public: return _concurrent_gets_enabled.load(std::memory_order_relaxed); } + void set_use_weak_internal_read_consistency_for_gets(bool use_weak) noexcept { + _use_weak_internal_read_consistency_for_gets.store(use_weak, std::memory_order_relaxed); + } + + bool use_weak_internal_read_consistency_for_gets() const noexcept { + return _use_weak_internal_read_consistency_for_gets.load(std::memory_order_relaxed); + } + private: std::unique_ptr _direct_dispatch_sender; const MaintenanceOperationGenerator& _operationGenerator; @@ -75,6 +83,7 @@ private: mutable std::mutex _non_main_thread_ops_mutex; OperationOwner _non_main_thread_ops_owner; std::atomic _concurrent_gets_enabled; + std::atomic _use_weak_internal_read_consistency_for_gets; template void bounce_or_invoke_read_only_op(api::StorageCommand& cmd, @@ -103,6 +112,8 @@ private: PersistenceOperationMetricSet& persistenceMetrics) const; 
bool allowMutation(const SequencingHandle& handle) const; + api::InternalReadConsistency desired_get_read_consistency() const noexcept; + DistributorMetricSet& getMetrics() { return getDistributor().getMetrics(); } }; diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index 52b6da09a76..9e66c212ac6 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -48,7 +48,8 @@ GetOperation::GetOperation(DistributorComponent& manager, const DistributorBucketSpace &bucketSpace, std::shared_ptr read_guard, std::shared_ptr msg, - PersistenceOperationMetricSet& metric) + PersistenceOperationMetricSet& metric, + api::InternalReadConsistency desired_read_consistency) : Operation(), _manager(manager), _bucketSpace(bucketSpace), @@ -58,6 +59,7 @@ GetOperation::GetOperation(DistributorComponent& manager, _lastModified(), _metric(metric), _operationTimer(manager.getClock()), + _desired_read_consistency(desired_read_consistency), _has_replica_inconsistency(false) { assignTargetNodeGroups(*read_guard); @@ -104,6 +106,7 @@ GetOperation::sendForChecksum(DistributorMessageSender& sender, const document:: auto command = std::make_shared(bucket, _msg->getDocumentId(), _msg->getFieldSet(), _msg->getBeforeTimestamp()); copyMessageSettings(*_msg, *command); + command->set_internal_read_consistency(_desired_read_consistency); LOG(spam, "Sending %s to node %d", command->toString(true).c_str(), res[best].copy.getNode()); diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 69f7b5580f4..1106968bcf7 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h 
@@ -28,7 +28,8 @@ public: const DistributorBucketSpace &bucketSpace, std::shared_ptr read_guard, std::shared_ptr msg, - PersistenceOperationMetricSet& metric); + PersistenceOperationMetricSet& metric, + api::InternalReadConsistency desired_read_consistency = api::InternalReadConsistency::Strong); void onClose(DistributorMessageSender& sender) override; void onStart(DistributorMessageSender& sender) override; @@ -45,6 +46,10 @@ public: return _replicas_in_db; } + api::InternalReadConsistency desired_read_consistency() const noexcept { + return _desired_read_consistency; + } + private: class GroupId { public: @@ -94,6 +99,7 @@ private: PersistenceOperationMetricSet& _metric; framework::MilliSecTimer _operationTimer; std::vector> _replicas_in_db; + api::InternalReadConsistency _desired_read_consistency; bool _has_replica_inconsistency; void sendReply(DistributorMessageSender& sender); diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index b738a9ecf00..7d0ce26b83d 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -164,6 +164,18 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd) return tracker; } +namespace { + +spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency consistency) noexcept { + switch (consistency) { + case api::InternalReadConsistency::Strong: return spi::ReadConsistency::STRONG; + case api::InternalReadConsistency::Weak: return spi::ReadConsistency::WEAK; + default: abort(); + } +} + +} + MessageTracker::UP PersistenceThread::handleGet(api::GetCommand& cmd) { @@ -173,6 +185,8 @@ PersistenceThread::handleGet(api::GetCommand& cmd) document::FieldSetRepo repo; document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet()); + // _context is reset per command, so it's safe to modify it like this. 
+ _context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); spi::GetResult result = _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context); -- cgit v1.2.3