diff options
author | Håvard Pettersen <havardpe@oath.com> | 2022-01-20 12:52:49 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2022-01-21 14:57:53 +0000 |
commit | 4ea7718e942e69fd6b0a52fe373dc3493a389a16 (patch) | |
tree | 833cd5c65309a22bc96463ac01b3196400fdd855 /vespalib | |
parent | 645a093e74c383d1632681e78b0d8f179dcc763e (diff) |
improve cpu usage tracking
- add cpu_usage::RUsage primitive used for getrusage
- reduce the number of categories (small initial set)
- use RUsage to fill OTHER category (more complete samples)
- threads no longer track OTHER category explicitly (less overhead)
- changing cpu category to itself is now nop (less overhead)
- make ThreadTracker and SampleConflict private (less API clutter)
- improved testing (sample conflict resolution)
- improves testing (thread tracker implementation)
Diffstat (limited to 'vespalib')
-rw-r--r-- | vespalib/src/tests/cpu_usage/cpu_usage_test.cpp | 289 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/test/thread_meets.cpp | 12 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/test/thread_meets.h | 34 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/cpu_usage.cpp | 133 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/cpu_usage.h | 88 |
6 files changed, 420 insertions, 137 deletions
diff --git a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp index 5deb467ed17..b2da9db857c 100644 --- a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp +++ b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp @@ -3,12 +3,14 @@ #include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/benchmark_timer.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/test/thread_meets.h> #include <vespa/vespalib/testkit/test_kit.h> #include <sys/resource.h> #include <thread> using namespace vespalib; +using namespace vespalib::test; using vespalib::make_string_short::fmt; bool verbose = false; @@ -58,18 +60,28 @@ void verify_sampling(size_t thread_id, size_t num_threads, std::vector<Sampler*> TEST_BARRIER(); // #1 auto t0 = steady_clock::now(); std::vector<duration> pre_usage = sample(samplers); + auto pre_total = cpu_usage::RUsage::sample(); TEST_BARRIER(); // #2 TEST_BARRIER(); // #3 auto t1 = steady_clock::now(); std::vector<duration> post_usage = sample(samplers); + auto post_total = cpu_usage::RUsage::sample(); TEST_BARRIER(); // #4 double wall = to_s(t1 - t0); std::vector<double> load(4, 0.0); for (size_t i = 0; i < 4; ++i) { load[i] = to_s(post_usage[i] - pre_usage[i]) / wall; } + double user_load = to_s(post_total.user - pre_total.user) / wall; + double system_load = to_s(post_total.system - pre_total.system) / wall; + double total_load = to_s(post_total.total() - pre_total.total()) / wall; EXPECT_GREATER(load[3], load[0]); + // NB: cannot expect total_load to be greater than load[3] + // here due to mock loads being 'as expected' while valgrind + // will cut all loads in about half. + EXPECT_GREATER(total_load, load[0]); fprintf(stderr, "loads: { %.2f, %.2f, %.2f, %.2f }\n", load[0], load[1], load[2], load[3]); + fprintf(stderr, "total load: %.2f (user: %.2f, system: %.2f)\n", total_load, user_load, system_load); } else { int idx = (thread_id - 1); double target_load = double(thread_id - 1) / (num_threads - 2); @@ -102,30 +114,33 @@ TEST("measure thread CPU clock overhead") { fprintf(stderr, "approx overhead per sample (thread CPU clock): %f us\n", min_time_us); } +TEST("measure RUsage overhead") { + duration d; + double min_time_us = BenchmarkTimer::benchmark([&d]() noexcept { d = cpu_usage::RUsage::sample().total(); }, budget) * 1000000.0; + fprintf(stderr, "approx overhead per RUsage sample: %f us\n", min_time_us); +} + //----------------------------------------------------------------------------- -void verify_category(CpuUsage::Category cat, size_t idx) { +void verify_category(CpuUsage::Category cat, size_t idx, const vespalib::string &name) { switch (cat) { // make sure we known all categories case CpuUsage::Category::SETUP: case CpuUsage::Category::READ: case CpuUsage::Category::WRITE: case CpuUsage::Category::COMPACT: - case CpuUsage::Category::MAINTAIN: - case CpuUsage::Category::NETWORK: case CpuUsage::Category::OTHER: EXPECT_EQUAL(CpuUsage::index_of(cat), idx); + EXPECT_EQUAL(CpuUsage::name_of(cat), name); } } TEST("require that CPU categories are as expected") { - TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u)); - TEST_DO(verify_category(CpuUsage::Category::READ, 1u)); - TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u)); - TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u)); - TEST_DO(verify_category(CpuUsage::Category::MAINTAIN,4u)); - TEST_DO(verify_category(CpuUsage::Category::NETWORK, 5u)); - TEST_DO(verify_category(CpuUsage::Category::OTHER, 6u)); - EXPECT_EQUAL(CpuUsage::num_categories, 7u); + TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u, "setup")); + TEST_DO(verify_category(CpuUsage::Category::READ, 1u, "read")); + TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u, "write")); + TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u, "compact")); + TEST_DO(verify_category(CpuUsage::Category::OTHER, 4u, "other")); + EXPECT_EQUAL(CpuUsage::num_categories, 5u); } TEST("require that empty sample is zero") { @@ -144,9 +159,7 @@ TEST("require that cpu samples can be manipulated and inspected") { a[CpuUsage::Category::READ] = 2ms; a[CpuUsage::Category::WRITE] = 3ms; a[CpuUsage::Category::COMPACT] = 4ms; - a[CpuUsage::Category::MAINTAIN] = 5ms; - a[CpuUsage::Category::NETWORK] = 6ms; - a[CpuUsage::Category::OTHER] = 7ms; + a[CpuUsage::Category::OTHER] = 5ms; for (uint32_t i = 0; i < b.size(); ++i) { b[i] = 10ms * (i + 1); } @@ -158,59 +171,241 @@ TEST("require that cpu samples can be manipulated and inspected") { EXPECT_EQUAL(c[CpuUsage::Category::READ], 22ms); EXPECT_EQUAL(c[CpuUsage::Category::WRITE], 33ms); EXPECT_EQUAL(c[CpuUsage::Category::COMPACT], 44ms); - EXPECT_EQUAL(c[CpuUsage::Category::MAINTAIN], 55ms); - EXPECT_EQUAL(c[CpuUsage::Category::NETWORK], 66ms); - EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 77ms); + EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 55ms); } //----------------------------------------------------------------------------- -// prototype for the class we want to use to integrate CPU usage into -// metrics as load values. NB: this class is not thread safe. +struct CpuUsage::Test { + struct BlockingTracker : ThreadTracker { + std::atomic<size_t> called; + ThreadMeets::Nop sync_entry; + ThreadMeets::Swap<Sample> swap_sample; + BlockingTracker() + : called(0), sync_entry(2), swap_sample() {} + Sample sample() noexcept override { + if (called++) { + return Sample(); + } + sync_entry(); + return swap_sample(Sample()); + } + }; + struct SimpleTracker : ThreadTracker { + Sample my_sample; + std::atomic<size_t> called; + SimpleTracker(Sample sample) noexcept + : my_sample(sample), called(0) {} + Sample sample() noexcept override { + ++called; + return my_sample; + } + }; + struct Fixture { + CpuUsage my_usage; + std::shared_ptr<BlockingTracker> blocking; + std::vector<std::shared_ptr<SimpleTracker>> simple_list; + Fixture() : my_usage() {} + void add_blocking() { + ASSERT_TRUE(!blocking); + blocking = std::make_unique<BlockingTracker>(); + my_usage.add_thread(blocking); + } + void add_simple(Sample sample) { + auto simple = std::make_shared<SimpleTracker>(sample); + simple_list.push_back(simple); + my_usage.add_thread(simple); + } + void add_remove_simple(Sample sample) { + auto simple = std::make_shared<SimpleTracker>(sample); + my_usage.add_thread(simple); + my_usage.remove_thread(simple); + } + size_t count_threads() { + Guard guard(my_usage._lock); + return my_usage._threads.size(); + } + size_t count_simple_samples() { + size_t result = 0; + for (const auto &simple: simple_list) { + result += simple->called; + } + return result; + } + TimedSample sample() { return my_usage.sample_or_wait(); } + ~Fixture() { + if (blocking) { + my_usage.remove_thread(std::move(blocking)); + } + for (auto &simple: simple_list) { + my_usage.remove_thread(std::move(simple)); + } + ASSERT_EQUAL(count_threads(), 0u); + } + }; + struct TrackerImpl { + ThreadTrackerImpl impl; + TrackerImpl(cpu_usage::ThreadSampler::UP sampler) + : impl(std::move(sampler)) {} + CpuUsage::Sample sample() { return impl.sample(); } + CpuUsage::Category set_category(CpuUsage::Category cat) { return impl.set_category(cat); } + }; +}; + +TEST_F("require that CpuUsage sample calls sample on thread trackers", CpuUsage::Test::Fixture()) { + CpuUsage::Sample sample; + sample[CpuUsage::Category::READ] = 10ms; + f1.add_simple(sample); + f1.add_simple(sample); + f1.add_simple(sample); + EXPECT_EQUAL(f1.count_threads(), 3u); + auto result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms)); + EXPECT_EQUAL(f1.count_simple_samples(), 3u); + result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(60ms)); + EXPECT_EQUAL(f1.count_simple_samples(), 6u); +} + +TEST_F("require that threads added and removed between CpuUsage sample calls are tracked", CpuUsage::Test::Fixture()) { + CpuUsage::Sample sample; + sample[CpuUsage::Category::READ] = 10ms; + auto result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(0ms)); + f1.add_remove_simple(sample); + f1.add_remove_simple(sample); + f1.add_remove_simple(sample); + EXPECT_EQUAL(f1.count_threads(), 0u); + result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms)); + result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms)); +} + +TEST_MT_FF("require that sample conflicts are resolved correctly", 4, CpuUsage::Test::Fixture(), std::vector<CpuUsage::TimedSample>(3)) { + if (thread_id == 0) { + CpuUsage::Sample s1; + s1[CpuUsage::Category::SETUP] = 10ms; + CpuUsage::Sample s2; + s2[CpuUsage::Category::READ] = 20ms; + CpuUsage::Sample s3; + s3[CpuUsage::Category::WRITE] = 30ms; + CpuUsage::Sample s4; + s4[CpuUsage::Category::COMPACT] = 40ms; + f1.add_blocking(); + f1.add_simple(s1); // should be sampled + TEST_BARRIER(); // #1 + f1.blocking->sync_entry(); + f1.add_simple(s2); // should NOT be sampled (pending add) + f1.add_remove_simple(s3); // should be sampled (pending remove); + EXPECT_EQUAL(f1.count_threads(), 2u); + f1.blocking->swap_sample(s4); + TEST_BARRIER(); // #2 + EXPECT_EQUAL(f1.count_threads(), 3u); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::SETUP], duration(10ms)); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::READ], duration(0ms)); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::WRITE], duration(30ms)); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::COMPACT], duration(40ms)); + for (size_t i = 1; i < 3; ++i) { + EXPECT_EQUAL(f2[i].first, f2[0].first); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::SETUP], f2[0].second[CpuUsage::Category::SETUP]); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::READ], f2[0].second[CpuUsage::Category::READ]); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::WRITE], f2[0].second[CpuUsage::Category::WRITE]); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::COMPACT], f2[0].second[CpuUsage::Category::COMPACT]); + } + } else { + TEST_BARRIER(); // #1 + f2[thread_id - 1] = f1.sample(); + TEST_BARRIER(); // #2 + } +} + +//----------------------------------------------------------------------------- + +struct DummySampler : public cpu_usage::ThreadSampler { + duration &ref; + DummySampler(duration &ref_in) : ref(ref_in) {} + duration sample() const noexcept override { return ref; } +}; + +TEST("require that thread tracker implementation can track cpu use") { + duration t = duration::zero(); + CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t)); + t += 10ms; + tracker.set_category(CpuUsage::Category::SETUP); + t += 15ms; + tracker.set_category(CpuUsage::Category::READ); + t += 10ms; + auto sample = tracker.sample(); + EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(15ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(10ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(0ms)); + t += 15ms; + tracker.set_category(CpuUsage::Category::WRITE); + t += 10ms; + sample = tracker.sample(); + EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(0ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(15ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(10ms)); +} + +TEST("require that thread tracker implementation reports previous CPU category") { + duration t = duration::zero(); + CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t)); + EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::OTHER), + CpuUsage::index_of(tracker.set_category(CpuUsage::Category::SETUP))); + EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::SETUP), + CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ))); + EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::READ), + CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ))); +} + +TEST("require that thread tracker implementation does not track OTHER cpu use") { + duration t = duration::zero(); + CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t)); + t += 10ms; + tracker.set_category(CpuUsage::Category::OTHER); + t += 15ms; + tracker.set_category(CpuUsage::Category::READ); + tracker.set_category(CpuUsage::Category::OTHER); + t += 15ms; + auto sample = tracker.sample(); + EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(0ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::OTHER], duration(0ms)); +} + +//----------------------------------------------------------------------------- + +// prototype for the class we want to use when integrating CPU usage +// into metrics as load values. NB: this class is not thread safe. class CpuMonitor { private: - duration _old_usage; - CpuUsage::TimedSample _old_sample; duration _min_delay; - std::array<double,CpuUsage::num_categories+1> _load; - - static duration total_usage() { - rusage usage; - memset(&usage, 0, sizeof(usage)); - getrusage(RUSAGE_SELF, &usage); - return from_timeval(usage.ru_utime) + from_timeval(usage.ru_stime); - } + CpuUsage::TimedSample _old_sample; + std::array<double,CpuUsage::num_categories> _load; public: CpuMonitor(duration min_delay) - : _old_usage(total_usage()), + : _min_delay(min_delay), _old_sample(CpuUsage::sample()), - _min_delay(min_delay), _load() {} - std::array<double,CpuUsage::num_categories+1> get_load() { + std::array<double,CpuUsage::num_categories> get_load() { if (steady_clock::now() >= (_old_sample.first + _min_delay)) { - auto new_usage = total_usage(); auto new_sample = CpuUsage::sample(); auto dt = to_s(new_sample.first - _old_sample.first); - double sampled_load = 0.0; for (size_t i = 0; i < CpuUsage::num_categories; ++i) { _load[i] = to_s(new_sample.second[i] - _old_sample.second[i]) / dt; - sampled_load += _load[i]; } - _load[CpuUsage::num_categories] = (to_s(new_usage - _old_usage) / dt) - sampled_load; - _old_usage = new_usage; _old_sample = new_sample; } return _load; } }; -std::array<vespalib::string,CpuUsage::num_categories+1> names -{ "SETUP", "READ", "WRITE", "COMPACT", "MAINTAIN", "NETWORK", "OTHER", "UNKNOWN" }; - void do_sample_cpu_usage(const EndTime &end_time) { + auto my_usage = CpuUsage::use(CpuUsage::Category::SETUP); CpuMonitor monitor(8ms); while (!end_time()) { std::this_thread::sleep_for(verbose ? 1s : 10ms); @@ -220,7 +415,7 @@ void do_sample_cpu_usage(const EndTime &end_time) { if (!body.empty()) { body.append(", "); } - body.append(fmt("%s: %.2f", names[i].c_str(), load[i])); + body.append(fmt("%s: %.2f", CpuUsage::name_of(CpuUsage::Category(i)).c_str(), load[i])); } fprintf(stderr, "CPU: %s\n", body.c_str()); } @@ -250,11 +445,11 @@ void do_nested_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndT } } -void do_external_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndTime &end_time) { - auto my_usage1 = CpuUsage::use(cat1); +void do_external_work(CpuUsage::Category cat, const EndTime &end_time) { + auto my_usage1 = CpuUsage::use(CpuUsage::Category::SETUP); while (!end_time()) { - std::thread thread([cat2](){ - auto my_usage2 = CpuUsage::use(cat2); + std::thread thread([cat](){ + auto my_usage2 = CpuUsage::use(cat); be_busy(4ms); }); thread.join(); @@ -266,8 +461,8 @@ TEST_MT_F("use top-level API to sample CPU usage", 5, EndTime(verbose ? 10s : 10 case 0: return do_sample_cpu_usage(f1); case 1: return do_full_work(CpuUsage::Category::WRITE, f1); case 2: return do_some_work(CpuUsage::Category::READ, f1); - case 3: return do_nested_work(CpuUsage::Category::NETWORK, CpuUsage::Category::READ, f1); - case 4: return do_external_work(CpuUsage::Category::SETUP, CpuUsage::Category::COMPACT, f1); + case 3: return do_nested_work(CpuUsage::Category::OTHER, CpuUsage::Category::READ, f1); + case 4: return do_external_work(CpuUsage::Category::COMPACT, f1); default: TEST_FATAL("missing thread id case"); } } diff --git a/vespalib/src/vespa/vespalib/test/CMakeLists.txt b/vespalib/src/vespa/vespalib/test/CMakeLists.txt index d658b28f94b..a60eb15a4d4 100644 --- a/vespalib/src/vespa/vespalib/test/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/test/CMakeLists.txt @@ -4,6 +4,7 @@ vespa_add_library(vespalib_vespalib_test OBJECT make_tls_options_for_testing.cpp memory_allocator_observer.cpp peer_policy_utils.cpp + thread_meets.cpp time_tracer.cpp DEPENDS ) diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.cpp b/vespalib/src/vespa/vespalib/test/thread_meets.cpp new file mode 100644 index 00000000000..9d23e0eab28 --- /dev/null +++ b/vespalib/src/vespa/vespalib/test/thread_meets.cpp @@ -0,0 +1,12 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "thread_meets.h" + +namespace vespalib::test { + +void +ThreadMeets::Nop::mingle() +{ +} + +} diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.h b/vespalib/src/vespa/vespalib/test/thread_meets.h new file mode 100644 index 00000000000..62ca7779935 --- /dev/null +++ b/vespalib/src/vespa/vespalib/test/thread_meets.h @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/rendezvous.h> + +namespace vespalib::test { + +/** + * Generally useful rendezvous implementations. + **/ +struct ThreadMeets { + // can be used as a simple thread barrier + struct Nop : vespalib::Rendezvous<bool,bool> { + Nop(size_t N) : vespalib::Rendezvous<bool,bool>(N) {} + void operator()() { rendezvous(false); } + void mingle() override; + }; + // swap values between 2 threads + template <typename T> + struct Swap : vespalib::Rendezvous<T,T> { + using vespalib::Rendezvous<T,T>::in; + using vespalib::Rendezvous<T,T>::out; + using vespalib::Rendezvous<T,T>::rendezvous; + Swap() : vespalib::Rendezvous<T,T>(2) {} + T operator()(T input) { return rendezvous(input); } + void mingle() override { + out(1) = in(0); + out(0) = in(1); + } + }; +}; + +} diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp index ca2199275de..5f9e1f521b9 100644 --- a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp +++ b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp @@ -6,6 +6,8 @@ #include <optional> #include <cassert> +#include <sys/resource.h> + namespace vespalib { namespace cpu_usage { @@ -18,7 +20,7 @@ private: double _load; public: DummyThreadSampler(double load) : _start(steady_clock::now()), _load(load) {} - duration sample() const override { + duration sample() const noexcept override { return from_s(to_s(steady_clock::now() - _start) * _load); } }; @@ -32,9 +34,10 @@ public: LinuxThreadSampler() : _my_clock() { REQUIRE_EQ(pthread_getcpuclockid(pthread_self(), &_my_clock), 0); } - duration sample() const override { + duration sample() const noexcept override { timespec ts; - REQUIRE_EQ(clock_gettime(_my_clock, &ts), 0); + memset(&ts, 0, sizeof(ts)); + clock_gettime(_my_clock, &ts); return from_timespec(ts); } }; @@ -43,6 +46,15 @@ public: } // <unnamed> +RUsage +RUsage::sample() noexcept +{ + rusage usage; + memset(&usage, 0, sizeof(usage)); + getrusage(RUSAGE_SELF, &usage); + return {from_timeval(usage.ru_utime), from_timeval(usage.ru_stime)}; +} + ThreadSampler::UP create_thread_sampler(bool force_mock_impl, double expected_load) { if (force_mock_impl) { return std::make_unique<DummyThreadSampler>(expected_load); @@ -55,75 +67,69 @@ ThreadSampler::UP create_thread_sampler(bool force_mock_impl, double expected_lo } // cpu_usage -class CpuUsage::ThreadTrackerImpl : public CpuUsage::ThreadTracker { -private: - SpinLock _lock; - uint32_t _cat_idx; - duration _old_usage; - cpu_usage::ThreadSampler::UP _sampler; - Sample _pending; - - using Guard = std::lock_guard<SpinLock>; - - struct Wrapper { - std::shared_ptr<ThreadTrackerImpl> self; - Wrapper() : self(std::make_shared<ThreadTrackerImpl>()) { - CpuUsage::self().add_thread(self); - } - ~Wrapper() { - self->set_category(CpuUsage::num_categories); - CpuUsage::self().remove_thread(std::move(self)); - } - }; +CpuUsage::ThreadTrackerImpl::ThreadTrackerImpl(cpu_usage::ThreadSampler::UP sampler) + : _lock(), + _cat(Category::OTHER), + _old_usage(), + _sampler(std::move(sampler)), + _pending() +{ +} -public: - ThreadTrackerImpl() - : _lock(), - _cat_idx(num_categories), - _old_usage(), - _sampler(cpu_usage::create_thread_sampler()), - _pending() - { +CpuUsage::Category +CpuUsage::ThreadTrackerImpl::set_category(Category new_cat) noexcept +{ + // only owning thread may change category + if (new_cat == _cat) { + return new_cat; + } + Guard guard(_lock); + duration new_usage = _sampler->sample(); + if (_cat != Category::OTHER) { + _pending[_cat] += (new_usage - _old_usage); } + _old_usage = new_usage; + auto old_cat = _cat; + _cat = new_cat; + return old_cat; +} - uint32_t set_category(uint32_t new_cat_idx) { - Guard guard(_lock); +CpuUsage::Sample +CpuUsage::ThreadTrackerImpl::sample() noexcept +{ + Guard guard(_lock); + if (_cat != Category::OTHER) { duration new_usage = _sampler->sample(); - if (_cat_idx < num_categories) { - _pending[_cat_idx] += (new_usage - _old_usage); - } + _pending[_cat] += (new_usage - _old_usage); _old_usage = new_usage; - size_t old_cat_idx = _cat_idx; - _cat_idx = new_cat_idx; - return old_cat_idx; - } - - Sample sample() override { - Guard guard(_lock); - if (_cat_idx < num_categories) { - duration new_usage = _sampler->sample(); - _pending[_cat_idx] += (new_usage - _old_usage); - _old_usage = new_usage; - } - Sample sample = _pending; - _pending = Sample(); - return sample; - } - - static ThreadTrackerImpl &self() { - thread_local Wrapper wrapper; - return *wrapper.self; } -}; + Sample sample = _pending; + _pending = Sample(); + return sample; +} -CpuUsage::MyUsage::MyUsage(Category cat) - : _old_cat_idx(ThreadTrackerImpl::self().set_category(index_of(cat))) +vespalib::string & +CpuUsage::name_of(Category cat) { + static std::array<vespalib::string,num_categories> names = {"setup", "read", "write", "compact", "other"}; + return names[index_of(cat)]; } -CpuUsage::MyUsage::~MyUsage() +CpuUsage::Category +CpuUsage::MyUsage::set_cpu_category_for_this_thread(Category cat) noexcept { - ThreadTrackerImpl::self().set_category(_old_cat_idx); + struct Wrapper { + std::shared_ptr<ThreadTrackerImpl> self; + Wrapper() : self(std::make_shared<ThreadTrackerImpl>(cpu_usage::create_thread_sampler())) { + CpuUsage::self().add_thread(self); + } + ~Wrapper() { + self->set_category(CpuUsage::Category::OTHER); + CpuUsage::self().remove_thread(std::move(self)); + } + }; + thread_local Wrapper wrapper; + return wrapper.self->set_category(cat); } CpuUsage::CpuUsage() @@ -218,6 +224,11 @@ CpuUsage::do_sample() my_sample.merge(_usage); _usage = my_sample; } + auto total = cpu_usage::RUsage::sample().total(); + for (size_t i = 0; i < index_of(Category::OTHER); ++i) { + total -= my_sample[i]; + } + my_sample[Category::OTHER] = std::max(total, duration::zero()); TimedSample result{t, my_sample}; if (my_promise.has_value()) { my_promise.value().set_value(result); diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.h b/vespalib/src/vespa/vespalib/util/cpu_usage.h index e3333bac6b7..7b44d07a0a3 100644 --- a/vespalib/src/vespa/vespalib/util/cpu_usage.h +++ b/vespalib/src/vespa/vespalib/util/cpu_usage.h @@ -2,6 +2,7 @@ #include "spin_lock.h" #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/stllike/string.h> #include <memory> #include <future> #include <vector> @@ -12,13 +13,24 @@ namespace vespalib { namespace cpu_usage { /** + * Uses getrusage to sample the total amount of user and system cpu + * time used so far. + **/ +struct RUsage { + duration user; + duration system; + duration total() const { return user + system; } + static RUsage sample() noexcept; +}; + +/** * Samples the total CPU usage of the thread that created it. Note * that this must not be used after thread termination. Enables * sampling the CPU usage of a thread from outside the thread. **/ struct ThreadSampler { using UP = std::unique_ptr<ThreadSampler>; - virtual duration sample() const = 0; + virtual duration sample() const noexcept = 0; virtual ~ThreadSampler() {} }; @@ -45,16 +57,15 @@ public: // exclusive/non-overlapping; a thread can only perform one kind // of CPU work at any specific time. enum class Category { - SETUP = 0, // usage related to system setup (init/(re-)config/etc.) - READ = 1, // usage related to reading data from the system - WRITE = 2, // usage related to writing data to the system - COMPACT = 3, // usage related to internal data re-structuring - MAINTAIN = 4, // usage related to distributed cluster maintainance - NETWORK = 5, // usage related to network communication - OTHER = 6 // unspecified usage + SETUP = 0, // usage related to system setup (init/(re-)config/etc.) + READ = 1, // usage related to reading data from the system + WRITE = 2, // usage related to writing data to the system + COMPACT = 3, // usage related to internal data re-structuring + OTHER = 4 // all other cpu usage not in the categories above }; + static vespalib::string &name_of(Category cat); static constexpr size_t index_of(Category cat) { return static_cast<size_t>(cat); } - static constexpr size_t num_categories = 7; + static constexpr size_t num_categories = 5; // A sample contains how much CPU has been spent in various // categories. @@ -78,6 +89,33 @@ public: // a sample tagged with the time it was taken using TimedSample = std::pair<steady_time, Sample>; + // Used by threads to signal what kind of CPU they are currently + // using. The thread will contribute to the declared CPU usage + // category while this object lives. MyUsage instances may shadow + // each other, but must be destructed in reverse construction + // order. The preferred way to use this class is by doing: + // + // auto my_usage = CpuUsage::use(my_cat); + class MyUsage { + private: + Category _old_cat; + static Category set_cpu_category_for_this_thread(Category cat) noexcept; + public: + MyUsage(Category cat) + : _old_cat(set_cpu_category_for_this_thread(cat)) {} + MyUsage(MyUsage &&) = delete; + MyUsage(const MyUsage &) = delete; + MyUsage &operator=(MyUsage &&) = delete; + MyUsage &operator=(const MyUsage &) = delete; + ~MyUsage() { set_cpu_category_for_this_thread(_old_cat); } + }; + + // grant extra access for testing + struct Test; + +private: + using Guard = std::lock_guard<SpinLock>; + // Used when multiple threads call the 'sample' function at the // same time. One thread will sample while the others will wait // for the result. @@ -92,31 +130,25 @@ public: // in various categories since the last time it was sampled. struct ThreadTracker { using SP = std::shared_ptr<ThreadTracker>; - virtual Sample sample() = 0; + virtual Sample sample() noexcept = 0; virtual ~ThreadTracker() {} }; - class ThreadTrackerImpl; - // Used by threads to signal what kind of CPU they are currently - // using. The thread will contribute to the declared CPU usage - // category while this object lives. MyUsage instances may shadow - // each other, but must be destructed in reverse construction - // order. The preferred way to use this class is by doing: - // - // auto my_usage = CpuUsage::use(my_cat); - class MyUsage { + class ThreadTrackerImpl : public ThreadTracker { private: - uint32_t _old_cat_idx; + SpinLock _lock; + Category _cat; + duration _old_usage; + cpu_usage::ThreadSampler::UP _sampler; + Sample _pending; + public: - MyUsage(Category cat); - MyUsage(MyUsage &&) = delete; - MyUsage(const MyUsage &) = delete; - MyUsage &operator=(MyUsage &&) = delete; - MyUsage &operator=(const MyUsage &) = delete; - ~MyUsage(); + ThreadTrackerImpl(cpu_usage::ThreadSampler::UP sampler); + // only called by owning thread + Category set_category(Category new_cat) noexcept; + Sample sample() noexcept override; }; -private: SpinLock _lock; Sample _usage; std::map<ThreadTracker*,ThreadTracker::SP> _threads; @@ -125,8 +157,6 @@ private: std::vector<ThreadTracker::SP> _pending_add; std::vector<ThreadTracker::SP> _pending_remove; - using Guard = std::lock_guard<SpinLock>; - CpuUsage(); CpuUsage(CpuUsage &&) = delete; CpuUsage(const CpuUsage &) = delete; |