aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2022-01-20 12:52:49 +0000
committerHåvard Pettersen <havardpe@oath.com>2022-01-21 14:57:53 +0000
commit4ea7718e942e69fd6b0a52fe373dc3493a389a16 (patch)
tree833cd5c65309a22bc96463ac01b3196400fdd855 /vespalib
parent645a093e74c383d1632681e78b0d8f179dcc763e (diff)
improve cpu usage tracking
- add cpu_usage::RUsage primitive used for getrusage - reduce the number of categories (small initial set) - use RUsage to fill OTHER category (more complete samples) - threads no longer track OTHER category explicitly (less overhead) - changing cpu category to itself is now nop (less overhead) - make ThreadTracker and SampleConflict private (less API clutter) - improved testing (sample conflict resolution) - improves testing (thread tracker implementation)
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/cpu_usage/cpu_usage_test.cpp289
-rw-r--r--vespalib/src/vespa/vespalib/test/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/test/thread_meets.cpp12
-rw-r--r--vespalib/src/vespa/vespalib/test/thread_meets.h34
-rw-r--r--vespalib/src/vespa/vespalib/util/cpu_usage.cpp133
-rw-r--r--vespalib/src/vespa/vespalib/util/cpu_usage.h88
6 files changed, 420 insertions, 137 deletions
diff --git a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
index 5deb467ed17..b2da9db857c 100644
--- a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
+++ b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
@@ -3,12 +3,14 @@
#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/benchmark_timer.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/test/thread_meets.h>
#include <vespa/vespalib/testkit/test_kit.h>
#include <sys/resource.h>
#include <thread>
using namespace vespalib;
+using namespace vespalib::test;
using vespalib::make_string_short::fmt;
bool verbose = false;
@@ -58,18 +60,28 @@ void verify_sampling(size_t thread_id, size_t num_threads, std::vector<Sampler*>
TEST_BARRIER(); // #1
auto t0 = steady_clock::now();
std::vector<duration> pre_usage = sample(samplers);
+ auto pre_total = cpu_usage::RUsage::sample();
TEST_BARRIER(); // #2
TEST_BARRIER(); // #3
auto t1 = steady_clock::now();
std::vector<duration> post_usage = sample(samplers);
+ auto post_total = cpu_usage::RUsage::sample();
TEST_BARRIER(); // #4
double wall = to_s(t1 - t0);
std::vector<double> load(4, 0.0);
for (size_t i = 0; i < 4; ++i) {
load[i] = to_s(post_usage[i] - pre_usage[i]) / wall;
}
+ double user_load = to_s(post_total.user - pre_total.user) / wall;
+ double system_load = to_s(post_total.system - pre_total.system) / wall;
+ double total_load = to_s(post_total.total() - pre_total.total()) / wall;
EXPECT_GREATER(load[3], load[0]);
+ // NB: cannot expect total_load to be greater than load[3]
+ // here due to mock loads being 'as expected' while valgrind
+ // will cut all loads in about half.
+ EXPECT_GREATER(total_load, load[0]);
fprintf(stderr, "loads: { %.2f, %.2f, %.2f, %.2f }\n", load[0], load[1], load[2], load[3]);
+ fprintf(stderr, "total load: %.2f (user: %.2f, system: %.2f)\n", total_load, user_load, system_load);
} else {
int idx = (thread_id - 1);
double target_load = double(thread_id - 1) / (num_threads - 2);
@@ -102,30 +114,33 @@ TEST("measure thread CPU clock overhead") {
fprintf(stderr, "approx overhead per sample (thread CPU clock): %f us\n", min_time_us);
}
+TEST("measure RUsage overhead") {
+ duration d;
+ double min_time_us = BenchmarkTimer::benchmark([&d]() noexcept { d = cpu_usage::RUsage::sample().total(); }, budget) * 1000000.0;
+ fprintf(stderr, "approx overhead per RUsage sample: %f us\n", min_time_us);
+}
+
//-----------------------------------------------------------------------------
-void verify_category(CpuUsage::Category cat, size_t idx) {
+void verify_category(CpuUsage::Category cat, size_t idx, const vespalib::string &name) {
switch (cat) { // make sure we known all categories
case CpuUsage::Category::SETUP:
case CpuUsage::Category::READ:
case CpuUsage::Category::WRITE:
case CpuUsage::Category::COMPACT:
- case CpuUsage::Category::MAINTAIN:
- case CpuUsage::Category::NETWORK:
case CpuUsage::Category::OTHER:
EXPECT_EQUAL(CpuUsage::index_of(cat), idx);
+ EXPECT_EQUAL(CpuUsage::name_of(cat), name);
}
}
TEST("require that CPU categories are as expected") {
- TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u));
- TEST_DO(verify_category(CpuUsage::Category::READ, 1u));
- TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u));
- TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u));
- TEST_DO(verify_category(CpuUsage::Category::MAINTAIN,4u));
- TEST_DO(verify_category(CpuUsage::Category::NETWORK, 5u));
- TEST_DO(verify_category(CpuUsage::Category::OTHER, 6u));
- EXPECT_EQUAL(CpuUsage::num_categories, 7u);
+ TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u, "setup"));
+ TEST_DO(verify_category(CpuUsage::Category::READ, 1u, "read"));
+ TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u, "write"));
+ TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u, "compact"));
+ TEST_DO(verify_category(CpuUsage::Category::OTHER, 4u, "other"));
+ EXPECT_EQUAL(CpuUsage::num_categories, 5u);
}
TEST("require that empty sample is zero") {
@@ -144,9 +159,7 @@ TEST("require that cpu samples can be manipulated and inspected") {
a[CpuUsage::Category::READ] = 2ms;
a[CpuUsage::Category::WRITE] = 3ms;
a[CpuUsage::Category::COMPACT] = 4ms;
- a[CpuUsage::Category::MAINTAIN] = 5ms;
- a[CpuUsage::Category::NETWORK] = 6ms;
- a[CpuUsage::Category::OTHER] = 7ms;
+ a[CpuUsage::Category::OTHER] = 5ms;
for (uint32_t i = 0; i < b.size(); ++i) {
b[i] = 10ms * (i + 1);
}
@@ -158,59 +171,241 @@ TEST("require that cpu samples can be manipulated and inspected") {
EXPECT_EQUAL(c[CpuUsage::Category::READ], 22ms);
EXPECT_EQUAL(c[CpuUsage::Category::WRITE], 33ms);
EXPECT_EQUAL(c[CpuUsage::Category::COMPACT], 44ms);
- EXPECT_EQUAL(c[CpuUsage::Category::MAINTAIN], 55ms);
- EXPECT_EQUAL(c[CpuUsage::Category::NETWORK], 66ms);
- EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 77ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 55ms);
}
//-----------------------------------------------------------------------------
-// prototype for the class we want to use to integrate CPU usage into
-// metrics as load values. NB: this class is not thread safe.
+struct CpuUsage::Test {
+ struct BlockingTracker : ThreadTracker {
+ std::atomic<size_t> called;
+ ThreadMeets::Nop sync_entry;
+ ThreadMeets::Swap<Sample> swap_sample;
+ BlockingTracker()
+ : called(0), sync_entry(2), swap_sample() {}
+ Sample sample() noexcept override {
+ if (called++) {
+ return Sample();
+ }
+ sync_entry();
+ return swap_sample(Sample());
+ }
+ };
+ struct SimpleTracker : ThreadTracker {
+ Sample my_sample;
+ std::atomic<size_t> called;
+ SimpleTracker(Sample sample) noexcept
+ : my_sample(sample), called(0) {}
+ Sample sample() noexcept override {
+ ++called;
+ return my_sample;
+ }
+ };
+ struct Fixture {
+ CpuUsage my_usage;
+ std::shared_ptr<BlockingTracker> blocking;
+ std::vector<std::shared_ptr<SimpleTracker>> simple_list;
+ Fixture() : my_usage() {}
+ void add_blocking() {
+ ASSERT_TRUE(!blocking);
+ blocking = std::make_unique<BlockingTracker>();
+ my_usage.add_thread(blocking);
+ }
+ void add_simple(Sample sample) {
+ auto simple = std::make_shared<SimpleTracker>(sample);
+ simple_list.push_back(simple);
+ my_usage.add_thread(simple);
+ }
+ void add_remove_simple(Sample sample) {
+ auto simple = std::make_shared<SimpleTracker>(sample);
+ my_usage.add_thread(simple);
+ my_usage.remove_thread(simple);
+ }
+ size_t count_threads() {
+ Guard guard(my_usage._lock);
+ return my_usage._threads.size();
+ }
+ size_t count_simple_samples() {
+ size_t result = 0;
+ for (const auto &simple: simple_list) {
+ result += simple->called;
+ }
+ return result;
+ }
+ TimedSample sample() { return my_usage.sample_or_wait(); }
+ ~Fixture() {
+ if (blocking) {
+ my_usage.remove_thread(std::move(blocking));
+ }
+ for (auto &simple: simple_list) {
+ my_usage.remove_thread(std::move(simple));
+ }
+ ASSERT_EQUAL(count_threads(), 0u);
+ }
+ };
+ struct TrackerImpl {
+ ThreadTrackerImpl impl;
+ TrackerImpl(cpu_usage::ThreadSampler::UP sampler)
+ : impl(std::move(sampler)) {}
+ CpuUsage::Sample sample() { return impl.sample(); }
+ CpuUsage::Category set_category(CpuUsage::Category cat) { return impl.set_category(cat); }
+ };
+};
+
+TEST_F("require that CpuUsage sample calls sample on thread trackers", CpuUsage::Test::Fixture()) {
+ CpuUsage::Sample sample;
+ sample[CpuUsage::Category::READ] = 10ms;
+ f1.add_simple(sample);
+ f1.add_simple(sample);
+ f1.add_simple(sample);
+ EXPECT_EQUAL(f1.count_threads(), 3u);
+ auto result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms));
+ EXPECT_EQUAL(f1.count_simple_samples(), 3u);
+ result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(60ms));
+ EXPECT_EQUAL(f1.count_simple_samples(), 6u);
+}
+
+TEST_F("require that threads added and removed between CpuUsage sample calls are tracked", CpuUsage::Test::Fixture()) {
+ CpuUsage::Sample sample;
+ sample[CpuUsage::Category::READ] = 10ms;
+ auto result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(0ms));
+ f1.add_remove_simple(sample);
+ f1.add_remove_simple(sample);
+ f1.add_remove_simple(sample);
+ EXPECT_EQUAL(f1.count_threads(), 0u);
+ result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms));
+ result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms));
+}
+
+TEST_MT_FF("require that sample conflicts are resolved correctly", 4, CpuUsage::Test::Fixture(), std::vector<CpuUsage::TimedSample>(3)) {
+ if (thread_id == 0) {
+ CpuUsage::Sample s1;
+ s1[CpuUsage::Category::SETUP] = 10ms;
+ CpuUsage::Sample s2;
+ s2[CpuUsage::Category::READ] = 20ms;
+ CpuUsage::Sample s3;
+ s3[CpuUsage::Category::WRITE] = 30ms;
+ CpuUsage::Sample s4;
+ s4[CpuUsage::Category::COMPACT] = 40ms;
+ f1.add_blocking();
+ f1.add_simple(s1); // should be sampled
+ TEST_BARRIER(); // #1
+ f1.blocking->sync_entry();
+ f1.add_simple(s2); // should NOT be sampled (pending add)
+ f1.add_remove_simple(s3); // should be sampled (pending remove);
+ EXPECT_EQUAL(f1.count_threads(), 2u);
+ f1.blocking->swap_sample(s4);
+ TEST_BARRIER(); // #2
+ EXPECT_EQUAL(f1.count_threads(), 3u);
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::SETUP], duration(10ms));
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::READ], duration(0ms));
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::WRITE], duration(30ms));
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::COMPACT], duration(40ms));
+ for (size_t i = 1; i < 3; ++i) {
+ EXPECT_EQUAL(f2[i].first, f2[0].first);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::SETUP], f2[0].second[CpuUsage::Category::SETUP]);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::READ], f2[0].second[CpuUsage::Category::READ]);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::WRITE], f2[0].second[CpuUsage::Category::WRITE]);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::COMPACT], f2[0].second[CpuUsage::Category::COMPACT]);
+ }
+ } else {
+ TEST_BARRIER(); // #1
+ f2[thread_id - 1] = f1.sample();
+ TEST_BARRIER(); // #2
+ }
+}
+
+//-----------------------------------------------------------------------------
+
+struct DummySampler : public cpu_usage::ThreadSampler {
+ duration &ref;
+ DummySampler(duration &ref_in) : ref(ref_in) {}
+ duration sample() const noexcept override { return ref; }
+};
+
+TEST("require that thread tracker implementation can track cpu use") {
+ duration t = duration::zero();
+ CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t));
+ t += 10ms;
+ tracker.set_category(CpuUsage::Category::SETUP);
+ t += 15ms;
+ tracker.set_category(CpuUsage::Category::READ);
+ t += 10ms;
+ auto sample = tracker.sample();
+ EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(15ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(10ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(0ms));
+ t += 15ms;
+ tracker.set_category(CpuUsage::Category::WRITE);
+ t += 10ms;
+ sample = tracker.sample();
+ EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(0ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(15ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(10ms));
+}
+
+TEST("require that thread tracker implementation reports previous CPU category") {
+ duration t = duration::zero();
+ CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t));
+ EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::OTHER),
+ CpuUsage::index_of(tracker.set_category(CpuUsage::Category::SETUP)));
+ EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::SETUP),
+ CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ)));
+ EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::READ),
+ CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ)));
+}
+
+TEST("require that thread tracker implementation does not track OTHER cpu use") {
+ duration t = duration::zero();
+ CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t));
+ t += 10ms;
+ tracker.set_category(CpuUsage::Category::OTHER);
+ t += 15ms;
+ tracker.set_category(CpuUsage::Category::READ);
+ tracker.set_category(CpuUsage::Category::OTHER);
+ t += 15ms;
+ auto sample = tracker.sample();
+ EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(0ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::OTHER], duration(0ms));
+}
+
+//-----------------------------------------------------------------------------
+
+// prototype for the class we want to use when integrating CPU usage
+// into metrics as load values. NB: this class is not thread safe.
class CpuMonitor {
private:
- duration _old_usage;
- CpuUsage::TimedSample _old_sample;
duration _min_delay;
- std::array<double,CpuUsage::num_categories+1> _load;
-
- static duration total_usage() {
- rusage usage;
- memset(&usage, 0, sizeof(usage));
- getrusage(RUSAGE_SELF, &usage);
- return from_timeval(usage.ru_utime) + from_timeval(usage.ru_stime);
- }
+ CpuUsage::TimedSample _old_sample;
+ std::array<double,CpuUsage::num_categories> _load;
public:
CpuMonitor(duration min_delay)
- : _old_usage(total_usage()),
+ : _min_delay(min_delay),
_old_sample(CpuUsage::sample()),
- _min_delay(min_delay),
_load() {}
- std::array<double,CpuUsage::num_categories+1> get_load() {
+ std::array<double,CpuUsage::num_categories> get_load() {
if (steady_clock::now() >= (_old_sample.first + _min_delay)) {
- auto new_usage = total_usage();
auto new_sample = CpuUsage::sample();
auto dt = to_s(new_sample.first - _old_sample.first);
- double sampled_load = 0.0;
for (size_t i = 0; i < CpuUsage::num_categories; ++i) {
_load[i] = to_s(new_sample.second[i] - _old_sample.second[i]) / dt;
- sampled_load += _load[i];
}
- _load[CpuUsage::num_categories] = (to_s(new_usage - _old_usage) / dt) - sampled_load;
- _old_usage = new_usage;
_old_sample = new_sample;
}
return _load;
}
};
-std::array<vespalib::string,CpuUsage::num_categories+1> names
-{ "SETUP", "READ", "WRITE", "COMPACT", "MAINTAIN", "NETWORK", "OTHER", "UNKNOWN" };
-
void do_sample_cpu_usage(const EndTime &end_time) {
+ auto my_usage = CpuUsage::use(CpuUsage::Category::SETUP);
CpuMonitor monitor(8ms);
while (!end_time()) {
std::this_thread::sleep_for(verbose ? 1s : 10ms);
@@ -220,7 +415,7 @@ void do_sample_cpu_usage(const EndTime &end_time) {
if (!body.empty()) {
body.append(", ");
}
- body.append(fmt("%s: %.2f", names[i].c_str(), load[i]));
+ body.append(fmt("%s: %.2f", CpuUsage::name_of(CpuUsage::Category(i)).c_str(), load[i]));
}
fprintf(stderr, "CPU: %s\n", body.c_str());
}
@@ -250,11 +445,11 @@ void do_nested_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndT
}
}
-void do_external_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndTime &end_time) {
- auto my_usage1 = CpuUsage::use(cat1);
+void do_external_work(CpuUsage::Category cat, const EndTime &end_time) {
+ auto my_usage1 = CpuUsage::use(CpuUsage::Category::SETUP);
while (!end_time()) {
- std::thread thread([cat2](){
- auto my_usage2 = CpuUsage::use(cat2);
+ std::thread thread([cat](){
+ auto my_usage2 = CpuUsage::use(cat);
be_busy(4ms);
});
thread.join();
@@ -266,8 +461,8 @@ TEST_MT_F("use top-level API to sample CPU usage", 5, EndTime(verbose ? 10s : 10
case 0: return do_sample_cpu_usage(f1);
case 1: return do_full_work(CpuUsage::Category::WRITE, f1);
case 2: return do_some_work(CpuUsage::Category::READ, f1);
- case 3: return do_nested_work(CpuUsage::Category::NETWORK, CpuUsage::Category::READ, f1);
- case 4: return do_external_work(CpuUsage::Category::SETUP, CpuUsage::Category::COMPACT, f1);
+ case 3: return do_nested_work(CpuUsage::Category::OTHER, CpuUsage::Category::READ, f1);
+ case 4: return do_external_work(CpuUsage::Category::COMPACT, f1);
default: TEST_FATAL("missing thread id case");
}
}
diff --git a/vespalib/src/vespa/vespalib/test/CMakeLists.txt b/vespalib/src/vespa/vespalib/test/CMakeLists.txt
index d658b28f94b..a60eb15a4d4 100644
--- a/vespalib/src/vespa/vespalib/test/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/test/CMakeLists.txt
@@ -4,6 +4,7 @@ vespa_add_library(vespalib_vespalib_test OBJECT
make_tls_options_for_testing.cpp
memory_allocator_observer.cpp
peer_policy_utils.cpp
+ thread_meets.cpp
time_tracer.cpp
DEPENDS
)
diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.cpp b/vespalib/src/vespa/vespalib/test/thread_meets.cpp
new file mode 100644
index 00000000000..9d23e0eab28
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/test/thread_meets.cpp
@@ -0,0 +1,12 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "thread_meets.h"
+
+namespace vespalib::test {
+
+void
+ThreadMeets::Nop::mingle()
+{
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.h b/vespalib/src/vespa/vespalib/test/thread_meets.h
new file mode 100644
index 00000000000..62ca7779935
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/test/thread_meets.h
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/rendezvous.h>
+
+namespace vespalib::test {
+
+/**
+ * Generally useful rendezvous implementations.
+ **/
+struct ThreadMeets {
+ // can be used as a simple thread barrier
+ struct Nop : vespalib::Rendezvous<bool,bool> {
+ Nop(size_t N) : vespalib::Rendezvous<bool,bool>(N) {}
+ void operator()() { rendezvous(false); }
+ void mingle() override;
+ };
+ // swap values between 2 threads
+ template <typename T>
+ struct Swap : vespalib::Rendezvous<T,T> {
+ using vespalib::Rendezvous<T,T>::in;
+ using vespalib::Rendezvous<T,T>::out;
+ using vespalib::Rendezvous<T,T>::rendezvous;
+ Swap() : vespalib::Rendezvous<T,T>(2) {}
+ T operator()(T input) { return rendezvous(input); }
+ void mingle() override {
+ out(1) = in(0);
+ out(0) = in(1);
+ }
+ };
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
index ca2199275de..5f9e1f521b9 100644
--- a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
+++ b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
@@ -6,6 +6,8 @@
#include <optional>
#include <cassert>
+#include <sys/resource.h>
+
namespace vespalib {
namespace cpu_usage {
@@ -18,7 +20,7 @@ private:
double _load;
public:
DummyThreadSampler(double load) : _start(steady_clock::now()), _load(load) {}
- duration sample() const override {
+ duration sample() const noexcept override {
return from_s(to_s(steady_clock::now() - _start) * _load);
}
};
@@ -32,9 +34,10 @@ public:
LinuxThreadSampler() : _my_clock() {
REQUIRE_EQ(pthread_getcpuclockid(pthread_self(), &_my_clock), 0);
}
- duration sample() const override {
+ duration sample() const noexcept override {
timespec ts;
- REQUIRE_EQ(clock_gettime(_my_clock, &ts), 0);
+ memset(&ts, 0, sizeof(ts));
+ clock_gettime(_my_clock, &ts);
return from_timespec(ts);
}
};
@@ -43,6 +46,15 @@ public:
} // <unnamed>
+RUsage
+RUsage::sample() noexcept
+{
+ rusage usage;
+ memset(&usage, 0, sizeof(usage));
+ getrusage(RUSAGE_SELF, &usage);
+ return {from_timeval(usage.ru_utime), from_timeval(usage.ru_stime)};
+}
+
ThreadSampler::UP create_thread_sampler(bool force_mock_impl, double expected_load) {
if (force_mock_impl) {
return std::make_unique<DummyThreadSampler>(expected_load);
@@ -55,75 +67,69 @@ ThreadSampler::UP create_thread_sampler(bool force_mock_impl, double expected_lo
} // cpu_usage
-class CpuUsage::ThreadTrackerImpl : public CpuUsage::ThreadTracker {
-private:
- SpinLock _lock;
- uint32_t _cat_idx;
- duration _old_usage;
- cpu_usage::ThreadSampler::UP _sampler;
- Sample _pending;
-
- using Guard = std::lock_guard<SpinLock>;
-
- struct Wrapper {
- std::shared_ptr<ThreadTrackerImpl> self;
- Wrapper() : self(std::make_shared<ThreadTrackerImpl>()) {
- CpuUsage::self().add_thread(self);
- }
- ~Wrapper() {
- self->set_category(CpuUsage::num_categories);
- CpuUsage::self().remove_thread(std::move(self));
- }
- };
+CpuUsage::ThreadTrackerImpl::ThreadTrackerImpl(cpu_usage::ThreadSampler::UP sampler)
+ : _lock(),
+ _cat(Category::OTHER),
+ _old_usage(),
+ _sampler(std::move(sampler)),
+ _pending()
+{
+}
-public:
- ThreadTrackerImpl()
- : _lock(),
- _cat_idx(num_categories),
- _old_usage(),
- _sampler(cpu_usage::create_thread_sampler()),
- _pending()
- {
+CpuUsage::Category
+CpuUsage::ThreadTrackerImpl::set_category(Category new_cat) noexcept
+{
+ // only owning thread may change category
+ if (new_cat == _cat) {
+ return new_cat;
+ }
+ Guard guard(_lock);
+ duration new_usage = _sampler->sample();
+ if (_cat != Category::OTHER) {
+ _pending[_cat] += (new_usage - _old_usage);
}
+ _old_usage = new_usage;
+ auto old_cat = _cat;
+ _cat = new_cat;
+ return old_cat;
+}
- uint32_t set_category(uint32_t new_cat_idx) {
- Guard guard(_lock);
+CpuUsage::Sample
+CpuUsage::ThreadTrackerImpl::sample() noexcept
+{
+ Guard guard(_lock);
+ if (_cat != Category::OTHER) {
duration new_usage = _sampler->sample();
- if (_cat_idx < num_categories) {
- _pending[_cat_idx] += (new_usage - _old_usage);
- }
+ _pending[_cat] += (new_usage - _old_usage);
_old_usage = new_usage;
- size_t old_cat_idx = _cat_idx;
- _cat_idx = new_cat_idx;
- return old_cat_idx;
- }
-
- Sample sample() override {
- Guard guard(_lock);
- if (_cat_idx < num_categories) {
- duration new_usage = _sampler->sample();
- _pending[_cat_idx] += (new_usage - _old_usage);
- _old_usage = new_usage;
- }
- Sample sample = _pending;
- _pending = Sample();
- return sample;
- }
-
- static ThreadTrackerImpl &self() {
- thread_local Wrapper wrapper;
- return *wrapper.self;
}
-};
+ Sample sample = _pending;
+ _pending = Sample();
+ return sample;
+}
-CpuUsage::MyUsage::MyUsage(Category cat)
- : _old_cat_idx(ThreadTrackerImpl::self().set_category(index_of(cat)))
+vespalib::string &
+CpuUsage::name_of(Category cat)
{
+ static std::array<vespalib::string,num_categories> names = {"setup", "read", "write", "compact", "other"};
+ return names[index_of(cat)];
}
-CpuUsage::MyUsage::~MyUsage()
+CpuUsage::Category
+CpuUsage::MyUsage::set_cpu_category_for_this_thread(Category cat) noexcept
{
- ThreadTrackerImpl::self().set_category(_old_cat_idx);
+ struct Wrapper {
+ std::shared_ptr<ThreadTrackerImpl> self;
+ Wrapper() : self(std::make_shared<ThreadTrackerImpl>(cpu_usage::create_thread_sampler())) {
+ CpuUsage::self().add_thread(self);
+ }
+ ~Wrapper() {
+ self->set_category(CpuUsage::Category::OTHER);
+ CpuUsage::self().remove_thread(std::move(self));
+ }
+ };
+ thread_local Wrapper wrapper;
+ return wrapper.self->set_category(cat);
}
CpuUsage::CpuUsage()
@@ -218,6 +224,11 @@ CpuUsage::do_sample()
my_sample.merge(_usage);
_usage = my_sample;
}
+ auto total = cpu_usage::RUsage::sample().total();
+ for (size_t i = 0; i < index_of(Category::OTHER); ++i) {
+ total -= my_sample[i];
+ }
+ my_sample[Category::OTHER] = std::max(total, duration::zero());
TimedSample result{t, my_sample};
if (my_promise.has_value()) {
my_promise.value().set_value(result);
diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.h b/vespalib/src/vespa/vespalib/util/cpu_usage.h
index e3333bac6b7..7b44d07a0a3 100644
--- a/vespalib/src/vespa/vespalib/util/cpu_usage.h
+++ b/vespalib/src/vespa/vespalib/util/cpu_usage.h
@@ -2,6 +2,7 @@
#include "spin_lock.h"
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/stllike/string.h>
#include <memory>
#include <future>
#include <vector>
@@ -12,13 +13,24 @@ namespace vespalib {
namespace cpu_usage {
/**
+ * Uses getrusage to sample the total amount of user and system cpu
+ * time used so far.
+ **/
+struct RUsage {
+ duration user;
+ duration system;
+ duration total() const { return user + system; }
+ static RUsage sample() noexcept;
+};
+
+/**
* Samples the total CPU usage of the thread that created it. Note
* that this must not be used after thread termination. Enables
* sampling the CPU usage of a thread from outside the thread.
**/
struct ThreadSampler {
using UP = std::unique_ptr<ThreadSampler>;
- virtual duration sample() const = 0;
+ virtual duration sample() const noexcept = 0;
virtual ~ThreadSampler() {}
};
@@ -45,16 +57,15 @@ public:
// exclusive/non-overlapping; a thread can only perform one kind
// of CPU work at any specific time.
enum class Category {
- SETUP = 0, // usage related to system setup (init/(re-)config/etc.)
- READ = 1, // usage related to reading data from the system
- WRITE = 2, // usage related to writing data to the system
- COMPACT = 3, // usage related to internal data re-structuring
- MAINTAIN = 4, // usage related to distributed cluster maintainance
- NETWORK = 5, // usage related to network communication
- OTHER = 6 // unspecified usage
+ SETUP = 0, // usage related to system setup (init/(re-)config/etc.)
+ READ = 1, // usage related to reading data from the system
+ WRITE = 2, // usage related to writing data to the system
+ COMPACT = 3, // usage related to internal data re-structuring
+ OTHER = 4 // all other cpu usage not in the categories above
};
+ static vespalib::string &name_of(Category cat);
static constexpr size_t index_of(Category cat) { return static_cast<size_t>(cat); }
- static constexpr size_t num_categories = 7;
+ static constexpr size_t num_categories = 5;
// A sample contains how much CPU has been spent in various
// categories.
@@ -78,6 +89,33 @@ public:
// a sample tagged with the time it was taken
using TimedSample = std::pair<steady_time, Sample>;
+ // Used by threads to signal what kind of CPU they are currently
+ // using. The thread will contribute to the declared CPU usage
+ // category while this object lives. MyUsage instances may shadow
+ // each other, but must be destructed in reverse construction
+ // order. The preferred way to use this class is by doing:
+ //
+ // auto my_usage = CpuUsage::use(my_cat);
+ class MyUsage {
+ private:
+ Category _old_cat;
+ static Category set_cpu_category_for_this_thread(Category cat) noexcept;
+ public:
+ MyUsage(Category cat)
+ : _old_cat(set_cpu_category_for_this_thread(cat)) {}
+ MyUsage(MyUsage &&) = delete;
+ MyUsage(const MyUsage &) = delete;
+ MyUsage &operator=(MyUsage &&) = delete;
+ MyUsage &operator=(const MyUsage &) = delete;
+ ~MyUsage() { set_cpu_category_for_this_thread(_old_cat); }
+ };
+
+ // grant extra access for testing
+ struct Test;
+
+private:
+ using Guard = std::lock_guard<SpinLock>;
+
// Used when multiple threads call the 'sample' function at the
// same time. One thread will sample while the others will wait
// for the result.
@@ -92,31 +130,25 @@ public:
// in various categories since the last time it was sampled.
struct ThreadTracker {
using SP = std::shared_ptr<ThreadTracker>;
- virtual Sample sample() = 0;
+ virtual Sample sample() noexcept = 0;
virtual ~ThreadTracker() {}
};
- class ThreadTrackerImpl;
- // Used by threads to signal what kind of CPU they are currently
- // using. The thread will contribute to the declared CPU usage
- // category while this object lives. MyUsage instances may shadow
- // each other, but must be destructed in reverse construction
- // order. The preferred way to use this class is by doing:
- //
- // auto my_usage = CpuUsage::use(my_cat);
- class MyUsage {
+ class ThreadTrackerImpl : public ThreadTracker {
private:
- uint32_t _old_cat_idx;
+ SpinLock _lock;
+ Category _cat;
+ duration _old_usage;
+ cpu_usage::ThreadSampler::UP _sampler;
+ Sample _pending;
+
public:
- MyUsage(Category cat);
- MyUsage(MyUsage &&) = delete;
- MyUsage(const MyUsage &) = delete;
- MyUsage &operator=(MyUsage &&) = delete;
- MyUsage &operator=(const MyUsage &) = delete;
- ~MyUsage();
+ ThreadTrackerImpl(cpu_usage::ThreadSampler::UP sampler);
+ // only called by owning thread
+ Category set_category(Category new_cat) noexcept;
+ Sample sample() noexcept override;
};
-private:
SpinLock _lock;
Sample _usage;
std::map<ThreadTracker*,ThreadTracker::SP> _threads;
@@ -125,8 +157,6 @@ private:
std::vector<ThreadTracker::SP> _pending_add;
std::vector<ThreadTracker::SP> _pending_remove;
- using Guard = std::lock_guard<SpinLock>;
-
CpuUsage();
CpuUsage(CpuUsage &&) = delete;
CpuUsage(const CpuUsage &) = delete;