diff options
9 files changed, 71 insertions, 20 deletions
diff --git a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp index 43ed724e010..9d0e921c502 100644 --- a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp +++ b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp @@ -2,6 +2,7 @@ #include "compile_cache.h" #include <vespa/eval/eval/key_gen.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <thread> namespace vespalib::eval { @@ -64,6 +65,7 @@ CompileCache::compile(const Function &function, PassParams pass_params) assert(res.second); token = std::make_unique<Token>(res.first, Token::ctor_tag()); task = std::make_unique<CompileTask>(function, pass_params, res.first->second.result); + task = CpuUsage::wrap(std::move(task), CpuUsage::Category::SETUP); if (!_executor_stack.empty()) { executor = _executor_stack.back().second; } diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp index 58dc473b85e..0978a0a9bae 100644 --- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp @@ -6,6 +6,7 @@ #include <vespa/vespalib/data/smart_buffer.h> #include <vespa/vespalib/data/slime/binary_format.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/log/log.h> @@ -34,12 +35,14 @@ public: }; VESPA_THREAD_STACK_TAG(match_engine_executor) +VESPA_THREAD_STACK_TAG(match_engine_thread_bundle) } // namespace anon namespace proton { using namespace vespalib::slime; +using vespalib::CpuUsage; MatchEngine::MatchEngine(size_t numThreads, size_t threadsPerSearch, uint32_t distributionKey, bool async) : _lock(), @@ -48,8 +51,10 @@ MatchEngine::MatchEngine(size_t numThreads, size_t threadsPerSearch, uint32_t di _closed(false), _forward_issues(true), _handlers(), - _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256_Ki, match_engine_executor), - _threadBundlePool(std::max(size_t(1), threadsPerSearch)), + _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256_Ki, + CpuUsage::wrap(match_engine_executor, CpuUsage::Category::READ)), + _threadBundlePool(std::max(size_t(1), threadsPerSearch), + CpuUsage::wrap(match_engine_thread_bundle, CpuUsage::Category::READ)), _nodeUp(false), _nodeMaintenance(false) { diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp index 95643c7d1a8..a42fab782cb 100644 --- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp @@ -2,6 +2,7 @@ #include "summaryengine.h" #include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/log/log.h> LOG_SETUP(".proton.summaryengine.summaryengine"); @@ -10,6 +11,7 @@ using namespace search::engine; using namespace proton; using vespalib::Memory; using vespalib::slime::Inspector; +using vespalib::CpuUsage; namespace { @@ -61,7 +63,7 @@ SummaryEngine::SummaryEngine(size_t numThreads, bool async) _closed(false), _forward_issues(true), _handlers(), - _executor(numThreads, 128_Ki, summary_engine_executor), + _executor(numThreads, 128_Ki, CpuUsage::wrap(summary_engine_executor, CpuUsage::Category::READ)), _metrics(std::make_unique<DocsumMetrics>()) { } diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp index 5307239118d..97dbedad66f 100644 --- a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp +++ b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp @@ -263,4 +263,29 @@ CpuUsage::sample() return self().sample_or_wait(); } +Runnable::init_fun_t +CpuUsage::wrap(Runnable::init_fun_t init, Category cat) +{ + return [init,cat](Runnable &target) { + auto my_usage = CpuUsage::use(cat); + return init(target); + }; +} + +Executor::Task::UP +CpuUsage::wrap(Executor::Task::UP task, Category cat) +{ + struct CpuTask : Executor::Task { + UP task; + Category cat; + CpuTask(UP task_in, Category cat_in) + : task(std::move(task_in)), cat(cat_in) {} + void run() override { + auto my_usage = CpuUsage::use(cat); + task->run(); + } + }; + return std::make_unique<CpuTask>(std::move(task), cat); +} + } // namespace diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.h b/vespalib/src/vespa/vespalib/util/cpu_usage.h index 87e0a289e87..6fd1a370887 100644 --- a/vespalib/src/vespa/vespalib/util/cpu_usage.h +++ b/vespalib/src/vespa/vespalib/util/cpu_usage.h @@ -1,5 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "runnable.h" +#include "executor.h" #include "spin_lock.h" #include <vespa/vespalib/util/time.h> #include <vespa/vespalib/stllike/string.h> @@ -178,6 +180,8 @@ private: public: static MyUsage use(Category cat) { return MyUsage(cat); } static TimedSample sample(); + static Runnable::init_fun_t wrap(Runnable::init_fun_t init, Category cat); + static Executor::Task::UP wrap(Executor::Task::UP task, Category cat); }; /** diff --git a/vespalib/src/vespa/vespalib/util/runnable.cpp b/vespalib/src/vespa/vespalib/util/runnable.cpp index c67c4696d88..3b75efe93f6 100644 --- a/vespalib/src/vespa/vespalib/util/runnable.cpp +++ b/vespalib/src/vespa/vespalib/util/runnable.cpp @@ -4,4 +4,11 @@ namespace vespalib { +int +Runnable::default_init_function(Runnable &target) +{ + target.run(); + return 1; +} + } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/runnable.h b/vespalib/src/vespa/vespalib/util/runnable.h index 43144ebc2cd..bc89a9decac 100644 --- a/vespalib/src/vespa/vespalib/util/runnable.h +++ b/vespalib/src/vespa/vespalib/util/runnable.h @@ -23,6 +23,7 @@ namespace vespalib { struct Runnable { using UP = std::unique_ptr<Runnable>; using init_fun_t = std::function<int(Runnable&)>; + static int default_init_function(Runnable &target); /** * Entry point called by the running thread @@ -36,4 +37,3 @@ struct Runnable { }; } // namespace vespalib - diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp index ab83d4e05fd..99ab298864f 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp @@ -60,9 +60,10 @@ Signal::Signal() noexcept {} Signal::~Signal() = default; -SimpleThreadBundle::Pool::Pool(size_t bundleSize) +SimpleThreadBundle::Pool::Pool(size_t bundleSize, init_fun_t init_fun) : _lock(), _bundleSize(bundleSize), + _init_fun(init_fun), _bundles() { } @@ -86,7 +87,7 @@ SimpleThreadBundle::Pool::obtain() return ret; } } - return std::make_unique<SimpleThreadBundle>(_bundleSize); + return std::make_unique<SimpleThreadBundle>(_bundleSize, _init_fun); } void @@ -99,7 +100,7 @@ SimpleThreadBundle::Pool::release(SimpleThreadBundle::UP bundle) //----------------------------------------------------------------------------- -SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Strategy strategy) +SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Runnable::init_fun_t init_fun, Strategy strategy) : _work(), _signals(), _workers(), @@ -134,7 +135,7 @@ SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Strategy strategy) _hook = std::move(hook); } else { size_t signal_idx = (strategy == USE_BROADCAST) ? 0 : (i - 1); - _workers.push_back(std::make_unique<Worker>(_signals[signal_idx], std::move(hook))); + _workers.push_back(std::make_unique<Worker>(_signals[signal_idx], init_fun, std::move(hook))); } } } @@ -175,19 +176,19 @@ SimpleThreadBundle::run(const std::vector<Runnable*> &targets) latch.await(); } -SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::UP h) - : thread(*this, simple_thread_bundle_executor), - signal(s), - hook(std::move(h)) +SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::init_fun_t init_fun, Runnable::UP h) + : thread(*this, std::move(init_fun)), + signal(s), + hook(std::move(h)) { thread.start(); } + void SimpleThreadBundle::Worker::run() { for (size_t gen = 0; signal.wait(gen) > 0; ) { - hook->run(); -} - + hook->run(); + } } } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h index d9a29ee7bef..b7434d09ac3 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h @@ -86,8 +86,9 @@ struct Signal { class SimpleThreadBundle : public ThreadBundle { public: - typedef fixed_thread_bundle::Work Work; - typedef fixed_thread_bundle::Signal Signal; + using Work = fixed_thread_bundle::Work; + using Signal = fixed_thread_bundle::Signal; + using init_fun_t = Runnable::init_fun_t; typedef std::unique_ptr<SimpleThreadBundle> UP; enum Strategy { USE_SIGNAL_LIST, USE_SIGNAL_TREE, USE_BROADCAST }; @@ -97,10 +98,12 @@ public: private: std::mutex _lock; size_t _bundleSize; + init_fun_t _init_fun; std::vector<SimpleThreadBundle*> _bundles; public: - Pool(size_t bundleSize); + Pool(size_t bundleSize, init_fun_t init_fun); + Pool(size_t bundleSize) : Pool(bundleSize, Runnable::default_init_function) {} ~Pool(); SimpleThreadBundle::UP obtain(); void release(SimpleThreadBundle::UP bundle); @@ -112,7 +115,7 @@ private: Thread thread; Signal &signal; Runnable::UP hook; - Worker(Signal &s, Runnable::UP h); + Worker(Signal &s, init_fun_t init_fun, Runnable::UP h); void run() override; }; @@ -122,7 +125,9 @@ private: Runnable::UP _hook; public: - SimpleThreadBundle(size_t size, Strategy strategy = USE_SIGNAL_LIST); + SimpleThreadBundle(size_t size, init_fun_t init_fun, Strategy strategy = USE_SIGNAL_LIST); + SimpleThreadBundle(size_t size, Strategy strategy = USE_SIGNAL_LIST) + : SimpleThreadBundle(size, Runnable::default_init_function, strategy) {} ~SimpleThreadBundle(); size_t size() const override; void run(const std::vector<Runnable*> &targets) override; |