author    Henning Baldersheim <balder@yahoo-inc.com>    2022-01-17 19:29:43 +0100
committer GitHub <noreply@github.com>                   2022-01-17 19:29:43 +0100
commit    13babe5abd3ecb8ffe95124662d835df157c4bea (patch)
tree      173af3cfd1895432866aa195556860618a4867d7 /vespalib
parent    d70502fa56157840a2308ab34cc3228ee1a1be81 (diff)
parent    5110a68e5e82f8f5382c4c2447dd0226b517ae53 (diff)
Merge pull request #20815 from vespa-engine/havardpe/track-multi-threaded-cpu-usage-per-category
track cpu usage across threads and usage categories
Diffstat (limited to 'vespalib')
-rw-r--r--  vespalib/src/tests/cpu_usage/cpu_usage_test.cpp  | 183
-rw-r--r--  vespalib/src/vespa/vespalib/util/cpu_usage.cpp   | 200
-rw-r--r--  vespalib/src/vespa/vespalib/util/cpu_usage.h     | 128
3 files changed, 511 insertions(+), 0 deletions(-)
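
For orientation, the top-level API added by this merge is meant to be used roughly as sketched below (an illustrative sketch only, not part of the diff; it assumes the API declared in the cpu_usage.h added further down):

#include <vespa/vespalib/util/cpu_usage.h>

using vespalib::CpuUsage;

void handle_write() {
    // CPU used on this thread counts as WRITE while 'my_usage' is alive;
    // the previous category is restored when it goes out of scope.
    auto my_usage = CpuUsage::use(CpuUsage::Category::WRITE);
    // ... do the actual write work ...
}

void report() {
    // Take a sample of accumulated CPU usage per category across all threads.
    auto [when, sample] = CpuUsage::sample();
    vespalib::duration write_cpu = sample[CpuUsage::Category::WRITE];
    (void) when;
    (void) write_cpu;
}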
diff --git a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
index 98a7bd780a7..5deb467ed17 100644
--- a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
+++ b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
@@ -2,11 +2,14 @@
#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/benchmark_timer.h>
+#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/testkit/test_kit.h>
+#include <sys/resource.h>
#include <thread>
using namespace vespalib;
+using vespalib::make_string_short::fmt;
bool verbose = false;
size_t loop_cnt = 10;
@@ -16,6 +19,16 @@ using Sampler = vespalib::cpu_usage::ThreadSampler;
//-----------------------------------------------------------------------------
+class EndTime {
+private:
+ steady_time _end_time;
+public:
+ EndTime(duration test_time) : _end_time(steady_clock::now() + test_time) {}
+ bool operator()() const { return (steady_clock::now() >= _end_time); }
+};
+
+//-----------------------------------------------------------------------------
+
void be_busy(duration d) {
if (d > 0ms) {
volatile int tmp = 123;
@@ -91,6 +104,176 @@ TEST("measure thread CPU clock overhead") {
//-----------------------------------------------------------------------------
+void verify_category(CpuUsage::Category cat, size_t idx) {
+ switch (cat) { // make sure we know all categories
+ case CpuUsage::Category::SETUP:
+ case CpuUsage::Category::READ:
+ case CpuUsage::Category::WRITE:
+ case CpuUsage::Category::COMPACT:
+ case CpuUsage::Category::MAINTAIN:
+ case CpuUsage::Category::NETWORK:
+ case CpuUsage::Category::OTHER:
+ EXPECT_EQUAL(CpuUsage::index_of(cat), idx);
+ }
+}
+
+TEST("require that CPU categories are as expected") {
+ TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u));
+ TEST_DO(verify_category(CpuUsage::Category::READ, 1u));
+ TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u));
+ TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u));
+ TEST_DO(verify_category(CpuUsage::Category::MAINTAIN,4u));
+ TEST_DO(verify_category(CpuUsage::Category::NETWORK, 5u));
+ TEST_DO(verify_category(CpuUsage::Category::OTHER, 6u));
+ EXPECT_EQUAL(CpuUsage::num_categories, 7u);
+}
+
+TEST("require that empty sample is zero") {
+ CpuUsage::Sample sample;
+ EXPECT_EQUAL(sample.size(), CpuUsage::num_categories);
+ for (uint32_t i = 0; i < sample.size(); ++i) {
+ EXPECT_EQUAL(sample[i].count(), 0);
+ }
+}
+
+TEST("require that cpu samples can be manipulated and inspected") {
+ CpuUsage::Sample a;
+ CpuUsage::Sample b;
+ const CpuUsage::Sample &c = a;
+ a[CpuUsage::Category::SETUP] = 1ms;
+ a[CpuUsage::Category::READ] = 2ms;
+ a[CpuUsage::Category::WRITE] = 3ms;
+ a[CpuUsage::Category::COMPACT] = 4ms;
+ a[CpuUsage::Category::MAINTAIN] = 5ms;
+ a[CpuUsage::Category::NETWORK] = 6ms;
+ a[CpuUsage::Category::OTHER] = 7ms;
+ for (uint32_t i = 0; i < b.size(); ++i) {
+ b[i] = 10ms * (i + 1);
+ }
+ a.merge(b);
+ for (uint32_t i = 0; i < c.size(); ++i) {
+ EXPECT_EQUAL(c[i], 11ms * (i + 1));
+ }
+ EXPECT_EQUAL(c[CpuUsage::Category::SETUP], 11ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::READ], 22ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::WRITE], 33ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::COMPACT], 44ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::MAINTAIN], 55ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::NETWORK], 66ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 77ms);
+}
+
+//-----------------------------------------------------------------------------
+
+// prototype for the class we want to use to integrate CPU usage into
+// metrics as load values. NB: this class is not thread safe.
+
+class CpuMonitor {
+private:
+ duration _old_usage;
+ CpuUsage::TimedSample _old_sample;
+ duration _min_delay;
+ std::array<double,CpuUsage::num_categories+1> _load;
+
+ static duration total_usage() {
+ rusage usage;
+ memset(&usage, 0, sizeof(usage));
+ getrusage(RUSAGE_SELF, &usage);
+ return from_timeval(usage.ru_utime) + from_timeval(usage.ru_stime);
+ }
+
+public:
+ CpuMonitor(duration min_delay)
+ : _old_usage(total_usage()),
+ _old_sample(CpuUsage::sample()),
+ _min_delay(min_delay),
+ _load() {}
+
+ std::array<double,CpuUsage::num_categories+1> get_load() {
+ if (steady_clock::now() >= (_old_sample.first + _min_delay)) {
+ auto new_usage = total_usage();
+ auto new_sample = CpuUsage::sample();
+ auto dt = to_s(new_sample.first - _old_sample.first);
+ double sampled_load = 0.0;
+ for (size_t i = 0; i < CpuUsage::num_categories; ++i) {
+ _load[i] = to_s(new_sample.second[i] - _old_sample.second[i]) / dt;
+ sampled_load += _load[i];
+ }
+ _load[CpuUsage::num_categories] = (to_s(new_usage - _old_usage) / dt) - sampled_load;
+ _old_usage = new_usage;
+ _old_sample = new_sample;
+ }
+ return _load;
+ }
+};
+
+std::array<vespalib::string,CpuUsage::num_categories+1> names
+{ "SETUP", "READ", "WRITE", "COMPACT", "MAINTAIN", "NETWORK", "OTHER", "UNKNOWN" };
+
+void do_sample_cpu_usage(const EndTime &end_time) {
+ CpuMonitor monitor(8ms);
+ while (!end_time()) {
+ std::this_thread::sleep_for(verbose ? 1s : 10ms);
+ auto load = monitor.get_load();
+ vespalib::string body;
+ for (size_t i = 0; i < load.size(); ++i) {
+ if (!body.empty()) {
+ body.append(", ");
+ }
+ body.append(fmt("%s: %.2f", names[i].c_str(), load[i]));
+ }
+ fprintf(stderr, "CPU: %s\n", body.c_str());
+ }
+}
+
+void do_full_work(CpuUsage::Category cat, const EndTime &end_time) {
+ auto my_usage = CpuUsage::use(cat);
+ while (!end_time()) {
+ be_busy(4ms);
+ }
+}
+
+void do_some_work(CpuUsage::Category cat, const EndTime &end_time) {
+ auto my_usage = CpuUsage::use(cat);
+ while (!end_time()) {
+ be_busy(4ms);
+ std::this_thread::sleep_for(4ms);
+ }
+}
+
+void do_nested_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndTime &end_time) {
+ auto my_usage1 = CpuUsage::use(cat1);
+ while (!end_time()) {
+ be_busy(4ms);
+ auto my_usage2 = CpuUsage::use(cat2);
+ be_busy(4ms);
+ }
+}
+
+void do_external_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndTime &end_time) {
+ auto my_usage1 = CpuUsage::use(cat1);
+ while (!end_time()) {
+ std::thread thread([cat2](){
+ auto my_usage2 = CpuUsage::use(cat2);
+ be_busy(4ms);
+ });
+ thread.join();
+ }
+}
+
+TEST_MT_F("use top-level API to sample CPU usage", 5, EndTime(verbose ? 10s : 100ms)) {
+ switch (thread_id) {
+ case 0: return do_sample_cpu_usage(f1);
+ case 1: return do_full_work(CpuUsage::Category::WRITE, f1);
+ case 2: return do_some_work(CpuUsage::Category::READ, f1);
+ case 3: return do_nested_work(CpuUsage::Category::NETWORK, CpuUsage::Category::READ, f1);
+ case 4: return do_external_work(CpuUsage::Category::SETUP, CpuUsage::Category::COMPACT, f1);
+ default: TEST_FATAL("missing thread id case");
+ }
+}
+
+//-----------------------------------------------------------------------------
+
int main(int argc, char **argv) {
TEST_MASTER.init(__FILE__);
if ((argc == 2) && (argv[1] == std::string("verbose"))) {
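
The CpuMonitor prototype in the test above turns sampled CPU time into load values by dividing the CPU time spent in each category by the wall-clock time elapsed between samples; for example, 4ms of READ CPU over a 10ms window gives a READ load of 0.4. A minimal sketch of that calculation (not part of the test):

// The per-category load reported by CpuMonitor: CPU time spent divided by
// wall-clock time elapsed. The extra slot reported as UNKNOWN is process CPU
// (from getrusage) that was not attributed to any category.
double load_of(vespalib::duration cpu_spent, vespalib::duration wall_elapsed) {
    return vespalib::to_s(cpu_spent) / vespalib::to_s(wall_elapsed);
}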
diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
index 4eee0a63870..ca2199275de 100644
--- a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
+++ b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
@@ -3,6 +3,8 @@
#include "cpu_usage.h"
#include "require.h"
#include <pthread.h>
+#include <optional>
+#include <cassert>
namespace vespalib {
@@ -53,4 +55,202 @@ ThreadSampler::UP create_thread_sampler(bool force_mock_impl, double expected_lo
} // cpu_usage
+class CpuUsage::ThreadTrackerImpl : public CpuUsage::ThreadTracker {
+private:
+ SpinLock _lock;
+ uint32_t _cat_idx;
+ duration _old_usage;
+ cpu_usage::ThreadSampler::UP _sampler;
+ Sample _pending;
+
+ using Guard = std::lock_guard<SpinLock>;
+
+ struct Wrapper {
+ std::shared_ptr<ThreadTrackerImpl> self;
+ Wrapper() : self(std::make_shared<ThreadTrackerImpl>()) {
+ CpuUsage::self().add_thread(self);
+ }
+ ~Wrapper() {
+ self->set_category(CpuUsage::num_categories);
+ CpuUsage::self().remove_thread(std::move(self));
+ }
+ };
+
+public:
+ ThreadTrackerImpl()
+ : _lock(),
+ _cat_idx(num_categories),
+ _old_usage(),
+ _sampler(cpu_usage::create_thread_sampler()),
+ _pending()
+ {
+ }
+
+ uint32_t set_category(uint32_t new_cat_idx) {
+ Guard guard(_lock);
+ duration new_usage = _sampler->sample();
+ if (_cat_idx < num_categories) {
+ _pending[_cat_idx] += (new_usage - _old_usage);
+ }
+ _old_usage = new_usage;
+ size_t old_cat_idx = _cat_idx;
+ _cat_idx = new_cat_idx;
+ return old_cat_idx;
+ }
+
+ Sample sample() override {
+ Guard guard(_lock);
+ if (_cat_idx < num_categories) {
+ duration new_usage = _sampler->sample();
+ _pending[_cat_idx] += (new_usage - _old_usage);
+ _old_usage = new_usage;
+ }
+ Sample sample = _pending;
+ _pending = Sample();
+ return sample;
+ }
+
+ static ThreadTrackerImpl &self() {
+ thread_local Wrapper wrapper;
+ return *wrapper.self;
+ }
+};
+
+CpuUsage::MyUsage::MyUsage(Category cat)
+ : _old_cat_idx(ThreadTrackerImpl::self().set_category(index_of(cat)))
+{
+}
+
+CpuUsage::MyUsage::~MyUsage()
+{
+ ThreadTrackerImpl::self().set_category(_old_cat_idx);
+}
+
+CpuUsage::CpuUsage()
+ : _lock(),
+ _usage(),
+ _threads(),
+ _sampling(false),
+ _conflict(),
+ _pending_add(),
+ _pending_remove()
+{
+}
+
+CpuUsage &
+CpuUsage::self()
+{
+ static CpuUsage me;
+ return me;
+}
+
+void
+CpuUsage::do_add_thread(const Guard &, ThreadTracker::SP tracker)
+{
+ assert(!_sampling);
+ auto *key = tracker.get();
+ auto [ignore, was_inserted] = _threads.emplace(key, std::move(tracker));
+ assert(was_inserted);
+}
+
+void
+CpuUsage::do_remove_thread(const Guard &, ThreadTracker::SP tracker)
+{
+ assert(!_sampling);
+ _usage.merge(tracker->sample());
+ auto was_removed = _threads.erase(tracker.get());
+ assert(was_removed);
+}
+
+void
+CpuUsage::add_thread(ThreadTracker::SP tracker)
+{
+ Guard guard(_lock);
+ if (_sampling) {
+ _pending_add.push_back(std::move(tracker));
+ } else {
+ do_add_thread(guard, std::move(tracker));
+ }
+}
+
+void
+CpuUsage::remove_thread(ThreadTracker::SP tracker)
+{
+ Guard guard(_lock);
+ if (_sampling) {
+ _pending_remove.push_back(std::move(tracker));
+ } else {
+ do_remove_thread(guard, std::move(tracker));
+ }
+}
+
+void
+CpuUsage::handle_pending(const Guard &guard)
+{
+ for (auto &thread: _pending_add) {
+ do_add_thread(guard, std::move(thread));
+ }
+ _pending_add.clear();
+ for (auto &thread: _pending_remove) {
+ do_remove_thread(guard, std::move(thread));
+ }
+ _pending_remove.clear();
+}
+
+CpuUsage::TimedSample
+CpuUsage::do_sample()
+{
+ assert(_sampling);
+ Sample my_sample;
+ std::optional<std::promise<TimedSample>> my_promise;
+ auto t = steady_clock::now();
+ for (const auto &entry: _threads) {
+ my_sample.merge(entry.first->sample());
+ }
+ {
+ Guard guard(_lock);
+ _sampling = false;
+ handle_pending(guard);
+ if (_conflict) {
+ my_promise = std::move(_conflict->sample_promise);
+ _conflict.reset();
+ }
+ my_sample.merge(_usage);
+ _usage = my_sample;
+ }
+ TimedSample result{t, my_sample};
+ if (my_promise.has_value()) {
+ my_promise.value().set_value(result);
+ }
+ return result;
+}
+
+CpuUsage::TimedSample
+CpuUsage::sample_or_wait()
+{
+ std::shared_future<TimedSample> my_future;
+ {
+ Guard guard(_lock);
+ if (_sampling) {
+ if (!_conflict) {
+ _conflict = std::make_unique<SampleConflict>();
+ }
+ my_future = _conflict->future_sample;
+ } else {
+ _sampling = true;
+ }
+ }
+ if (my_future.valid()) {
+ return my_future.get();
+ } else {
+ return do_sample();
+ }
+}
+
+CpuUsage::TimedSample
+CpuUsage::sample()
+{
+ return self().sample_or_wait();
+}
+
} // namespace
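
The sample_or_wait logic above resolves concurrent sampling by letting the first caller do the actual work while later callers block on a shared_future and reuse its result. Stripped of the vespalib specifics, the underlying standard-library pattern looks roughly like this (a generic sketch, not part of the diff):

#include <chrono>
#include <future>
#include <mutex>
#include <optional>
#include <thread>

struct SharedSampler {
    std::mutex lock;
    bool sampling = false;
    std::optional<std::promise<int>> waiters;   // present when someone is waiting
    std::shared_future<int> waiters_future;

    // stand-in for the expensive cross-thread sampling work
    int expensive_sample() {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        return 42;
    }

    int sample_or_wait() {
        std::shared_future<int> wait_for;
        {
            std::lock_guard<std::mutex> guard(lock);
            if (sampling) {
                if (!waiters) {
                    waiters.emplace();
                    waiters_future = waiters->get_future();
                }
                wait_for = waiters_future;      // join the waiting crowd
            } else {
                sampling = true;                // become the active sampler
            }
        }
        if (wait_for.valid()) {
            return wait_for.get();              // wait for the active sampler
        }
        int result = expensive_sample();
        std::optional<std::promise<int>> to_fulfill;
        {
            std::lock_guard<std::mutex> guard(lock);
            sampling = false;
            to_fulfill.swap(waiters);           // take over any waiters
        }
        if (to_fulfill) {
            to_fulfill->set_value(result);      // release the waiters
        }
        return result;
    }
};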
diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.h b/vespalib/src/vespa/vespalib/util/cpu_usage.h
index 09509a984b5..e3333bac6b7 100644
--- a/vespalib/src/vespa/vespalib/util/cpu_usage.h
+++ b/vespalib/src/vespa/vespalib/util/cpu_usage.h
@@ -1,7 +1,11 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "spin_lock.h"
#include <vespa/vespalib/util/time.h>
#include <memory>
+#include <future>
+#include <vector>
+#include <map>
namespace vespalib {
@@ -22,4 +26,128 @@ ThreadSampler::UP create_thread_sampler(bool force_mock_impl = false, double exp
} // cpu_usage
+/**
+ * Tracks accumulated CPU usage across threads and work
+ * categories. Use the 'use' function to signal what kind of CPU you
+ * are using in the current thread. Use the 'sample' function to get a
+ * complete view of CPU usage so far.
+ *
+ * The 'use' function returns a MyUsage object that needs to be kept
+ * alive for as long as the current thread should contribute to the
+ * specified cpu use. Note that MyUsage instances may shadow each
+ * other, but must be destructed in reverse construction order.
+ **/
+class CpuUsage
+{
+public:
+ // The kind of work performed by a thread. Used to separate
+ // different kinds of CPU usage. Note that categories are
+ // exclusive/non-overlapping; a thread can only perform one kind
+ // of CPU work at any specific time.
+ enum class Category {
+ SETUP = 0, // usage related to system setup (init/(re-)config/etc.)
+ READ = 1, // usage related to reading data from the system
+ WRITE = 2, // usage related to writing data to the system
+ COMPACT = 3, // usage related to internal data re-structuring
+ MAINTAIN = 4, // usage related to distributed cluster maintenance
+ NETWORK = 5, // usage related to network communication
+ OTHER = 6 // unspecified usage
+ };
+ static constexpr size_t index_of(Category cat) { return static_cast<size_t>(cat); }
+ static constexpr size_t num_categories = 7;
+
+ // A sample contains how much CPU has been spent in various
+ // categories.
+ class Sample {
+ private:
+ std::array<duration,num_categories> _usage;
+ public:
+ Sample() : _usage() {}
+ size_t size() const { return _usage.size(); }
+ duration &operator[](size_t idx) { return _usage[idx]; }
+ duration &operator[](Category cat) { return _usage[index_of(cat)]; }
+ const duration &operator[](size_t idx) const { return _usage[idx]; }
+ const duration &operator[](Category cat) const { return _usage[index_of(cat)]; }
+ void merge(const Sample &rhs) {
+ for (size_t i = 0; i < size(); ++i) {
+ _usage[i] += rhs._usage[i];
+ }
+ }
+ };
+
+ // a sample tagged with the time it was taken
+ using TimedSample = std::pair<steady_time, Sample>;
+
+ // Used when multiple threads call the 'sample' function at the
+ // same time. One thread will sample while the others will wait
+ // for the result.
+ struct SampleConflict {
+ std::promise<TimedSample> sample_promise;
+ std::shared_future<TimedSample> future_sample;
+ SampleConflict() : sample_promise(),
+ future_sample(sample_promise.get_future()) {}
+ };
+
+ // Interface used to perform destructive sampling of the CPU spent
+ // in various categories since the last time it was sampled.
+ struct ThreadTracker {
+ using SP = std::shared_ptr<ThreadTracker>;
+ virtual Sample sample() = 0;
+ virtual ~ThreadTracker() {}
+ };
+ class ThreadTrackerImpl;
+
+ // Used by threads to signal what kind of CPU they are currently
+ // using. The thread will contribute to the declared CPU usage
+ // category while this object lives. MyUsage instances may shadow
+ // each other, but must be destructed in reverse construction
+ // order. The preferred way to use this class is by doing:
+ //
+ // auto my_usage = CpuUsage::use(my_cat);
+ class MyUsage {
+ private:
+ uint32_t _old_cat_idx;
+ public:
+ MyUsage(Category cat);
+ MyUsage(MyUsage &&) = delete;
+ MyUsage(const MyUsage &) = delete;
+ MyUsage &operator=(MyUsage &&) = delete;
+ MyUsage &operator=(const MyUsage &) = delete;
+ ~MyUsage();
+ };
+
+private:
+ SpinLock _lock;
+ Sample _usage;
+ std::map<ThreadTracker*,ThreadTracker::SP> _threads;
+ bool _sampling;
+ std::unique_ptr<SampleConflict> _conflict;
+ std::vector<ThreadTracker::SP> _pending_add;
+ std::vector<ThreadTracker::SP> _pending_remove;
+
+ using Guard = std::lock_guard<SpinLock>;
+
+ CpuUsage();
+ CpuUsage(CpuUsage &&) = delete;
+ CpuUsage(const CpuUsage &) = delete;
+ CpuUsage &operator=(CpuUsage &&) = delete;
+ CpuUsage &operator=(const CpuUsage &) = delete;
+
+ static CpuUsage &self();
+
+ void do_add_thread(const Guard &guard, ThreadTracker::SP tracker);
+ void do_remove_thread(const Guard &guard, ThreadTracker::SP tracker);
+
+ void add_thread(ThreadTracker::SP tracker);
+ void remove_thread(ThreadTracker::SP tracker);
+
+ void handle_pending(const Guard &guard);
+ TimedSample do_sample();
+ TimedSample sample_or_wait();
+
+public:
+ static MyUsage use(Category cat) { return MyUsage(cat); }
+ static TimedSample sample();
+};
+
} // namespace
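
As the MyUsage class comment above notes, instances may shadow each other as long as they are destructed in reverse construction order; a small illustration of that shadowing (again just a sketch, not part of the header):

void handle_request() {
    auto outer = vespalib::CpuUsage::use(vespalib::CpuUsage::Category::READ);
    // ... CPU used here counts as READ ...
    {
        auto inner = vespalib::CpuUsage::use(vespalib::CpuUsage::Category::NETWORK);
        // ... CPU used here counts as NETWORK ...
    } // 'inner' is destructed first; the thread counts as READ again
    // ... CPU used here counts as READ ...
}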