diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-10-04 12:58:21 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-10-08 12:20:52 +0000 |
commit | ecd57816948aeaa143f9686dea971fa01763e2e4 (patch) | |
tree | 8544b9ca2e047a7d1999f3ae7df593b27dc6727a /storage | |
parent | 5bedd856c54812faa780d52236c1cf6394ed3fd9 (diff) |
Allow executing Get operations outside the main distributor thread
Requires _both_ the B-tree DB to be used _and_ stale reads to be enabled.
Diffstat (limited to 'storage')
8 files changed, 83 insertions, 11 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 637c214033d..5edd8c23394 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -974,4 +974,28 @@ TEST_F(DistributorTest, pending_to_no_pending_global_merges_edge_immediately_sen do_test_pending_merge_getnodestate_reply_edge(FixedBucketSpaces::global_space()); } +TEST_F(DistributorTest, stale_reads_config_is_propagated_to_external_operation_handler) { + createLinks(true); + setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + ConfigBuilder builder; + builder.allowStaleReadsDuringClusterStateTransitions = true; + configureDistributor(builder); + EXPECT_TRUE(getExternalOperationHandler().concurrent_gets_enabled()); + + builder.allowStaleReadsDuringClusterStateTransitions = false; + configureDistributor(builder); + EXPECT_FALSE(getExternalOperationHandler().concurrent_gets_enabled()); +} + +TEST_F(DistributorTest, concurrent_reads_not_enabled_if_btree_db_is_not_enabled) { + createLinks(false); + setupDistributor(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + ConfigBuilder builder; + builder.allowStaleReadsDuringClusterStateTransitions = true; + configureDistributor(builder); + EXPECT_FALSE(getExternalOperationHandler().concurrent_gets_enabled()); +} + } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 15820b64ff9..9f0c56c3fa5 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -20,7 +20,7 @@ DistributorTestUtil::DistributorTestUtil() DistributorTestUtil::~DistributorTestUtil() { } void -DistributorTestUtil::createLinks() +DistributorTestUtil::createLinks(bool use_btree_db) { _node.reset(new TestDistributorApp(_config.getConfigId())); _threadPool = 
framework::TickingThreadPool::createDefault("distributor"); @@ -29,7 +29,7 @@ DistributorTestUtil::createLinks() *_threadPool, *this, true, - false, // TODO swap default + use_btree_db, _hostInfo, &_messageSender)); _component.reset(new storage::DistributorComponent(_node->getComponentRegister(), "distrtestutil")); diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 3dc71bcb433..a6e7ea16798 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -34,7 +34,7 @@ public: /** * Sets up the storage link chain. */ - void createLinks(); + void createLinks(bool use_btree_db = false); void setTypeRepo(const std::shared_ptr<const document::DocumentTypeRepo> &repo); void close(); diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index ab6776717aa..d903db4f6ed 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -102,7 +102,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _ownershipSafeTimeCalc( std::make_unique<OwnershipTransferSafeTimePointCalculator>( std::chrono::seconds(0))), // Set by config later - _must_send_updated_host_info(false) + _must_send_updated_host_info(false), + _use_btree_database(use_btree_database) { if (use_btree_database) { LOG(info, "Using new B-tree bucket database implementation instead of legacy implementation"); // TODO remove this once default is swapped @@ -244,6 +245,9 @@ Distributor::sendDown(const std::shared_ptr<api::StorageMessage>& msg) bool Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) { + if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) { + return true; + } framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); MBUS_TRACE(msg->getTrace(), 9, "Distributor: Added to 
message queue. Thread state: " @@ -837,6 +841,9 @@ Distributor::enableNextConfig() _ownershipSafeTimeCalc->setMaxClusterClockSkew(getConfig().getMaxClusterClockSkew()); _pendingMessageTracker.setNodeBusyDuration(getConfig().getInhibitMergesOnBusyNodeDuration()); _bucketDBUpdater.set_stale_reads_enabled(getConfig().allowStaleReadsDuringClusterStateTransitions()); + // Concurrent reads are only safe if the B-tree DB implementation is used. + _externalOperationHandler.set_concurrent_gets_enabled( + _use_btree_database && getConfig().allowStaleReadsDuringClusterStateTransitions()); } void diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 48d9145eec7..1a3a22d28a8 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -328,6 +328,7 @@ private: DistributorHostInfoReporter _hostInfoReporter; std::unique_ptr<OwnershipTransferSafeTimePointCalculator> _ownershipSafeTimeCalc; bool _must_send_updated_host_info; + bool _use_btree_database; }; } // distributor diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 6b476ae37c5..c48dad70357 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -35,7 +35,8 @@ ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, _operationGenerator(gen), _rejectFeedBeforeTimeReached(), // At epoch _non_main_thread_ops_mutex(), - _non_main_thread_ops_owner(owner, getClock()) + _non_main_thread_ops_owner(owner, getClock()), + _concurrent_gets_enabled(false) { } @@ -290,8 +291,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation) return true; } -IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) -{ +std::shared_ptr<Operation> ExternalOperationHandler::try_generate_get_operation(const 
std::shared_ptr<api::GetCommand>& cmd) { document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId())); auto& metrics = getMetrics().gets[cmd->getLoadType()]; auto snapshot = getDistributor().read_snapshot_for_bucket(bucket); @@ -304,13 +304,18 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) bounce_with_wrong_distribution(*cmd, *snapshot.context().default_active_cluster_state()); metrics.failures.wrongdistributor.inc(); // TODO thread safety for updates } - return true; + return std::shared_ptr<Operation>(); } // The snapshot is aware of whether stale reads are enabled, so we don't have to check that here. const auto* space_repo = snapshot.bucket_space_repo(); assert(space_repo != nullptr); - _op = std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()), - snapshot.steal_read_guard(), cmd, metrics); + return std::make_shared<GetOperation>(*this, space_repo->get(bucket.getBucketSpace()), + snapshot.steal_read_guard(), cmd, metrics); +} + +IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) +{ + _op = try_generate_get_operation(cmd); return true; } @@ -345,4 +350,25 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor) return true; } +bool ExternalOperationHandler::try_handle_message_outside_main_thread(const std::shared_ptr<api::StorageMessage>& msg) { + if (!concurrent_gets_enabled()) { + return false; + } + const auto type_id = msg->getType().getId(); + if (type_id == api::MessageType::GET_ID) { + auto op = try_generate_get_operation(std::dynamic_pointer_cast<api::GetCommand>(msg)); + if (op) { + std::lock_guard g(_non_main_thread_ops_mutex); + _non_main_thread_ops_owner.start(std::move(op), msg->getPriority()); + } + return true; + } else if (type_id == api::MessageType::GET_REPLY_ID) { + std::lock_guard g(_non_main_thread_ops_mutex); + // The Get for which this reply was created may have been sent by someone outside + // the ExternalOperationHandler, such as TwoPhaseUpdateOperation. 
Pass it on if so. + return _non_main_thread_ops_owner.handleReply(std::dynamic_pointer_cast<api::StorageReply>(msg)); + } + return false; +} + } diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index b64b4bc90cd..2c1a87267eb 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -7,6 +7,7 @@ #include <vespa/vdslib/state/clusterstate.h> #include <vespa/storage/distributor/distributorcomponent.h> #include <vespa/storageapi/messageapi/messagehandler.h> +#include <atomic> #include <chrono> #include <mutex> @@ -51,6 +52,17 @@ public: _rejectFeedBeforeTimeReached = timePoint; } + // Returns true iff message was handled and should not be processed further by the caller. + bool try_handle_message_outside_main_thread(const std::shared_ptr<api::StorageMessage>& msg); + + void set_concurrent_gets_enabled(bool enabled) noexcept { + _concurrent_gets_enabled.store(enabled, std::memory_order_relaxed); + } + + bool concurrent_gets_enabled() const noexcept { + return _concurrent_gets_enabled.load(std::memory_order_relaxed); + } + private: const MaintenanceOperationGenerator& _operationGenerator; OperationSequencer _mutationSequencer; @@ -58,6 +70,7 @@ private: TimePoint _rejectFeedBeforeTimeReached; mutable std::mutex _non_main_thread_ops_mutex; OperationOwner _non_main_thread_ops_owner; + std::atomic<bool> _concurrent_gets_enabled; template <typename Func> void bounce_or_invoke_read_only_op(api::StorageCommand& cmd, @@ -72,6 +85,7 @@ private: const lib::ClusterState& current_state, const lib::ClusterState& pending_state); void bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result); + std::shared_ptr<Operation> try_generate_get_operation(const std::shared_ptr<api::GetCommand>&); bool checkSafeTimeReached(api::StorageCommand& cmd); api::ReturnCode 
makeSafeTimeRejectionResult(TimePoint unsafeTime); diff --git a/storage/src/vespa/storage/distributor/operationstarter.h b/storage/src/vespa/storage/distributor/operationstarter.h index ee938d02226..75c794db7c7 100644 --- a/storage/src/vespa/storage/distributor/operationstarter.h +++ b/storage/src/vespa/storage/distributor/operationstarter.h @@ -11,7 +11,7 @@ class OperationStarter { public: using Priority = uint8_t; - virtual ~OperationStarter() {} + virtual ~OperationStarter() = default; virtual bool start(const std::shared_ptr<Operation>& operation, Priority priority) = 0; }; |