diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-01-17 12:58:28 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-17 12:58:28 +0100 |
commit | ca80f4634d471c078047337e1d10a7f73c163900 (patch) | |
tree | 9228ba22c84ef0814453f555b3dfff925204a195 /storage | |
parent | 2635cb9c2a8b1cd0a548185e3c7a07ee64f842b1 (diff) | |
parent | 72cec61c615bfe25628c375eade4c8d8832cd2da (diff) |
Merge pull request #11830 from vespa-engine/vekterli/support-weak-internal-read-consistency-for-client-gets
Vekterli/support weak internal read consistency for client gets
Diffstat (limited to 'storage')
14 files changed, 154 insertions, 7 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 83ef7891630..afc3b254c64 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -187,6 +187,12 @@ struct DistributorTest : Test, DistributorTestUtil { configureDistributor(builder); } + void configure_use_weak_internal_read_consistency(bool use_weak) { + ConfigBuilder builder; + builder.useWeakInternalReadConsistencyForClientGets = use_weak; + configureDistributor(builder); + } + void configureMaxClusterClockSkew(int seconds); void sendDownClusterStateCommand(); void replyToSingleRequestBucketInfoCommandWith1Bucket(); @@ -1038,6 +1044,19 @@ TEST_F(DistributorTest, merge_disabling_config_is_propagated_to_internal_config) EXPECT_FALSE(getConfig().merge_operations_disabled()); } +TEST_F(DistributorTest, weak_internal_read_consistency_config_is_propagated_to_internal_configs) { + createLinks(true); + setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + configure_use_weak_internal_read_consistency(true); + EXPECT_TRUE(getConfig().use_weak_internal_read_consistency_for_client_gets()); + EXPECT_TRUE(getExternalOperationHandler().use_weak_internal_read_consistency_for_gets()); + + configure_use_weak_internal_read_consistency(false); + EXPECT_FALSE(getConfig().use_weak_internal_read_consistency_for_client_gets()); + EXPECT_FALSE(getExternalOperationHandler().use_weak_internal_read_consistency_for_gets()); +} + TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled) { createLinks(false); setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index f2f949a6d56..03ba148277e 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -83,6 +83,7 @@ struct ExternalOperationHandlerTest : Test, DistributorTestUtil { close(); } + void do_test_get_weak_consistency_is_propagated(bool use_weak); }; TEST_F(ExternalOperationHandlerTest, bucket_split_mask) { @@ -534,6 +535,32 @@ TEST_F(ExternalOperationHandlerTest, gets_are_busy_bounced_during_transition_per _sender.reply(0)->getResult().toString()); } +void ExternalOperationHandlerTest::do_test_get_weak_consistency_is_propagated(bool use_weak) { + createLinks(); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); + // Explicitly only touch config in the case weak consistency is enabled to ensure the + // default is strong. + if (use_weak) { + getExternalOperationHandler().set_use_weak_internal_read_consistency_for_gets(true); + } + document::BucketId b(16, 1234); + Operation::SP op; + ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected( + makeGetCommandForUser(b.withoutCountBits()), op)); + auto& get_op = dynamic_cast<GetOperation&>(*op); + EXPECT_EQ(get_op.desired_read_consistency(), + (use_weak ? api::InternalReadConsistency::Weak + : api::InternalReadConsistency::Strong)); +} + +TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_strong_consistency_by_default) { + do_test_get_weak_consistency_is_propagated(false); +} + +TEST_F(ExternalOperationHandlerTest, gets_are_sent_with_weak_consistency_if_config_enabled) { + do_test_get_weak_consistency_is_propagated(true); +} + // TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with // the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket // sub tree under a given location, while the sequencer works on individual GIDs. Mapping the diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index db790639869..0dbec8444cc 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -51,12 +51,13 @@ struct GetOperationTest : Test, DistributorTestUtil { op.reset(); } - void sendGet() { + void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) { auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId(0)), docId, "[all]"); op = std::make_unique<GetOperation>( getExternalOperationHandler(), getDistributorBucketSpace(), getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(), - msg, getDistributor().getMetrics(). gets[msg->getLoadType()]); + msg, getDistributor().getMetrics(). gets[msg->getLoadType()], + consistency); op->start(_sender, framework::MilliSecTime(0)); } @@ -127,6 +128,8 @@ struct GetOperationTest : Test, DistributorTestUtil { void setClusterState(const std::string& clusterState) { enableDistributorClusterState(clusterState); } + + void do_test_read_consistency_is_propagated(api::InternalReadConsistency consistency); }; GetOperationTest::GetOperationTest() = default; @@ -497,4 +500,23 @@ TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) { EXPECT_EQ("Get => 0", _sender.getCommands(true)); } +void GetOperationTest::do_test_read_consistency_is_propagated(api::InternalReadConsistency consistency) { + setClusterState("distributor:1 storage:1"); + addNodesToBucketDB(bucketId, "0=4"); + sendGet(consistency); + ASSERT_TRUE(op); + EXPECT_EQ(dynamic_cast<GetOperation&>(*op).desired_read_consistency(), consistency); + ASSERT_EQ("Get => 0", _sender.getCommands(true)); + auto& cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0)); + EXPECT_EQ(cmd.internal_read_consistency(), consistency); +} + +TEST_F(GetOperationTest, can_send_gets_with_strong_internal_read_consistency) { + do_test_read_consistency_is_propagated(api::InternalReadConsistency::Strong); +} + +TEST_F(GetOperationTest, can_send_gets_with_weak_internal_read_consistency) { + do_test_read_consistency_is_propagated(api::InternalReadConsistency::Weak); +} + } diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index c8bfcd754f7..f1900c40d56 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -1088,6 +1088,22 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_document_not_foun ASSERT_EQ("Put => 1,Put => 0", sender.getCommands(true, false, 2)); } +// The weak consistency config _only_ applies to Get operations initiated directly +// by the client, not those indirectly initiated by the distributor in order to +// fulfill update write-repairs. +TEST_F(TwoPhaseUpdateOperationTest, update_gets_are_sent_with_strong_consistency_even_if_weak_consistency_configured) { + setupDistributor(2, 2, "storage:2 distributor:1"); + getConfig().set_use_weak_internal_read_consistency_for_client_gets(true); + + std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. + DistributorMessageSenderStub sender; + cb->start(sender, framework::MilliSecTime(0)); + + ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); + auto& get_cmd = dynamic_cast<const api::GetCommand&>(*sender.command(0)); + EXPECT_EQ(get_cmd.internal_read_consistency(), api::InternalReadConsistency::Strong); +} + // XXX currently differs in behavior from content nodes in that updates for // document IDs without explicit doctypes will _not_ be auto-failed on the // distributor. diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index ed14e227fc1..522561ee8a5 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -41,6 +41,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _allowStaleReadsDuringClusterStateTransitions(false), _update_fast_path_restart_enabled(false), _merge_operations_disabled(false), + _use_weak_internal_read_consistency_for_client_gets(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -153,6 +154,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _allowStaleReadsDuringClusterStateTransitions = config.allowStaleReadsDuringClusterStateTransitions; _update_fast_path_restart_enabled = config.restartWithFastUpdatePathIfAllGetTimestampsAreConsistent; _merge_operations_disabled = config.mergeOperationsDisabled; + _use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 5b01e37992b..333d7073715 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -228,6 +228,13 @@ public: _merge_operations_disabled = disabled; } + void set_use_weak_internal_read_consistency_for_client_gets(bool use_weak) noexcept { + _use_weak_internal_read_consistency_for_client_gets = use_weak; + } + bool use_weak_internal_read_consistency_for_client_gets() const noexcept { + return _use_weak_internal_read_consistency_for_client_gets; + } + bool containsTimeStatement(const std::string& documentSelection) const; private: @@ -273,6 +280,7 @@ private: bool _allowStaleReadsDuringClusterStateTransitions; bool _update_fast_path_restart_enabled; bool _merge_operations_disabled; + bool _use_weak_internal_read_consistency_for_client_gets; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 4587e7b3ebe..e4d3182ce7a 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -219,3 +219,12 @@ restart_with_fast_update_path_if_all_get_timestamps_are_consistent bool default= ## This is ONLY intended for system testing of certain transient edge cases and ## MUST NOT be set to true in a production environment. merge_operations_disabled bool default=false + +## If set, Get operations that are initiated by the client (i.e. _not_ Get operations +## that are initiated by the distributor) will be forwarded to the backend with +## a flag signalling that weak read consistency may be used. This allows the +## backend to minimize internal locking. The downside is that it's not guaranteed +## to observe the most recent writes to the document, nor to observe an atomically +## consistent view of fields across document versions. +## This is mostly useful in a system that is effectively read-only. +use_weak_internal_read_consistency_for_client_gets bool default=false diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 5c6773ea5bf..988af43a7be 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -849,6 +849,8 @@ Distributor::enableNextConfig() // Concurrent reads are only safe if the B-tree DB implementation is used. _externalOperationHandler.set_concurrent_gets_enabled( _use_btree_database && getConfig().allowStaleReadsDuringClusterStateTransitions()); + _externalOperationHandler.set_use_weak_internal_read_consistency_for_gets( + getConfig().use_weak_internal_read_consistency_for_client_gets()); } void diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index adeab7ba132..b946b788391 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -62,7 +62,8 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, _rejectFeedBeforeTimeReached(), // At epoch _non_main_thread_ops_mutex(), _non_main_thread_ops_owner(*_direct_dispatch_sender, getClock()), - _concurrent_gets_enabled(false) + _concurrent_gets_enabled(false), + _use_weak_internal_read_consistency_for_gets(false) { } @@ -323,6 +324,12 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation) return true; } +api::InternalReadConsistency ExternalOperationHandler::desired_get_read_consistency() const noexcept { + return (use_weak_internal_read_consistency_for_gets() + ? api::InternalReadConsistency::Weak + : api::InternalReadConsistency::Strong); +} + std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation(const std::shared_ptr<api::GetCommand>& cmd) { document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId())); auto& metrics = getMetrics().gets[cmd->getLoadType()]; @@ -342,7 +349,8 @@ std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation( const auto* space_repo = snapshot.bucket_space_repo(); assert(space_repo != nullptr); return std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()), - snapshot.steal_read_guard(), cmd, metrics); + snapshot.steal_read_guard(), cmd, metrics, + desired_get_read_consistency()); } IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index e38f6792717..96875a3644a 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -66,6 +66,14 @@ public: return _concurrent_gets_enabled.load(std::memory_order_relaxed); } + void set_use_weak_internal_read_consistency_for_gets(bool use_weak) noexcept { + _use_weak_internal_read_consistency_for_gets.store(use_weak, std::memory_order_relaxed); + } + + bool use_weak_internal_read_consistency_for_gets() const noexcept { + return _use_weak_internal_read_consistency_for_gets.load(std::memory_order_relaxed); + } + private: std::unique_ptr<DirectDispatchSender> _direct_dispatch_sender; const MaintenanceOperationGenerator& _operationGenerator; @@ -75,6 +83,7 @@ private: mutable std::mutex _non_main_thread_ops_mutex; OperationOwner _non_main_thread_ops_owner; std::atomic<bool> _concurrent_gets_enabled; + std::atomic<bool> _use_weak_internal_read_consistency_for_gets; template <typename Func> void bounce_or_invoke_read_only_op(api::StorageCommand& cmd, @@ -103,6 +112,8 @@ private: PersistenceOperationMetricSet& persistenceMetrics) const; bool allowMutation(const SequencingHandle& handle) const; + api::InternalReadConsistency desired_get_read_consistency() const noexcept; + DistributorMetricSet& getMetrics() { return getDistributor().getMetrics(); } }; diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index 52b6da09a76..9e66c212ac6 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -48,7 +48,8 @@ GetOperation::GetOperation(DistributorComponent& manager, const DistributorBucketSpace &bucketSpace, std::shared_ptr<BucketDatabase::ReadGuard> read_guard, std::shared_ptr<api::GetCommand> msg, - PersistenceOperationMetricSet& metric) + PersistenceOperationMetricSet& metric, + api::InternalReadConsistency desired_read_consistency) : Operation(), _manager(manager), _bucketSpace(bucketSpace), @@ -58,6 +59,7 @@ GetOperation::GetOperation(DistributorComponent& manager, _lastModified(), _metric(metric), _operationTimer(manager.getClock()), + _desired_read_consistency(desired_read_consistency), _has_replica_inconsistency(false) { assignTargetNodeGroups(*read_guard); @@ -104,6 +106,7 @@ GetOperation::sendForChecksum(DistributorMessageSender& sender, const document:: auto command = std::make_shared<api::GetCommand>(bucket, _msg->getDocumentId(), _msg->getFieldSet(), _msg->getBeforeTimestamp()); copyMessageSettings(*_msg, *command); + command->set_internal_read_consistency(_desired_read_consistency); LOG(spam, "Sending %s to node %d", command->toString(true).c_str(), res[best].copy.getNode()); diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 69f7b5580f4..1106968bcf7 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -28,7 +28,8 @@ public: const DistributorBucketSpace &bucketSpace, std::shared_ptr<BucketDatabase::ReadGuard> read_guard, std::shared_ptr<api::GetCommand> msg, - PersistenceOperationMetricSet& metric); + PersistenceOperationMetricSet& metric, + api::InternalReadConsistency desired_read_consistency = api::InternalReadConsistency::Strong); void onClose(DistributorMessageSender& sender) override; void onStart(DistributorMessageSender& sender) override; @@ -45,6 +46,10 @@ public: return _replicas_in_db; } + api::InternalReadConsistency desired_read_consistency() const noexcept { + return _desired_read_consistency; + } + private: class GroupId { public: @@ -94,6 +99,7 @@ private: PersistenceOperationMetricSet& _metric; framework::MilliSecTimer _operationTimer; std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db; + api::InternalReadConsistency _desired_read_consistency; bool _has_replica_inconsistency; void sendReply(DistributorMessageSender& sender); diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index d0572e7dbf8..82dc69156ab 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -25,7 +25,7 @@ public: GetIterCommand(const document::Bucket &bucket, const spi::IteratorId iteratorId, uint32_t maxByteSize); - ~GetIterCommand(); + ~GetIterCommand() override; std::unique_ptr<api::StorageReply> makeReply() override; diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index b738a9ecf00..7d0ce26b83d 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -164,6 +164,18 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd) return tracker; } +namespace { + +spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency consistency) noexcept { + switch (consistency) { + case api::InternalReadConsistency::Strong: return spi::ReadConsistency::STRONG; + case api::InternalReadConsistency::Weak: return spi::ReadConsistency::WEAK; + default: abort(); + } +} + +} + MessageTracker::UP PersistenceThread::handleGet(api::GetCommand& cmd) { @@ -173,6 +185,8 @@ PersistenceThread::handleGet(api::GetCommand& cmd) document::FieldSetRepo repo; document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet()); + // _context is reset per command, so it's safe to modify it like this. + _context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); spi::GetResult result = _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context); |