diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-17 19:29:43 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-17 19:29:43 +0100 |
commit | 13babe5abd3ecb8ffe95124662d835df157c4bea (patch) | |
tree | 173af3cfd1895432866aa195556860618a4867d7 | |
parent | d70502fa56157840a2308ab34cc3228ee1a1be81 (diff) | |
parent | 5110a68e5e82f8f5382c4c2447dd0226b517ae53 (diff) |
Merge pull request #20815 from vespa-engine/havardpe/track-multi-threaded-cpu-usage-per-category
track cpu usage across threads and usage categories
-rw-r--r-- | vespalib/src/tests/cpu_usage/cpu_usage_test.cpp | 183 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/cpu_usage.cpp | 200 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/cpu_usage.h | 128 |
3 files changed, 511 insertions, 0 deletions
diff --git a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp index 98a7bd780a7..5deb467ed17 100644 --- a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp +++ b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp @@ -2,11 +2,14 @@ #include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/benchmark_timer.h> +#include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/testkit/test_kit.h> +#include <sys/resource.h> #include <thread> using namespace vespalib; +using vespalib::make_string_short::fmt; bool verbose = false; size_t loop_cnt = 10; @@ -16,6 +19,16 @@ using Sampler = vespalib::cpu_usage::ThreadSampler; //----------------------------------------------------------------------------- +class EndTime { +private: + steady_time _end_time; +public: + EndTime(duration test_time) : _end_time(steady_clock::now() + test_time) {} + bool operator()() const { return (steady_clock::now() >= _end_time); } +}; + +//----------------------------------------------------------------------------- + void be_busy(duration d) { if (d > 0ms) { volatile int tmp = 123; @@ -91,6 +104,176 @@ TEST("measure thread CPU clock overhead") { //----------------------------------------------------------------------------- +void verify_category(CpuUsage::Category cat, size_t idx) { + switch (cat) { // make sure we known all categories + case CpuUsage::Category::SETUP: + case CpuUsage::Category::READ: + case CpuUsage::Category::WRITE: + case CpuUsage::Category::COMPACT: + case CpuUsage::Category::MAINTAIN: + case CpuUsage::Category::NETWORK: + case CpuUsage::Category::OTHER: + EXPECT_EQUAL(CpuUsage::index_of(cat), idx); + } +} + +TEST("require that CPU categories are as expected") { + TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u)); + TEST_DO(verify_category(CpuUsage::Category::READ, 1u)); + TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u)); + TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u)); + 
TEST_DO(verify_category(CpuUsage::Category::MAINTAIN,4u)); + TEST_DO(verify_category(CpuUsage::Category::NETWORK, 5u)); + TEST_DO(verify_category(CpuUsage::Category::OTHER, 6u)); + EXPECT_EQUAL(CpuUsage::num_categories, 7u); +} + +TEST("require that empty sample is zero") { + CpuUsage::Sample sample; + EXPECT_EQUAL(sample.size(), CpuUsage::num_categories); + for (uint32_t i = 0; i < sample.size(); ++i) { + EXPECT_EQUAL(sample[i].count(), 0); + } +} + +TEST("require that cpu samples can be manipulated and inspected") { + CpuUsage::Sample a; + CpuUsage::Sample b; + const CpuUsage::Sample &c = a; + a[CpuUsage::Category::SETUP] = 1ms; + a[CpuUsage::Category::READ] = 2ms; + a[CpuUsage::Category::WRITE] = 3ms; + a[CpuUsage::Category::COMPACT] = 4ms; + a[CpuUsage::Category::MAINTAIN] = 5ms; + a[CpuUsage::Category::NETWORK] = 6ms; + a[CpuUsage::Category::OTHER] = 7ms; + for (uint32_t i = 0; i < b.size(); ++i) { + b[i] = 10ms * (i + 1); + } + a.merge(b); + for (uint32_t i = 0; i < c.size(); ++i) { + EXPECT_EQUAL(c[i], 11ms * (i + 1)); + } + EXPECT_EQUAL(c[CpuUsage::Category::SETUP], 11ms); + EXPECT_EQUAL(c[CpuUsage::Category::READ], 22ms); + EXPECT_EQUAL(c[CpuUsage::Category::WRITE], 33ms); + EXPECT_EQUAL(c[CpuUsage::Category::COMPACT], 44ms); + EXPECT_EQUAL(c[CpuUsage::Category::MAINTAIN], 55ms); + EXPECT_EQUAL(c[CpuUsage::Category::NETWORK], 66ms); + EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 77ms); +} + +//----------------------------------------------------------------------------- + +// prototype for the class we want to use to integrate CPU usage into +// metrics as load values. NB: this class is not thread safe. 
+ +class CpuMonitor { +private: + duration _old_usage; + CpuUsage::TimedSample _old_sample; + duration _min_delay; + std::array<double,CpuUsage::num_categories+1> _load; + + static duration total_usage() { + rusage usage; + memset(&usage, 0, sizeof(usage)); + getrusage(RUSAGE_SELF, &usage); + return from_timeval(usage.ru_utime) + from_timeval(usage.ru_stime); + } + +public: + CpuMonitor(duration min_delay) + : _old_usage(total_usage()), + _old_sample(CpuUsage::sample()), + _min_delay(min_delay), + _load() {} + + std::array<double,CpuUsage::num_categories+1> get_load() { + if (steady_clock::now() >= (_old_sample.first + _min_delay)) { + auto new_usage = total_usage(); + auto new_sample = CpuUsage::sample(); + auto dt = to_s(new_sample.first - _old_sample.first); + double sampled_load = 0.0; + for (size_t i = 0; i < CpuUsage::num_categories; ++i) { + _load[i] = to_s(new_sample.second[i] - _old_sample.second[i]) / dt; + sampled_load += _load[i]; + } + _load[CpuUsage::num_categories] = (to_s(new_usage - _old_usage) / dt) - sampled_load; + _old_usage = new_usage; + _old_sample = new_sample; + } + return _load; + } +}; + +std::array<vespalib::string,CpuUsage::num_categories+1> names +{ "SETUP", "READ", "WRITE", "COMPACT", "MAINTAIN", "NETWORK", "OTHER", "UNKNOWN" }; + +void do_sample_cpu_usage(const EndTime &end_time) { + CpuMonitor monitor(8ms); + while (!end_time()) { + std::this_thread::sleep_for(verbose ? 
1s : 10ms); + auto load = monitor.get_load(); + vespalib::string body; + for (size_t i = 0; i < load.size(); ++i) { + if (!body.empty()) { + body.append(", "); + } + body.append(fmt("%s: %.2f", names[i].c_str(), load[i])); + } + fprintf(stderr, "CPU: %s\n", body.c_str()); + } +} + +void do_full_work(CpuUsage::Category cat, const EndTime &end_time) { + auto my_usage = CpuUsage::use(cat); + while (!end_time()) { + be_busy(4ms); + } +} + +void do_some_work(CpuUsage::Category cat, const EndTime &end_time) { + auto my_usage = CpuUsage::use(cat); + while (!end_time()) { + be_busy(4ms); + std::this_thread::sleep_for(4ms); + } +} + +void do_nested_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndTime &end_time) { + auto my_usage1 = CpuUsage::use(cat1); + while (!end_time()) { + be_busy(4ms); + auto my_usage2 = CpuUsage::use(cat2); + be_busy(4ms); + } +} + +void do_external_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndTime &end_time) { + auto my_usage1 = CpuUsage::use(cat1); + while (!end_time()) { + std::thread thread([cat2](){ + auto my_usage2 = CpuUsage::use(cat2); + be_busy(4ms); + }); + thread.join(); + } +} + +TEST_MT_F("use top-level API to sample CPU usage", 5, EndTime(verbose ? 
10s : 100ms)) { + switch (thread_id) { + case 0: return do_sample_cpu_usage(f1); + case 1: return do_full_work(CpuUsage::Category::WRITE, f1); + case 2: return do_some_work(CpuUsage::Category::READ, f1); + case 3: return do_nested_work(CpuUsage::Category::NETWORK, CpuUsage::Category::READ, f1); + case 4: return do_external_work(CpuUsage::Category::SETUP, CpuUsage::Category::COMPACT, f1); + default: TEST_FATAL("missing thread id case"); + } +} + +//----------------------------------------------------------------------------- + int main(int argc, char **argv) { TEST_MASTER.init(__FILE__); if ((argc == 2) && (argv[1] == std::string("verbose"))) { diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp index 4eee0a63870..ca2199275de 100644 --- a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp +++ b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp @@ -3,6 +3,8 @@ #include "cpu_usage.h" #include "require.h" #include <pthread.h> +#include <optional> +#include <cassert> namespace vespalib { @@ -53,4 +55,202 @@ ThreadSampler::UP create_thread_sampler(bool force_mock_impl, double expected_lo } // cpu_usage +class CpuUsage::ThreadTrackerImpl : public CpuUsage::ThreadTracker { +private: + SpinLock _lock; + uint32_t _cat_idx; + duration _old_usage; + cpu_usage::ThreadSampler::UP _sampler; + Sample _pending; + + using Guard = std::lock_guard<SpinLock>; + + struct Wrapper { + std::shared_ptr<ThreadTrackerImpl> self; + Wrapper() : self(std::make_shared<ThreadTrackerImpl>()) { + CpuUsage::self().add_thread(self); + } + ~Wrapper() { + self->set_category(CpuUsage::num_categories); + CpuUsage::self().remove_thread(std::move(self)); + } + }; + +public: + ThreadTrackerImpl() + : _lock(), + _cat_idx(num_categories), + _old_usage(), + _sampler(cpu_usage::create_thread_sampler()), + _pending() + { + } + + uint32_t set_category(uint32_t new_cat_idx) { + Guard guard(_lock); + duration new_usage = _sampler->sample(); + if (_cat_idx < 
num_categories) { + _pending[_cat_idx] += (new_usage - _old_usage); + } + _old_usage = new_usage; + size_t old_cat_idx = _cat_idx; + _cat_idx = new_cat_idx; + return old_cat_idx; + } + + Sample sample() override { + Guard guard(_lock); + if (_cat_idx < num_categories) { + duration new_usage = _sampler->sample(); + _pending[_cat_idx] += (new_usage - _old_usage); + _old_usage = new_usage; + } + Sample sample = _pending; + _pending = Sample(); + return sample; + } + + static ThreadTrackerImpl &self() { + thread_local Wrapper wrapper; + return *wrapper.self; + } +}; + +CpuUsage::MyUsage::MyUsage(Category cat) + : _old_cat_idx(ThreadTrackerImpl::self().set_category(index_of(cat))) +{ +} + +CpuUsage::MyUsage::~MyUsage() +{ + ThreadTrackerImpl::self().set_category(_old_cat_idx); +} + +CpuUsage::CpuUsage() + : _lock(), + _usage(), + _threads(), + _sampling(false), + _conflict(), + _pending_add(), + _pending_remove() +{ +} + +CpuUsage & +CpuUsage::self() +{ + static CpuUsage me; + return me; +} + +void +CpuUsage::do_add_thread(const Guard &, ThreadTracker::SP tracker) +{ + assert(!_sampling); + auto *key = tracker.get(); + auto [ignore, was_inserted] = _threads.emplace(key, std::move(tracker)); + assert(was_inserted); +} + +void +CpuUsage::do_remove_thread(const Guard &, ThreadTracker::SP tracker) +{ + assert(!_sampling); + _usage.merge(tracker->sample()); + auto was_removed = _threads.erase(tracker.get()); + assert(was_removed); +} + +void +CpuUsage::add_thread(ThreadTracker::SP tracker) +{ + Guard guard(_lock); + if (_sampling) { + _pending_add.push_back(std::move(tracker)); + } else { + do_add_thread(guard, std::move(tracker)); + } +} + +void +CpuUsage::remove_thread(ThreadTracker::SP tracker) +{ + Guard guard(_lock); + if (_sampling) { + _pending_remove.push_back(std::move(tracker)); + } else { + do_remove_thread(guard, std::move(tracker)); + } +} + +void +CpuUsage::handle_pending(const Guard &guard) +{ + for (auto &thread: _pending_add) { + do_add_thread(guard, 
std::move(thread)); + } + _pending_add.clear(); + for (auto &thread: _pending_remove) { + do_remove_thread(guard, std::move(thread)); + } + _pending_remove.clear(); +} + +CpuUsage::TimedSample +CpuUsage::do_sample() +{ + assert(_sampling); + Sample my_sample; + std::optional<std::promise<TimedSample>> my_promise; + auto t = steady_clock::now(); + for (const auto &entry: _threads) { + my_sample.merge(entry.first->sample()); + } + { + Guard guard(_lock); + _sampling = false; + handle_pending(guard); + if (_conflict) { + my_promise = std::move(_conflict->sample_promise); + _conflict.reset(); + } + my_sample.merge(_usage); + _usage = my_sample; + } + TimedSample result{t, my_sample}; + if (my_promise.has_value()) { + my_promise.value().set_value(result); + } + return result; +} + +CpuUsage::TimedSample +CpuUsage::sample_or_wait() +{ + std::shared_future<TimedSample> my_future; + { + Guard guard(_lock); + if (_sampling) { + if (!_conflict) { + _conflict = std::make_unique<SampleConflict>(); + } + my_future = _conflict->future_sample; + } else { + _sampling = true; + } + } + if (my_future.valid()) { + return my_future.get(); + } else { + return do_sample(); + } +} + +CpuUsage::TimedSample +CpuUsage::sample() +{ + return self().sample_or_wait(); +} + } // namespace diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.h b/vespalib/src/vespa/vespalib/util/cpu_usage.h index 09509a984b5..e3333bac6b7 100644 --- a/vespalib/src/vespa/vespalib/util/cpu_usage.h +++ b/vespalib/src/vespa/vespalib/util/cpu_usage.h @@ -1,7 +1,11 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "spin_lock.h" #include <vespa/vespalib/util/time.h> #include <memory> +#include <future> +#include <vector> +#include <map> namespace vespalib { @@ -22,4 +26,128 @@ ThreadSampler::UP create_thread_sampler(bool force_mock_impl = false, double exp } // cpu_usage +/** + * Tracks accumulative cpu usage across threads and work + * categories. 
Use the 'use' function to signal what kind of CPU you + * are using in the current thread. Use the 'sample' function to get a + * complete view of CPU usage so far. + * + * The 'use' function returns a MyUsage object that needs to be kept + * alive for as long as the current thread should contribute to the + * specified cpu use. Note that MyUsage instances may shadow each + * other, but must be destructed in reverse construction order. + **/ +class CpuUsage +{ +public: + // The kind of work performed by a thread. Used to separate + // different kinds of CPU usage. Note that categories are + // exclusive/non-overlapping; a thread can only perform one kind + // of CPU work at any specific time. + enum class Category { + SETUP = 0, // usage related to system setup (init/(re-)config/etc.) + READ = 1, // usage related to reading data from the system + WRITE = 2, // usage related to writing data to the system + COMPACT = 3, // usage related to internal data re-structuring + MAINTAIN = 4, // usage related to distributed cluster maintenance + NETWORK = 5, // usage related to network communication + OTHER = 6 // unspecified usage + }; + static constexpr size_t index_of(Category cat) { return static_cast<size_t>(cat); } + static constexpr size_t num_categories = 7; + + // A sample contains how much CPU has been spent in various + // categories. 
+ class Sample { + private: + std::array<duration,num_categories> _usage; + public: + Sample() : _usage() {} + size_t size() const { return _usage.size(); } + duration &operator[](size_t idx) { return _usage[idx]; } + duration &operator[](Category cat) { return _usage[index_of(cat)]; } + const duration &operator[](size_t idx) const { return _usage[idx]; } + const duration &operator[](Category cat) const { return _usage[index_of(cat)]; } + void merge(const Sample &rhs) { + for (size_t i = 0; i < size(); ++i) { + _usage[i] += rhs._usage[i]; + } + } + }; + + // a sample tagged with the time it was taken + using TimedSample = std::pair<steady_time, Sample>; + + // Used when multiple threads call the 'sample' function at the + // same time. One thread will sample while the others will wait + // for the result. + struct SampleConflict { + std::promise<TimedSample> sample_promise; + std::shared_future<TimedSample> future_sample; + SampleConflict() : sample_promise(), + future_sample(sample_promise.get_future()) {} + }; + + // Interface used to perform destructive sampling of the CPU spent + // in various categories since the last time it was sampled. + struct ThreadTracker { + using SP = std::shared_ptr<ThreadTracker>; + virtual Sample sample() = 0; + virtual ~ThreadTracker() {} + }; + class ThreadTrackerImpl; + + // Used by threads to signal what kind of CPU they are currently + // using. The thread will contribute to the declared CPU usage + // category while this object lives. MyUsage instances may shadow + // each other, but must be destructed in reverse construction + // order. 
The preferred way to use this class is by doing: + // + // auto my_usage = CpuUsage::use(my_cat); + class MyUsage { + private: + uint32_t _old_cat_idx; + public: + MyUsage(Category cat); + MyUsage(MyUsage &&) = delete; + MyUsage(const MyUsage &) = delete; + MyUsage &operator=(MyUsage &&) = delete; + MyUsage &operator=(const MyUsage &) = delete; + ~MyUsage(); + }; + +private: + SpinLock _lock; + Sample _usage; + std::map<ThreadTracker*,ThreadTracker::SP> _threads; + bool _sampling; + std::unique_ptr<SampleConflict> _conflict; + std::vector<ThreadTracker::SP> _pending_add; + std::vector<ThreadTracker::SP> _pending_remove; + + using Guard = std::lock_guard<SpinLock>; + + CpuUsage(); + CpuUsage(CpuUsage &&) = delete; + CpuUsage(const CpuUsage &) = delete; + CpuUsage &operator=(CpuUsage &&) = delete; + CpuUsage &operator=(const CpuUsage &) = delete; + + static CpuUsage &self(); + + void do_add_thread(const Guard &guard, ThreadTracker::SP tracker); + void do_remove_thread(const Guard &guard, ThreadTracker::SP tracker); + + void add_thread(ThreadTracker::SP tracker); + void remove_thread(ThreadTracker::SP tracker); + + void handle_pending(const Guard &guard); + TimedSample do_sample(); + TimedSample sample_or_wait(); + +public: + static MyUsage use(Category cat) { return MyUsage(cat); } + static TimedSample sample(); +}; + } // namespace |