author    Håvard Pettersen <3535158+havardpe@users.noreply.github.com>  2022-01-24 11:29:53 +0100
committer GitHub <noreply@github.com>  2022-01-24 11:29:53 +0100
commit    e022e18ae1c424d9afb87ba7d5acde5d7f27cf45 (patch)
tree      15d41fcca8b78842b7d0f334569217ce635ccbb5 /vespalib/src/tests
parent    2addfdbbf732730f51dfb9b1cdfcd416f2fa630d (diff)
parent    980c43eb4464bf11c48e996a2b0a88a8ba32da46 (diff)
Merge pull request #20903 from vespa-engine/havardpe/improve-cpu-usage-tracking
improve cpu usage tracking
Diffstat (limited to 'vespalib/src/tests')
-rw-r--r--  vespalib/src/tests/cpu_usage/cpu_usage_test.cpp  311
1 file changed, 264 insertions, 47 deletions
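
For orientation before the diff: the updated test exercises a scoped-category API for attributing CPU time. A worker thread tags its work with CpuUsage::use(category) (an RAII handle that restores the previous category), and a sampling thread calls CpuUsage::sample() to obtain a time point plus per-category durations. The sketch below is inferred from how the test calls these functions; it is not copied from the library headers, and the exact signatures may differ.

// Minimal usage sketch, inferred from the calls made in cpu_usage_test.cpp.
// Assumes CpuUsage::use() returns an RAII guard and CpuUsage::sample()
// returns a TimedSample (time point + per-category durations), as the
// test below relies on. Not taken from the library headers.
#include <vespa/vespalib/util/cpu_usage.h>
#include <cstdio>

using namespace vespalib;

void serve_read_request() {
    auto scope = CpuUsage::use(CpuUsage::Category::READ); // tag this thread's work
    // ... perform the actual read work here ...
}

void report_cpu_usage() {
    auto sample = CpuUsage::sample(); // pair of {sample time, per-category durations}
    for (size_t i = 0; i < CpuUsage::num_categories; ++i) {
        fprintf(stderr, "%s: %.3f s\n",
                CpuUsage::name_of(CpuUsage::Category(i)).c_str(),
                to_s(sample.second[i]));
    }
}
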
diff --git a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
index 5deb467ed17..6bedfb5013e 100644
--- a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
+++ b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
@@ -3,12 +3,14 @@
#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/benchmark_timer.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/test/thread_meets.h>
#include <vespa/vespalib/testkit/test_kit.h>
#include <sys/resource.h>
#include <thread>
using namespace vespalib;
+using namespace vespalib::test;
using vespalib::make_string_short::fmt;
bool verbose = false;
@@ -58,18 +60,28 @@ void verify_sampling(size_t thread_id, size_t num_threads, std::vector<Sampler*>
TEST_BARRIER(); // #1
auto t0 = steady_clock::now();
std::vector<duration> pre_usage = sample(samplers);
+ auto pre_total = cpu_usage::RUsage::sample();
TEST_BARRIER(); // #2
TEST_BARRIER(); // #3
auto t1 = steady_clock::now();
std::vector<duration> post_usage = sample(samplers);
+ auto post_total = cpu_usage::RUsage::sample();
TEST_BARRIER(); // #4
double wall = to_s(t1 - t0);
std::vector<double> load(4, 0.0);
for (size_t i = 0; i < 4; ++i) {
load[i] = to_s(post_usage[i] - pre_usage[i]) / wall;
}
+ double user_load = to_s(post_total.user - pre_total.user) / wall;
+ double system_load = to_s(post_total.system - pre_total.system) / wall;
+ double total_load = to_s(post_total.total() - pre_total.total()) / wall;
EXPECT_GREATER(load[3], load[0]);
+ // NB: we cannot expect total_load to be greater than load[3]
+ // here, since the mock loads are 'as expected' while valgrind
+ // cuts all real loads roughly in half.
+ EXPECT_GREATER(total_load, load[0]);
fprintf(stderr, "loads: { %.2f, %.2f, %.2f, %.2f }\n", load[0], load[1], load[2], load[3]);
+ fprintf(stderr, "total load: %.2f (user: %.2f, system: %.2f)\n", total_load, user_load, system_load);
} else {
int idx = (thread_id - 1);
double target_load = double(thread_id - 1) / (num_threads - 2);
@@ -102,30 +114,33 @@ TEST("measure thread CPU clock overhead") {
fprintf(stderr, "approx overhead per sample (thread CPU clock): %f us\n", min_time_us);
}
+TEST("measure RUsage overhead") {
+ duration d;
+ double min_time_us = BenchmarkTimer::benchmark([&d]() noexcept { d = cpu_usage::RUsage::sample().total(); }, budget) * 1000000.0;
+ fprintf(stderr, "approx overhead per RUsage sample: %f us\n", min_time_us);
+}
+
//-----------------------------------------------------------------------------
-void verify_category(CpuUsage::Category cat, size_t idx) {
+void verify_category(CpuUsage::Category cat, size_t idx, const vespalib::string &name) {
switch (cat) { // make sure we know all categories
case CpuUsage::Category::SETUP:
case CpuUsage::Category::READ:
case CpuUsage::Category::WRITE:
case CpuUsage::Category::COMPACT:
- case CpuUsage::Category::MAINTAIN:
- case CpuUsage::Category::NETWORK:
case CpuUsage::Category::OTHER:
EXPECT_EQUAL(CpuUsage::index_of(cat), idx);
+ EXPECT_EQUAL(CpuUsage::name_of(cat), name);
}
}
TEST("require that CPU categories are as expected") {
- TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u));
- TEST_DO(verify_category(CpuUsage::Category::READ, 1u));
- TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u));
- TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u));
- TEST_DO(verify_category(CpuUsage::Category::MAINTAIN,4u));
- TEST_DO(verify_category(CpuUsage::Category::NETWORK, 5u));
- TEST_DO(verify_category(CpuUsage::Category::OTHER, 6u));
- EXPECT_EQUAL(CpuUsage::num_categories, 7u);
+ TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u, "setup"));
+ TEST_DO(verify_category(CpuUsage::Category::READ, 1u, "read"));
+ TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u, "write"));
+ TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u, "compact"));
+ TEST_DO(verify_category(CpuUsage::Category::OTHER, 4u, "other"));
+ EXPECT_EQUAL(CpuUsage::num_categories, 5u);
}
TEST("require that empty sample is zero") {
@@ -144,9 +159,7 @@ TEST("require that cpu samples can be manipulated and inspected") {
a[CpuUsage::Category::READ] = 2ms;
a[CpuUsage::Category::WRITE] = 3ms;
a[CpuUsage::Category::COMPACT] = 4ms;
- a[CpuUsage::Category::MAINTAIN] = 5ms;
- a[CpuUsage::Category::NETWORK] = 6ms;
- a[CpuUsage::Category::OTHER] = 7ms;
+ a[CpuUsage::Category::OTHER] = 5ms;
for (uint32_t i = 0; i < b.size(); ++i) {
b[i] = 10ms * (i + 1);
}
@@ -158,59 +171,263 @@ TEST("require that cpu samples can be manipulated and inspected") {
EXPECT_EQUAL(c[CpuUsage::Category::READ], 22ms);
EXPECT_EQUAL(c[CpuUsage::Category::WRITE], 33ms);
EXPECT_EQUAL(c[CpuUsage::Category::COMPACT], 44ms);
- EXPECT_EQUAL(c[CpuUsage::Category::MAINTAIN], 55ms);
- EXPECT_EQUAL(c[CpuUsage::Category::NETWORK], 66ms);
- EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 77ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 55ms);
}
//-----------------------------------------------------------------------------
-// prototype for the class we want to use to integrate CPU usage into
-// metrics as load values. NB: this class is not thread safe.
+struct CpuUsage::Test {
+ struct BlockingTracker : ThreadTracker {
+ std::atomic<size_t> called;
+ ThreadMeets::Nop sync_entry;
+ ThreadMeets::Swap<Sample> swap_sample;
+ BlockingTracker()
+ : called(0), sync_entry(2), swap_sample() {}
+ Sample sample() noexcept override {
+ if (called++) {
+ return Sample();
+ }
+ sync_entry();
+ return swap_sample(Sample());
+ }
+ };
+ struct SimpleTracker : ThreadTracker {
+ Sample my_sample;
+ std::atomic<size_t> called;
+ SimpleTracker(Sample sample) noexcept
+ : my_sample(sample), called(0) {}
+ Sample sample() noexcept override {
+ ++called;
+ return my_sample;
+ }
+ };
+ struct Fixture {
+ CpuUsage my_usage;
+ std::shared_ptr<BlockingTracker> blocking;
+ std::vector<std::shared_ptr<SimpleTracker>> simple_list;
+ Fixture() : my_usage() {}
+ void add_blocking() {
+ ASSERT_TRUE(!blocking);
+ blocking = std::make_unique<BlockingTracker>();
+ my_usage.add_thread(blocking);
+ }
+ void add_simple(Sample sample) {
+ auto simple = std::make_shared<SimpleTracker>(sample);
+ simple_list.push_back(simple);
+ my_usage.add_thread(simple);
+ }
+ void add_remove_simple(Sample sample) {
+ auto simple = std::make_shared<SimpleTracker>(sample);
+ my_usage.add_thread(simple);
+ my_usage.remove_thread(simple);
+ }
+ size_t count_threads() {
+ Guard guard(my_usage._lock);
+ return my_usage._threads.size();
+ }
+ bool is_sampling() {
+ Guard guard(my_usage._lock);
+ return my_usage._sampling;
+ }
+ size_t count_conflicts() {
+ Guard guard(my_usage._lock);
+ if (!my_usage._conflict) {
+ return 0;
+ }
+ return my_usage._conflict->waiters;
+ }
+ size_t count_simple_samples() {
+ size_t result = 0;
+ for (const auto &simple: simple_list) {
+ result += simple->called;
+ }
+ return result;
+ }
+ TimedSample sample() { return my_usage.sample_or_wait(); }
+ ~Fixture() {
+ if (blocking) {
+ my_usage.remove_thread(std::move(blocking));
+ }
+ for (auto &simple: simple_list) {
+ my_usage.remove_thread(std::move(simple));
+ }
+ ASSERT_EQUAL(count_threads(), 0u);
+ }
+ };
+ struct TrackerImpl {
+ ThreadTrackerImpl impl;
+ TrackerImpl(cpu_usage::ThreadSampler::UP sampler)
+ : impl(std::move(sampler)) {}
+ CpuUsage::Sample sample() { return impl.sample(); }
+ CpuUsage::Category set_category(CpuUsage::Category cat) { return impl.set_category(cat); }
+ };
+};
+
+TEST_F("require that CpuUsage sample calls sample on thread trackers", CpuUsage::Test::Fixture()) {
+ CpuUsage::Sample sample;
+ sample[CpuUsage::Category::READ] = 10ms;
+ f1.add_simple(sample);
+ f1.add_simple(sample);
+ f1.add_simple(sample);
+ EXPECT_EQUAL(f1.count_threads(), 3u);
+ auto result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms));
+ EXPECT_EQUAL(f1.count_simple_samples(), 3u);
+ result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(60ms));
+ EXPECT_EQUAL(f1.count_simple_samples(), 6u);
+}
+
+TEST_F("require that threads added and removed between CpuUsage sample calls are tracked", CpuUsage::Test::Fixture()) {
+ CpuUsage::Sample sample;
+ sample[CpuUsage::Category::READ] = 10ms;
+ auto result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(0ms));
+ f1.add_remove_simple(sample);
+ f1.add_remove_simple(sample);
+ f1.add_remove_simple(sample);
+ EXPECT_EQUAL(f1.count_threads(), 0u);
+ result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms));
+ result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms));
+}
+
+TEST_MT_FF("require that sample conflicts are resolved correctly", 5, CpuUsage::Test::Fixture(), std::vector<CpuUsage::TimedSample>(num_threads - 1)) {
+ if (thread_id == 0) {
+ CpuUsage::Sample s1;
+ s1[CpuUsage::Category::SETUP] = 10ms;
+ CpuUsage::Sample s2;
+ s2[CpuUsage::Category::READ] = 20ms;
+ CpuUsage::Sample s3;
+ s3[CpuUsage::Category::WRITE] = 30ms;
+ CpuUsage::Sample s4;
+ s4[CpuUsage::Category::COMPACT] = 40ms;
+ f1.add_blocking();
+ f1.add_simple(s1); // should be sampled
+ EXPECT_TRUE(!f1.is_sampling());
+ EXPECT_EQUAL(f1.count_conflicts(), 0u);
+ TEST_BARRIER(); // #1
+ f1.blocking->sync_entry();
+ EXPECT_TRUE(f1.is_sampling());
+ while (f1.count_conflicts() < (num_threads - 2)) {
+ // wait for appropriate number of conflicts
+ std::this_thread::sleep_for(1ms);
+ }
+ f1.add_simple(s2); // should NOT be sampled (pending add)
+ f1.add_remove_simple(s3); // should be sampled (pending remove)
+ EXPECT_EQUAL(f1.count_threads(), 2u);
+ EXPECT_TRUE(f1.is_sampling());
+ EXPECT_EQUAL(f1.count_conflicts(), (num_threads - 2));
+ f1.blocking->swap_sample(s4);
+ TEST_BARRIER(); // #2
+ EXPECT_TRUE(!f1.is_sampling());
+ EXPECT_EQUAL(f1.count_conflicts(), 0u);
+ EXPECT_EQUAL(f1.count_threads(), 3u);
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::SETUP], duration(10ms));
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::READ], duration(0ms));
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::WRITE], duration(30ms));
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::COMPACT], duration(40ms));
+ for (size_t i = 1; i < (num_threads - 1); ++i) {
+ EXPECT_EQUAL(f2[i].first, f2[0].first);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::SETUP], f2[0].second[CpuUsage::Category::SETUP]);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::READ], f2[0].second[CpuUsage::Category::READ]);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::WRITE], f2[0].second[CpuUsage::Category::WRITE]);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::COMPACT], f2[0].second[CpuUsage::Category::COMPACT]);
+ }
+ } else {
+ TEST_BARRIER(); // #1
+ f2[thread_id - 1] = f1.sample();
+ TEST_BARRIER(); // #2
+ }
+}
+
+//-----------------------------------------------------------------------------
+
+struct DummySampler : public cpu_usage::ThreadSampler {
+ duration &ref;
+ DummySampler(duration &ref_in) : ref(ref_in) {}
+ duration sample() const noexcept override { return ref; }
+};
+
+TEST("require that thread tracker implementation can track cpu use") {
+ duration t = duration::zero();
+ CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t));
+ t += 10ms;
+ tracker.set_category(CpuUsage::Category::SETUP);
+ t += 15ms;
+ tracker.set_category(CpuUsage::Category::READ);
+ t += 10ms;
+ auto sample = tracker.sample();
+ EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(15ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(10ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(0ms));
+ t += 15ms;
+ tracker.set_category(CpuUsage::Category::WRITE);
+ t += 10ms;
+ sample = tracker.sample();
+ EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(0ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(15ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(10ms));
+}
+
+TEST("require that thread tracker implementation reports previous CPU category") {
+ duration t = duration::zero();
+ CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t));
+ EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::OTHER),
+ CpuUsage::index_of(tracker.set_category(CpuUsage::Category::SETUP)));
+ EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::SETUP),
+ CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ)));
+ EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::READ),
+ CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ)));
+}
+
+TEST("require that thread tracker implementation does not track OTHER cpu use") {
+ duration t = duration::zero();
+ CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t));
+ t += 10ms;
+ tracker.set_category(CpuUsage::Category::OTHER);
+ t += 15ms;
+ tracker.set_category(CpuUsage::Category::READ);
+ tracker.set_category(CpuUsage::Category::OTHER);
+ t += 15ms;
+ auto sample = tracker.sample();
+ EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(0ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::OTHER], duration(0ms));
+}
+
+//-----------------------------------------------------------------------------
+
+// prototype for the class we want to use when integrating CPU usage
+// into metrics as load values. NB: this class is not thread safe.
class CpuMonitor {
private:
- duration _old_usage;
- CpuUsage::TimedSample _old_sample;
duration _min_delay;
- std::array<double,CpuUsage::num_categories+1> _load;
-
- static duration total_usage() {
- rusage usage;
- memset(&usage, 0, sizeof(usage));
- getrusage(RUSAGE_SELF, &usage);
- return from_timeval(usage.ru_utime) + from_timeval(usage.ru_stime);
- }
+ CpuUsage::TimedSample _old_sample;
+ std::array<double,CpuUsage::num_categories> _load;
public:
CpuMonitor(duration min_delay)
- : _old_usage(total_usage()),
+ : _min_delay(min_delay),
_old_sample(CpuUsage::sample()),
- _min_delay(min_delay),
_load() {}
- std::array<double,CpuUsage::num_categories+1> get_load() {
+ std::array<double,CpuUsage::num_categories> get_load() {
if (steady_clock::now() >= (_old_sample.first + _min_delay)) {
- auto new_usage = total_usage();
auto new_sample = CpuUsage::sample();
auto dt = to_s(new_sample.first - _old_sample.first);
- double sampled_load = 0.0;
for (size_t i = 0; i < CpuUsage::num_categories; ++i) {
_load[i] = to_s(new_sample.second[i] - _old_sample.second[i]) / dt;
- sampled_load += _load[i];
}
- _load[CpuUsage::num_categories] = (to_s(new_usage - _old_usage) / dt) - sampled_load;
- _old_usage = new_usage;
_old_sample = new_sample;
}
return _load;
}
};
-std::array<vespalib::string,CpuUsage::num_categories+1> names
-{ "SETUP", "READ", "WRITE", "COMPACT", "MAINTAIN", "NETWORK", "OTHER", "UNKNOWN" };
-
void do_sample_cpu_usage(const EndTime &end_time) {
+ auto my_usage = CpuUsage::use(CpuUsage::Category::SETUP);
CpuMonitor monitor(8ms);
while (!end_time()) {
std::this_thread::sleep_for(verbose ? 1s : 10ms);
@@ -220,7 +437,7 @@ void do_sample_cpu_usage(const EndTime &end_time) {
if (!body.empty()) {
body.append(", ");
}
- body.append(fmt("%s: %.2f", names[i].c_str(), load[i]));
+ body.append(fmt("%s: %.2f", CpuUsage::name_of(CpuUsage::Category(i)).c_str(), load[i]));
}
fprintf(stderr, "CPU: %s\n", body.c_str());
}
@@ -250,11 +467,11 @@ void do_nested_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndT
}
}
-void do_external_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndTime &end_time) {
- auto my_usage1 = CpuUsage::use(cat1);
+void do_external_work(CpuUsage::Category cat, const EndTime &end_time) {
+ auto my_usage1 = CpuUsage::use(CpuUsage::Category::SETUP);
while (!end_time()) {
- std::thread thread([cat2](){
- auto my_usage2 = CpuUsage::use(cat2);
+ std::thread thread([cat](){
+ auto my_usage2 = CpuUsage::use(cat);
be_busy(4ms);
});
thread.join();
@@ -266,8 +483,8 @@ TEST_MT_F("use top-level API to sample CPU usage", 5, EndTime(verbose ? 10s : 10
case 0: return do_sample_cpu_usage(f1);
case 1: return do_full_work(CpuUsage::Category::WRITE, f1);
case 2: return do_some_work(CpuUsage::Category::READ, f1);
- case 3: return do_nested_work(CpuUsage::Category::NETWORK, CpuUsage::Category::READ, f1);
- case 4: return do_external_work(CpuUsage::Category::SETUP, CpuUsage::Category::COMPACT, f1);
+ case 3: return do_nested_work(CpuUsage::Category::OTHER, CpuUsage::Category::READ, f1);
+ case 4: return do_external_work(CpuUsage::Category::COMPACT, f1);
default: TEST_FATAL("missing thread id case");
}
}
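
The CpuMonitor prototype shown in this diff turns consecutive CpuUsage samples into per-category load values: get_load() computes load[i] = delta_cpu_time[i] / delta_wall_time, so a thread that accrues 0.5s of READ CPU time over a 2s window reports a READ load of 0.25. Below is a minimal sketch of how such a monitor might be polled from a metrics thread; the stop flag and the 100ms interval are illustrative additions, and CpuMonitor is the test-local prototype above, not a published library class.

// Illustrative polling loop for the CpuMonitor prototype defined in the test.
// Assumes a single metrics thread owns the monitor (CpuMonitor is not thread
// safe) and that chrono literals are available, as in the test file.
#include <atomic>
#include <thread>
#include <cstdio>

void metrics_loop(const std::atomic<bool> &stop) {
    CpuMonitor monitor(8ms); // re-sample at most every 8ms, otherwise return cached load
    while (!stop.load()) {
        std::this_thread::sleep_for(100ms); // illustrative poll interval
        auto load = monitor.get_load();     // per-category load since last sample
        for (size_t i = 0; i < CpuUsage::num_categories; ++i) {
            fprintf(stderr, "%s: %.2f\n",
                    CpuUsage::name_of(CpuUsage::Category(i)).c_str(), load[i]);
        }
    }
}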