diff options
author | HÃ¥vard Pettersen <3535158+havardpe@users.noreply.github.com> | 2022-01-24 11:29:53 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-24 11:29:53 +0100 |
commit | e022e18ae1c424d9afb87ba7d5acde5d7f27cf45 (patch) | |
tree | 15d41fcca8b78842b7d0f334569217ce635ccbb5 /vespalib/src/tests | |
parent | 2addfdbbf732730f51dfb9b1cdfcd416f2fa630d (diff) | |
parent | 980c43eb4464bf11c48e996a2b0a88a8ba32da46 (diff) |
Merge pull request #20903 from vespa-engine/havardpe/improve-cpu-usage-tracking
improve cpu usage tracking
Diffstat (limited to 'vespalib/src/tests')
-rw-r--r-- | vespalib/src/tests/cpu_usage/cpu_usage_test.cpp | 311 |
1 files changed, 264 insertions, 47 deletions
diff --git a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp index 5deb467ed17..6bedfb5013e 100644 --- a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp +++ b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp @@ -3,12 +3,14 @@ #include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/benchmark_timer.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/test/thread_meets.h> #include <vespa/vespalib/testkit/test_kit.h> #include <sys/resource.h> #include <thread> using namespace vespalib; +using namespace vespalib::test; using vespalib::make_string_short::fmt; bool verbose = false; @@ -58,18 +60,28 @@ void verify_sampling(size_t thread_id, size_t num_threads, std::vector<Sampler*> TEST_BARRIER(); // #1 auto t0 = steady_clock::now(); std::vector<duration> pre_usage = sample(samplers); + auto pre_total = cpu_usage::RUsage::sample(); TEST_BARRIER(); // #2 TEST_BARRIER(); // #3 auto t1 = steady_clock::now(); std::vector<duration> post_usage = sample(samplers); + auto post_total = cpu_usage::RUsage::sample(); TEST_BARRIER(); // #4 double wall = to_s(t1 - t0); std::vector<double> load(4, 0.0); for (size_t i = 0; i < 4; ++i) { load[i] = to_s(post_usage[i] - pre_usage[i]) / wall; } + double user_load = to_s(post_total.user - pre_total.user) / wall; + double system_load = to_s(post_total.system - pre_total.system) / wall; + double total_load = to_s(post_total.total() - pre_total.total()) / wall; EXPECT_GREATER(load[3], load[0]); + // NB: cannot expect total_load to be greater than load[3] + // here due to mock loads being 'as expected' while valgrind + // will cut all loads in about half. + EXPECT_GREATER(total_load, load[0]); fprintf(stderr, "loads: { %.2f, %.2f, %.2f, %.2f }\n", load[0], load[1], load[2], load[3]); + fprintf(stderr, "total load: %.2f (user: %.2f, system: %.2f)\n", total_load, user_load, system_load); } else { int idx = (thread_id - 1); double target_load = double(thread_id - 1) / (num_threads - 2); @@ -102,30 +114,33 @@ TEST("measure thread CPU clock overhead") { fprintf(stderr, "approx overhead per sample (thread CPU clock): %f us\n", min_time_us); } +TEST("measure RUsage overhead") { + duration d; + double min_time_us = BenchmarkTimer::benchmark([&d]() noexcept { d = cpu_usage::RUsage::sample().total(); }, budget) * 1000000.0; + fprintf(stderr, "approx overhead per RUsage sample: %f us\n", min_time_us); +} + //----------------------------------------------------------------------------- -void verify_category(CpuUsage::Category cat, size_t idx) { +void verify_category(CpuUsage::Category cat, size_t idx, const vespalib::string &name) { switch (cat) { // make sure we known all categories case CpuUsage::Category::SETUP: case CpuUsage::Category::READ: case CpuUsage::Category::WRITE: case CpuUsage::Category::COMPACT: - case CpuUsage::Category::MAINTAIN: - case CpuUsage::Category::NETWORK: case CpuUsage::Category::OTHER: EXPECT_EQUAL(CpuUsage::index_of(cat), idx); + EXPECT_EQUAL(CpuUsage::name_of(cat), name); } } TEST("require that CPU categories are as expected") { - TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u)); - TEST_DO(verify_category(CpuUsage::Category::READ, 1u)); - TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u)); - TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u)); - TEST_DO(verify_category(CpuUsage::Category::MAINTAIN,4u)); - TEST_DO(verify_category(CpuUsage::Category::NETWORK, 5u)); - TEST_DO(verify_category(CpuUsage::Category::OTHER, 6u)); - EXPECT_EQUAL(CpuUsage::num_categories, 7u); + TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u, "setup")); + TEST_DO(verify_category(CpuUsage::Category::READ, 1u, "read")); + TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u, "write")); + TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u, "compact")); + TEST_DO(verify_category(CpuUsage::Category::OTHER, 4u, "other")); + EXPECT_EQUAL(CpuUsage::num_categories, 5u); } TEST("require that empty sample is zero") { @@ -144,9 +159,7 @@ TEST("require that cpu samples can be manipulated and inspected") { a[CpuUsage::Category::READ] = 2ms; a[CpuUsage::Category::WRITE] = 3ms; a[CpuUsage::Category::COMPACT] = 4ms; - a[CpuUsage::Category::MAINTAIN] = 5ms; - a[CpuUsage::Category::NETWORK] = 6ms; - a[CpuUsage::Category::OTHER] = 7ms; + a[CpuUsage::Category::OTHER] = 5ms; for (uint32_t i = 0; i < b.size(); ++i) { b[i] = 10ms * (i + 1); } @@ -158,59 +171,263 @@ TEST("require that cpu samples can be manipulated and inspected") { EXPECT_EQUAL(c[CpuUsage::Category::READ], 22ms); EXPECT_EQUAL(c[CpuUsage::Category::WRITE], 33ms); EXPECT_EQUAL(c[CpuUsage::Category::COMPACT], 44ms); - EXPECT_EQUAL(c[CpuUsage::Category::MAINTAIN], 55ms); - EXPECT_EQUAL(c[CpuUsage::Category::NETWORK], 66ms); - EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 77ms); + EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 55ms); } //----------------------------------------------------------------------------- -// prototype for the class we want to use to integrate CPU usage into -// metrics as load values. NB: this class is not thread safe. +struct CpuUsage::Test { + struct BlockingTracker : ThreadTracker { + std::atomic<size_t> called; + ThreadMeets::Nop sync_entry; + ThreadMeets::Swap<Sample> swap_sample; + BlockingTracker() + : called(0), sync_entry(2), swap_sample() {} + Sample sample() noexcept override { + if (called++) { + return Sample(); + } + sync_entry(); + return swap_sample(Sample()); + } + }; + struct SimpleTracker : ThreadTracker { + Sample my_sample; + std::atomic<size_t> called; + SimpleTracker(Sample sample) noexcept + : my_sample(sample), called(0) {} + Sample sample() noexcept override { + ++called; + return my_sample; + } + }; + struct Fixture { + CpuUsage my_usage; + std::shared_ptr<BlockingTracker> blocking; + std::vector<std::shared_ptr<SimpleTracker>> simple_list; + Fixture() : my_usage() {} + void add_blocking() { + ASSERT_TRUE(!blocking); + blocking = std::make_unique<BlockingTracker>(); + my_usage.add_thread(blocking); + } + void add_simple(Sample sample) { + auto simple = std::make_shared<SimpleTracker>(sample); + simple_list.push_back(simple); + my_usage.add_thread(simple); + } + void add_remove_simple(Sample sample) { + auto simple = std::make_shared<SimpleTracker>(sample); + my_usage.add_thread(simple); + my_usage.remove_thread(simple); + } + size_t count_threads() { + Guard guard(my_usage._lock); + return my_usage._threads.size(); + } + bool is_sampling() { + Guard guard(my_usage._lock); + return my_usage._sampling; + } + size_t count_conflicts() { + Guard guard(my_usage._lock); + if (!my_usage._conflict) { + return 0; + } + return my_usage._conflict->waiters; + } + size_t count_simple_samples() { + size_t result = 0; + for (const auto &simple: simple_list) { + result += simple->called; + } + return result; + } + TimedSample sample() { return my_usage.sample_or_wait(); } + ~Fixture() { + if (blocking) { + my_usage.remove_thread(std::move(blocking)); + } + for (auto &simple: simple_list) { + my_usage.remove_thread(std::move(simple)); + } + ASSERT_EQUAL(count_threads(), 0u); + } + }; + struct TrackerImpl { + ThreadTrackerImpl impl; + TrackerImpl(cpu_usage::ThreadSampler::UP sampler) + : impl(std::move(sampler)) {} + CpuUsage::Sample sample() { return impl.sample(); } + CpuUsage::Category set_category(CpuUsage::Category cat) { return impl.set_category(cat); } + }; +}; + +TEST_F("require that CpuUsage sample calls sample on thread trackers", CpuUsage::Test::Fixture()) { + CpuUsage::Sample sample; + sample[CpuUsage::Category::READ] = 10ms; + f1.add_simple(sample); + f1.add_simple(sample); + f1.add_simple(sample); + EXPECT_EQUAL(f1.count_threads(), 3u); + auto result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms)); + EXPECT_EQUAL(f1.count_simple_samples(), 3u); + result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(60ms)); + EXPECT_EQUAL(f1.count_simple_samples(), 6u); +} + +TEST_F("require that threads added and removed between CpuUsage sample calls are tracked", CpuUsage::Test::Fixture()) { + CpuUsage::Sample sample; + sample[CpuUsage::Category::READ] = 10ms; + auto result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(0ms)); + f1.add_remove_simple(sample); + f1.add_remove_simple(sample); + f1.add_remove_simple(sample); + EXPECT_EQUAL(f1.count_threads(), 0u); + result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms)); + result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms)); +} + +TEST_MT_FF("require that sample conflicts are resolved correctly", 5, CpuUsage::Test::Fixture(), std::vector<CpuUsage::TimedSample>(num_threads - 1)) { + if (thread_id == 0) { + CpuUsage::Sample s1; + s1[CpuUsage::Category::SETUP] = 10ms; + CpuUsage::Sample s2; + s2[CpuUsage::Category::READ] = 20ms; + CpuUsage::Sample s3; + s3[CpuUsage::Category::WRITE] = 30ms; + CpuUsage::Sample s4; + s4[CpuUsage::Category::COMPACT] = 40ms; + f1.add_blocking(); + f1.add_simple(s1); // should be sampled + EXPECT_TRUE(!f1.is_sampling()); + EXPECT_EQUAL(f1.count_conflicts(), 0u); + TEST_BARRIER(); // #1 + f1.blocking->sync_entry(); + EXPECT_TRUE(f1.is_sampling()); + while (f1.count_conflicts() < (num_threads - 2)) { + // wait for appropriate number of conflicts + std::this_thread::sleep_for(1ms); + } + f1.add_simple(s2); // should NOT be sampled (pending add) + f1.add_remove_simple(s3); // should be sampled (pending remove); + EXPECT_EQUAL(f1.count_threads(), 2u); + EXPECT_TRUE(f1.is_sampling()); + EXPECT_EQUAL(f1.count_conflicts(), (num_threads - 2)); + f1.blocking->swap_sample(s4); + TEST_BARRIER(); // #2 + EXPECT_TRUE(!f1.is_sampling()); + EXPECT_EQUAL(f1.count_conflicts(), 0u); + EXPECT_EQUAL(f1.count_threads(), 3u); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::SETUP], duration(10ms)); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::READ], duration(0ms)); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::WRITE], duration(30ms)); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::COMPACT], duration(40ms)); + for (size_t i = 1; i < (num_threads - 1); ++i) { + EXPECT_EQUAL(f2[i].first, f2[0].first); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::SETUP], f2[0].second[CpuUsage::Category::SETUP]); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::READ], f2[0].second[CpuUsage::Category::READ]); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::WRITE], f2[0].second[CpuUsage::Category::WRITE]); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::COMPACT], f2[0].second[CpuUsage::Category::COMPACT]); + } + } else { + TEST_BARRIER(); // #1 + f2[thread_id - 1] = f1.sample(); + TEST_BARRIER(); // #2 + } +} + +//----------------------------------------------------------------------------- + +struct DummySampler : public cpu_usage::ThreadSampler { + duration &ref; + DummySampler(duration &ref_in) : ref(ref_in) {} + duration sample() const noexcept override { return ref; } +}; + +TEST("require that thread tracker implementation can track cpu use") { + duration t = duration::zero(); + CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t)); + t += 10ms; + tracker.set_category(CpuUsage::Category::SETUP); + t += 15ms; + tracker.set_category(CpuUsage::Category::READ); + t += 10ms; + auto sample = tracker.sample(); + EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(15ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(10ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(0ms)); + t += 15ms; + tracker.set_category(CpuUsage::Category::WRITE); + t += 10ms; + sample = tracker.sample(); + EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(0ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(15ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(10ms)); +} + +TEST("require that thread tracker implementation reports previous CPU category") { + duration t = duration::zero(); + CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t)); + EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::OTHER), + CpuUsage::index_of(tracker.set_category(CpuUsage::Category::SETUP))); + EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::SETUP), + CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ))); + EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::READ), + CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ))); +} + +TEST("require that thread tracker implementation does not track OTHER cpu use") { + duration t = duration::zero(); + CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t)); + t += 10ms; + tracker.set_category(CpuUsage::Category::OTHER); + t += 15ms; + tracker.set_category(CpuUsage::Category::READ); + tracker.set_category(CpuUsage::Category::OTHER); + t += 15ms; + auto sample = tracker.sample(); + EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(0ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::OTHER], duration(0ms)); +} + +//----------------------------------------------------------------------------- + +// prototype for the class we want to use when integrating CPU usage +// into metrics as load values. NB: this class is not thread safe. class CpuMonitor { private: - duration _old_usage; - CpuUsage::TimedSample _old_sample; duration _min_delay; - std::array<double,CpuUsage::num_categories+1> _load; - - static duration total_usage() { - rusage usage; - memset(&usage, 0, sizeof(usage)); - getrusage(RUSAGE_SELF, &usage); - return from_timeval(usage.ru_utime) + from_timeval(usage.ru_stime); - } + CpuUsage::TimedSample _old_sample; + std::array<double,CpuUsage::num_categories> _load; public: CpuMonitor(duration min_delay) - : _old_usage(total_usage()), + : _min_delay(min_delay), _old_sample(CpuUsage::sample()), - _min_delay(min_delay), _load() {} - std::array<double,CpuUsage::num_categories+1> get_load() { + std::array<double,CpuUsage::num_categories> get_load() { if (steady_clock::now() >= (_old_sample.first + _min_delay)) { - auto new_usage = total_usage(); auto new_sample = CpuUsage::sample(); auto dt = to_s(new_sample.first - _old_sample.first); - double sampled_load = 0.0; for (size_t i = 0; i < CpuUsage::num_categories; ++i) { _load[i] = to_s(new_sample.second[i] - _old_sample.second[i]) / dt; - sampled_load += _load[i]; } - _load[CpuUsage::num_categories] = (to_s(new_usage - _old_usage) / dt) - sampled_load; - _old_usage = new_usage; _old_sample = new_sample; } return _load; } }; -std::array<vespalib::string,CpuUsage::num_categories+1> names -{ "SETUP", "READ", "WRITE", "COMPACT", "MAINTAIN", "NETWORK", "OTHER", "UNKNOWN" }; - void do_sample_cpu_usage(const EndTime &end_time) { + auto my_usage = CpuUsage::use(CpuUsage::Category::SETUP); CpuMonitor monitor(8ms); while (!end_time()) { std::this_thread::sleep_for(verbose ? 1s : 10ms); @@ -220,7 +437,7 @@ void do_sample_cpu_usage(const EndTime &end_time) { if (!body.empty()) { body.append(", "); } - body.append(fmt("%s: %.2f", names[i].c_str(), load[i])); + body.append(fmt("%s: %.2f", CpuUsage::name_of(CpuUsage::Category(i)).c_str(), load[i])); } fprintf(stderr, "CPU: %s\n", body.c_str()); } @@ -250,11 +467,11 @@ void do_nested_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndT } } -void do_external_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndTime &end_time) { - auto my_usage1 = CpuUsage::use(cat1); +void do_external_work(CpuUsage::Category cat, const EndTime &end_time) { + auto my_usage1 = CpuUsage::use(CpuUsage::Category::SETUP); while (!end_time()) { - std::thread thread([cat2](){ - auto my_usage2 = CpuUsage::use(cat2); + std::thread thread([cat](){ + auto my_usage2 = CpuUsage::use(cat); be_busy(4ms); }); thread.join(); @@ -266,8 +483,8 @@ TEST_MT_F("use top-level API to sample CPU usage", 5, EndTime(verbose ? 10s : 10 case 0: return do_sample_cpu_usage(f1); case 1: return do_full_work(CpuUsage::Category::WRITE, f1); case 2: return do_some_work(CpuUsage::Category::READ, f1); - case 3: return do_nested_work(CpuUsage::Category::NETWORK, CpuUsage::Category::READ, f1); - case 4: return do_external_work(CpuUsage::Category::SETUP, CpuUsage::Category::COMPACT, f1); + case 3: return do_nested_work(CpuUsage::Category::OTHER, CpuUsage::Category::READ, f1); + case 4: return do_external_work(CpuUsage::Category::COMPACT, f1); default: TEST_FATAL("missing thread id case"); } } |