summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eval/src/vespa/eval/eval/llvm/compile_cache.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp4
-rw-r--r--vespalib/src/vespa/vespalib/util/cpu_usage.cpp25
-rw-r--r--vespalib/src/vespa/vespalib/util/cpu_usage.h4
-rw-r--r--vespalib/src/vespa/vespalib/util/runnable.cpp7
-rw-r--r--vespalib/src/vespa/vespalib/util/runnable.h2
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp23
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.h15
9 files changed, 71 insertions, 20 deletions
diff --git a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp
index 43ed724e010..9d0e921c502 100644
--- a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp
+++ b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp
@@ -2,6 +2,7 @@
#include "compile_cache.h"
#include <vespa/eval/eval/key_gen.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <thread>
namespace vespalib::eval {
@@ -64,6 +65,7 @@ CompileCache::compile(const Function &function, PassParams pass_params)
assert(res.second);
token = std::make_unique<Token>(res.first, Token::ctor_tag());
task = std::make_unique<CompileTask>(function, pass_params, res.first->second.result);
+ task = CpuUsage::wrap(std::move(task), CpuUsage::Category::SETUP);
if (!_executor_stack.empty()) {
executor = _executor_stack.back().second;
}
diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
index 58dc473b85e..0978a0a9bae 100644
--- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
@@ -6,6 +6,7 @@
#include <vespa/vespalib/data/smart_buffer.h>
#include <vespa/vespalib/data/slime/binary_format.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/log/log.h>
@@ -34,12 +35,14 @@ public:
};
VESPA_THREAD_STACK_TAG(match_engine_executor)
+VESPA_THREAD_STACK_TAG(match_engine_thread_bundle)
} // namespace anon
namespace proton {
using namespace vespalib::slime;
+using vespalib::CpuUsage;
MatchEngine::MatchEngine(size_t numThreads, size_t threadsPerSearch, uint32_t distributionKey, bool async)
: _lock(),
@@ -48,8 +51,10 @@ MatchEngine::MatchEngine(size_t numThreads, size_t threadsPerSearch, uint32_t di
_closed(false),
_forward_issues(true),
_handlers(),
- _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256_Ki, match_engine_executor),
- _threadBundlePool(std::max(size_t(1), threadsPerSearch)),
+ _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256_Ki,
+ CpuUsage::wrap(match_engine_executor, CpuUsage::Category::READ)),
+ _threadBundlePool(std::max(size_t(1), threadsPerSearch),
+ CpuUsage::wrap(match_engine_thread_bundle, CpuUsage::Category::READ)),
_nodeUp(false),
_nodeMaintenance(false)
{
diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
index 95643c7d1a8..a42fab782cb 100644
--- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
@@ -2,6 +2,7 @@
#include "summaryengine.h"
#include <vespa/vespalib/data/slime/slime.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/log/log.h>
LOG_SETUP(".proton.summaryengine.summaryengine");
@@ -10,6 +11,7 @@ using namespace search::engine;
using namespace proton;
using vespalib::Memory;
using vespalib::slime::Inspector;
+using vespalib::CpuUsage;
namespace {
@@ -61,7 +63,7 @@ SummaryEngine::SummaryEngine(size_t numThreads, bool async)
_closed(false),
_forward_issues(true),
_handlers(),
- _executor(numThreads, 128_Ki, summary_engine_executor),
+ _executor(numThreads, 128_Ki, CpuUsage::wrap(summary_engine_executor, CpuUsage::Category::READ)),
_metrics(std::make_unique<DocsumMetrics>())
{ }
diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
index 5307239118d..97dbedad66f 100644
--- a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
+++ b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
@@ -263,4 +263,29 @@ CpuUsage::sample()
return self().sample_or_wait();
}
+Runnable::init_fun_t
+CpuUsage::wrap(Runnable::init_fun_t init, Category cat)
+{
+ return [init,cat](Runnable &target) {
+ auto my_usage = CpuUsage::use(cat);
+ return init(target);
+ };
+}
+
+Executor::Task::UP
+CpuUsage::wrap(Executor::Task::UP task, Category cat)
+{
+ struct CpuTask : Executor::Task {
+ UP task;
+ Category cat;
+ CpuTask(UP task_in, Category cat_in)
+ : task(std::move(task_in)), cat(cat_in) {}
+ void run() override {
+ auto my_usage = CpuUsage::use(cat);
+ task->run();
+ }
+ };
+ return std::make_unique<CpuTask>(std::move(task), cat);
+}
+
} // namespace
diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.h b/vespalib/src/vespa/vespalib/util/cpu_usage.h
index 87e0a289e87..6fd1a370887 100644
--- a/vespalib/src/vespa/vespalib/util/cpu_usage.h
+++ b/vespalib/src/vespa/vespalib/util/cpu_usage.h
@@ -1,5 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "runnable.h"
+#include "executor.h"
#include "spin_lock.h"
#include <vespa/vespalib/util/time.h>
#include <vespa/vespalib/stllike/string.h>
@@ -178,6 +180,8 @@ private:
public:
static MyUsage use(Category cat) { return MyUsage(cat); }
static TimedSample sample();
+ static Runnable::init_fun_t wrap(Runnable::init_fun_t init, Category cat);
+ static Executor::Task::UP wrap(Executor::Task::UP task, Category cat);
};
/**
diff --git a/vespalib/src/vespa/vespalib/util/runnable.cpp b/vespalib/src/vespa/vespalib/util/runnable.cpp
index c67c4696d88..3b75efe93f6 100644
--- a/vespalib/src/vespa/vespalib/util/runnable.cpp
+++ b/vespalib/src/vespa/vespalib/util/runnable.cpp
@@ -4,4 +4,11 @@
namespace vespalib {
+int
+Runnable::default_init_function(Runnable &target)
+{
+ target.run();
+ return 1;
+}
+
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/runnable.h b/vespalib/src/vespa/vespalib/util/runnable.h
index 43144ebc2cd..bc89a9decac 100644
--- a/vespalib/src/vespa/vespalib/util/runnable.h
+++ b/vespalib/src/vespa/vespalib/util/runnable.h
@@ -23,6 +23,7 @@ namespace vespalib {
struct Runnable {
using UP = std::unique_ptr<Runnable>;
using init_fun_t = std::function<int(Runnable&)>;
+ static int default_init_function(Runnable &target);
/**
* Entry point called by the running thread
@@ -36,4 +37,3 @@ struct Runnable {
};
} // namespace vespalib
-
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
index ab83d4e05fd..99ab298864f 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
@@ -60,9 +60,10 @@ Signal::Signal() noexcept
{}
Signal::~Signal() = default;
-SimpleThreadBundle::Pool::Pool(size_t bundleSize)
+SimpleThreadBundle::Pool::Pool(size_t bundleSize, init_fun_t init_fun)
: _lock(),
_bundleSize(bundleSize),
+ _init_fun(init_fun),
_bundles()
{
}
@@ -86,7 +87,7 @@ SimpleThreadBundle::Pool::obtain()
return ret;
}
}
- return std::make_unique<SimpleThreadBundle>(_bundleSize);
+ return std::make_unique<SimpleThreadBundle>(_bundleSize, _init_fun);
}
void
@@ -99,7 +100,7 @@ SimpleThreadBundle::Pool::release(SimpleThreadBundle::UP bundle)
//-----------------------------------------------------------------------------
-SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Strategy strategy)
+SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Runnable::init_fun_t init_fun, Strategy strategy)
: _work(),
_signals(),
_workers(),
@@ -134,7 +135,7 @@ SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Strategy strategy)
_hook = std::move(hook);
} else {
size_t signal_idx = (strategy == USE_BROADCAST) ? 0 : (i - 1);
- _workers.push_back(std::make_unique<Worker>(_signals[signal_idx], std::move(hook)));
+ _workers.push_back(std::make_unique<Worker>(_signals[signal_idx], init_fun, std::move(hook)));
}
}
}
@@ -175,19 +176,19 @@ SimpleThreadBundle::run(const std::vector<Runnable*> &targets)
latch.await();
}
-SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::UP h)
- : thread(*this, simple_thread_bundle_executor),
- signal(s),
- hook(std::move(h))
+SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::init_fun_t init_fun, Runnable::UP h)
+ : thread(*this, std::move(init_fun)),
+ signal(s),
+ hook(std::move(h))
{
thread.start();
}
+
void
SimpleThreadBundle::Worker::run() {
for (size_t gen = 0; signal.wait(gen) > 0; ) {
- hook->run();
-}
-
+ hook->run();
+ }
}
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
index d9a29ee7bef..b7434d09ac3 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
@@ -86,8 +86,9 @@ struct Signal {
class SimpleThreadBundle : public ThreadBundle
{
public:
- typedef fixed_thread_bundle::Work Work;
- typedef fixed_thread_bundle::Signal Signal;
+ using Work = fixed_thread_bundle::Work;
+ using Signal = fixed_thread_bundle::Signal;
+ using init_fun_t = Runnable::init_fun_t;
typedef std::unique_ptr<SimpleThreadBundle> UP;
enum Strategy { USE_SIGNAL_LIST, USE_SIGNAL_TREE, USE_BROADCAST };
@@ -97,10 +98,12 @@ public:
private:
std::mutex _lock;
size_t _bundleSize;
+ init_fun_t _init_fun;
std::vector<SimpleThreadBundle*> _bundles;
public:
- Pool(size_t bundleSize);
+ Pool(size_t bundleSize, init_fun_t init_fun);
+ Pool(size_t bundleSize) : Pool(bundleSize, Runnable::default_init_function) {}
~Pool();
SimpleThreadBundle::UP obtain();
void release(SimpleThreadBundle::UP bundle);
@@ -112,7 +115,7 @@ private:
Thread thread;
Signal &signal;
Runnable::UP hook;
- Worker(Signal &s, Runnable::UP h);
+ Worker(Signal &s, init_fun_t init_fun, Runnable::UP h);
void run() override;
};
@@ -122,7 +125,9 @@ private:
Runnable::UP _hook;
public:
- SimpleThreadBundle(size_t size, Strategy strategy = USE_SIGNAL_LIST);
+ SimpleThreadBundle(size_t size, init_fun_t init_fun, Strategy strategy = USE_SIGNAL_LIST);
+ SimpleThreadBundle(size_t size, Strategy strategy = USE_SIGNAL_LIST)
+ : SimpleThreadBundle(size, Runnable::default_init_function, strategy) {}
~SimpleThreadBundle();
size_t size() const override;
void run(const std::vector<Runnable*> &targets) override;