diff options
author | Geir Storli <geirst@verizonmedia.com> | 2020-12-15 11:05:00 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2020-12-15 11:06:47 +0000 |
commit | f31f8dffef680bef44e56d2c5e22900533ab272f (patch) | |
tree | dc72fbe4de83f8fbc235bda85a24f244e80a1ea8 | |
parent | 6f479ad61a4a6d973ce6a985e25206e9adfdcfee (diff) |
Add explorer for the ExecutorThreadingService used in a document database.
16 files changed, 206 insertions, 13 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 57445775df3..81dd8a64a6c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -36,6 +36,7 @@ vespa_add_library(searchcore_server STATIC documentsubdbcollection.cpp emptysearchview.cpp executor_thread_service.cpp + executor_threading_service_explorer.cpp executorthreadingservice.cpp fast_access_document_retriever.cpp fast_access_doc_subdb.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_explorer.cpp b/searchcore/src/vespa/searchcore/proton/server/document_db_explorer.cpp index 0c3772cab5b..a5c3e8d6078 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_db_explorer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/document_db_explorer.cpp @@ -1,12 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "document_db_explorer.h" - #include "document_meta_store_read_guards.h" #include "document_subdb_collection_explorer.h" +#include "executor_threading_service_explorer.h" #include "maintenance_controller_explorer.h" -#include <vespa/searchcore/proton/common/state_reporter_utils.h> #include <vespa/searchcore/proton/bucketdb/bucket_db_explorer.h> +#include <vespa/searchcore/proton/common/state_reporter_utils.h> #include <vespa/searchcore/proton/matching/session_manager_explorer.h> #include <vespa/vespalib/data/slime/slime.h> @@ -42,6 +42,7 @@ DocumentDBExplorer::get_state(const Inserter &inserter, bool full) const } const vespalib::string SUB_DB = "subdb"; +const vespalib::string THREADING_SERVICE = "threadingservice"; const vespalib::string BUCKET_DB = "bucketdb"; const vespalib::string MAINTENANCE_CONTROLLER = "maintenancecontroller"; const vespalib::string SESSION = "session"; @@ -49,25 +50,24 @@ const vespalib::string SESSION = "session"; std::vector<vespalib::string> DocumentDBExplorer::get_children_names() const { - return {SUB_DB, BUCKET_DB, MAINTENANCE_CONTROLLER, SESSION}; + return {SUB_DB, THREADING_SERVICE, BUCKET_DB, MAINTENANCE_CONTROLLER, SESSION}; } std::unique_ptr<StateExplorer> DocumentDBExplorer::get_child(vespalib::stringref name) const { if (name == SUB_DB) { - return std::unique_ptr<StateExplorer> - (new DocumentSubDBCollectionExplorer(_docDb->getDocumentSubDBs())); + return std::make_unique<DocumentSubDBCollectionExplorer>(_docDb->getDocumentSubDBs()); + } else if (name == THREADING_SERVICE) { + return std::make_unique<ExecutorThreadingServiceExplorer>(_docDb->getWriteService()); } else if (name == BUCKET_DB) { // TODO(geirst): const_cast can be avoided if we add const guard to BucketDBOwner. - return std::unique_ptr<StateExplorer>(new BucketDBExplorer( - (const_cast<DocumentSubDBCollection &>(_docDb->getDocumentSubDBs())).getBucketDB().takeGuard())); + return std::make_unique<BucketDBExplorer>( + (const_cast<DocumentSubDBCollection &>(_docDb->getDocumentSubDBs())).getBucketDB().takeGuard()); } else if (name == MAINTENANCE_CONTROLLER) { - return std::unique_ptr<StateExplorer> - (new MaintenanceControllerExplorer(_docDb->getMaintenanceController().getJobList())); + return std::make_unique<MaintenanceControllerExplorer>(_docDb->getMaintenanceController().getJobList()); } else if (name == SESSION) { - return std::unique_ptr<StateExplorer> - (new matching::SessionManagerExplorer(_docDb->session_manager())); + return std::make_unique<matching::SessionManagerExplorer>(_docDb->session_manager()); } return std::unique_ptr<StateExplorer>(nullptr); } diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp index 24f5fa9a5e6..49dc038096c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp @@ -81,6 +81,10 @@ void ExecutorThreadService::setTaskLimit(uint32_t taskLimit) { _executor.setTaskLimit(taskLimit); } +uint32_t ExecutorThreadService::getTaskLimit() const { + return _executor.getTaskLimit(); +} + void ExecutorThreadService::wakeup() { _executor.wakeup(); diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h index 4f27a8f86c2..325c36b7270 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h @@ -39,6 +39,7 @@ public: size_t getNumThreads() const override { return _executor.getNumThreads(); } void setTaskLimit(uint32_t taskLimit) override; + uint32_t getTaskLimit() const override; void wakeup() override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp new file mode 100644 index 00000000000..0ecdca54e27 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp @@ -0,0 +1,122 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "executor_threading_service_explorer.h" +#include "executorthreadingservice.h" +#include <vespa/vespalib/data/slime/cursor.h> +#include <vespa/vespalib/util/adaptive_sequenced_executor.h> +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> +#include <vespa/vespalib/util/singleexecutor.h> +#include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/vespalib/util/threadstackexecutor.h> + +using vespalib::AdaptiveSequencedExecutor; +using vespalib::BlockingThreadStackExecutor; +using vespalib::ISequencedTaskExecutor; +using vespalib::SequencedTaskExecutor; +using vespalib::SingleExecutor; +using vespalib::SyncableThreadExecutor; +using vespalib::ThreadStackExecutor; +using vespalib::slime::Cursor; + +namespace proton { + +namespace { + +void +set_type(Cursor& object, const vespalib::string& type) +{ + object.setString("type", type); +} + +void +convert_syncable_executor_to_slime(const SyncableThreadExecutor& executor, const vespalib::string& type, Cursor& object) +{ + set_type(object, type); + object.setLong("num_threads", executor.getNumThreads()); + object.setLong("task_limit", executor.getTaskLimit()); +} + +void +convert_single_executor_to_slime(const SingleExecutor& executor, Cursor& object) +{ + convert_syncable_executor_to_slime(executor, "SingleExecutor", object); + object.setLong("watermark", executor.get_watermark()); + object.setDouble("reaction_time_sec", vespalib::to_s(executor.get_reaction_time())); +} + +void +convert_executor_to_slime(const SyncableThreadExecutor* executor, Cursor& object) +{ + if (executor == nullptr) { + return; + } + if (const auto* single = dynamic_cast<const SingleExecutor*>(executor)) { + convert_single_executor_to_slime(*single, object); + } else if (const auto* blocking = dynamic_cast<const BlockingThreadStackExecutor*>(executor)) { + convert_syncable_executor_to_slime(*blocking, "BlockingThreadStackExecutor", object); + } else if (const auto* thread = dynamic_cast<const ThreadStackExecutor*>(executor)) { + convert_syncable_executor_to_slime(*thread, "ThreadStackExecutor", object); + } else { + convert_syncable_executor_to_slime(*executor, "SyncableThreadExecutor", object); + } +} + +void +convert_sequenced_executor_to_slime(const SequencedTaskExecutor& executor, Cursor& object) +{ + set_type(object, "SequencedTaskExecutor"); + object.setLong("num_executors", executor.getNumExecutors()); + convert_executor_to_slime(executor.first_executor(), object.setObject("executor")); +} + +void +convert_adaptive_executor_to_slime(const AdaptiveSequencedExecutor& executor, Cursor& object) +{ + set_type(object, "AdaptiveSequencedExecutor"); + object.setLong("num_strands", executor.getNumExecutors()); + auto cfg = executor.get_config(); + object.setLong("num_threads", cfg.num_threads); + object.setLong("max_waiting", cfg.max_waiting); + object.setLong("max_pending", cfg.max_pending); + object.setLong("wakeup_limit", cfg.wakeup_limit); +} + +void +convert_executor_to_slime(const ISequencedTaskExecutor* executor, Cursor& object) +{ + if (const auto* seq = dynamic_cast<const SequencedTaskExecutor*>(executor)) { + convert_sequenced_executor_to_slime(*seq, object); + } else if (const auto* ada = dynamic_cast<const AdaptiveSequencedExecutor*>(executor)) { + convert_adaptive_executor_to_slime(*ada, object); + } else { + set_type(object, "ISequencedTaskExecutor"); + object.setLong("num_executors", executor->getNumExecutors()); + } +} + +} + +ExecutorThreadingServiceExplorer::ExecutorThreadingServiceExplorer(ExecutorThreadingService& service) + : _service(service) +{ +} + +ExecutorThreadingServiceExplorer::~ExecutorThreadingServiceExplorer() = default; + +void +ExecutorThreadingServiceExplorer::get_state(const vespalib::slime::Inserter& inserter, bool full) const +{ + auto& object = inserter.insertObject(); + if (full) { + convert_executor_to_slime(&_service.getMasterExecutor(), object.setObject("master")); + convert_executor_to_slime(&_service.getIndexExecutor(), object.setObject("index")); + convert_executor_to_slime(&_service.getSummaryExecutor(), object.setObject("summary")); + convert_executor_to_slime(&_service.indexFieldInverter(), object.setObject("index_field_inverter")); + convert_executor_to_slime(&_service.indexFieldWriter(), object.setObject("index_field_writer")); + convert_executor_to_slime(&_service.attributeFieldWriter(), object.setObject("attribute_field_writer")); + } +} + +} + diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h new file mode 100644 index 00000000000..14d25add8d6 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h @@ -0,0 +1,25 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/net/state_explorer.h> + +namespace proton { + +class ExecutorThreadingService; + +/** + * Class used to explore the state of the ExecutorThreadingService used in a document database. + */ +class ExecutorThreadingServiceExplorer : public vespalib::StateExplorer { +private: + ExecutorThreadingService& _service; + +public: + ExecutorThreadingServiceExplorer(ExecutorThreadingService& service); + ~ExecutorThreadingServiceExplorer(); + + void get_state(const vespalib::slime::Inserter& inserter, bool full) const override; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h index 350673b9c90..0b1b9730363 100644 --- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h @@ -48,6 +48,10 @@ public: _service.setTaskLimit(taskLimit); } + uint32_t getTaskLimit() const override { + return _service.getTaskLimit(); + } + void wakeup() override { _service.wakeup(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 407129199e3..e4b64b19739 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -334,4 +334,11 @@ AdaptiveSequencedExecutor::getStats() return stats; } +AdaptiveSequencedExecutor::Config +AdaptiveSequencedExecutor::get_config() const +{ + auto guard = std::lock_guard(_mutex); + return _cfg; +} + } diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index c52b9b22245..ed2209d130a 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -123,7 +123,7 @@ private: }; std::unique_ptr<ThreadTools> _thread_tools; - std::mutex _mutex; + mutable std::mutex _mutex; std::vector<Strand> _strands; vespalib::ArrayQueue<Strand*> _wait_queue; vespalib::ArrayQueue<Worker*> _worker_stack; @@ -149,6 +149,7 @@ public: void sync() override; void setTaskLimit(uint32_t task_limit) override; vespalib::ExecutorStats getStats() override; + Config get_config() const; }; } diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h index ff523b1e35d..1862de910ab 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h @@ -26,6 +26,7 @@ public: return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0); } void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } + uint32_t getTaskLimit() const override { return std::numeric_limits<uint32_t>::max(); } void wakeup() override { } }; diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index d1c6b1aba53..b0c67e14c5b 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -123,4 +123,13 @@ SequencedTaskExecutor::getExecutorId(uint64_t componentId) const { return ExecutorId(executorId); } +const vespalib::SyncableThreadExecutor* +SequencedTaskExecutor::first_executor() const +{ + if (_executors->empty()) { + return nullptr; + } + return _executors->front().get(); +} + } // namespace search diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 050b00ef011..496b183af8a 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -41,6 +41,8 @@ public: */ uint32_t getComponentHashSize() const { return _component2Id.size(); } uint32_t getComponentEffectiveHashSize() const { return _nextId; } + const vespalib::SyncableThreadExecutor* first_executor() const; + private: explicit SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 58cec52b2b0..721df3bf881 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -26,7 +26,9 @@ public: SingleExecutor & sync() override; void wakeup() override; size_t getNumThreads() const override; - uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); } + uint32_t getTaskLimit() const override { return _taskLimit.load(std::memory_order_relaxed); } + uint32_t get_watermark() const { return _watermark; } + duration get_reaction_time() const { return _reactionTime; } Stats getStats() override; SingleExecutor & shutdown() override; private: diff --git a/vespalib/src/vespa/vespalib/util/threadexecutor.h b/vespalib/src/vespa/vespalib/util/threadexecutor.h index 61a5d9d5ac7..7d06007d7be 100644 --- a/vespalib/src/vespa/vespalib/util/threadexecutor.h +++ b/vespalib/src/vespa/vespalib/util/threadexecutor.h @@ -32,6 +32,11 @@ public: * Sets a new upper limit for accepted number of tasks. */ virtual void setTaskLimit(uint32_t taskLimit) = 0; + + /** + * Gets the limit for accepted number of tasks. + */ + virtual uint32_t getTaskLimit() const = 0; }; /** diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index 28ee2da2d8c..01546b80d66 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -174,6 +174,13 @@ ThreadStackExecutorBase::setTaskLimit(uint32_t taskLimit) internalSetTaskLimit(taskLimit); } +uint32_t +ThreadStackExecutorBase::getTaskLimit() const +{ + unique_lock guard(_lock); + return _taskLimit; +} + void ThreadStackExecutorBase::wakeup() { // Nothing to do here as workers are always attentive. diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index c86597ca153..c0653c19516 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -211,6 +211,8 @@ public: size_t getNumThreads() const override; void setTaskLimit(uint32_t taskLimit) override; + uint32_t getTaskLimit() const override; + void wakeup() override; /** |