From d9b2cc5dd6aa9210241efeac249be84772377b35 Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Mon, 20 Feb 2023 11:39:12 +0000 Subject: remove document::Runnable use std::thread directly instead --- metrics/src/tests/metricmanagertest.cpp | 28 +++++++--------------------- metrics/src/tests/snapshottest.cpp | 3 +-- metrics/src/tests/stresstest.cpp | 25 ++++++++++++++----------- metrics/src/vespa/metrics/metricmanager.cpp | 19 ++++++++++--------- metrics/src/vespa/metrics/metricmanager.h | 14 +++++++++----- 5 files changed, 41 insertions(+), 48 deletions(-) (limited to 'metrics') diff --git a/metrics/src/tests/metricmanagertest.cpp b/metrics/src/tests/metricmanagertest.cpp index 604e9c46b80..98d03514de0 100644 --- a/metrics/src/tests/metricmanagertest.cpp +++ b/metrics/src/tests/metricmanagertest.cpp @@ -152,11 +152,10 @@ namespace { std::pair getMatchedMetrics(const vespalib::string& config) { - FastOS_ThreadPool pool; TestMetricSet mySet; MetricManager mm; mm.registerMetric(mm.getMetricLock(), mySet.set); - mm.init(ConfigUri(config), pool); + mm.init(ConfigUri(config)); MetricNameVisitor visitor; /** Take a copy to verify clone works. @@ -475,7 +474,6 @@ std::string dumpAllSnapshots(const MetricManager& mm, TEST_F(MetricManagerTest, test_snapshots) { - FastOS_ThreadPool pool; FakeTimer* timer = new FakeTimer(1000); std::unique_ptr timerImpl(timer); TestMetricSet mySet; @@ -491,8 +489,7 @@ TEST_F(MetricManagerTest, test_snapshots) "consumer[0].tags[0] snaptest\n" "consumer[1].name log\n" "consumer[1].tags[1]\n" - "consumer[1].tags[0] snaptest\n"), - pool); + "consumer[1].tags[0] snaptest\n")); MetricNameVisitor visitor; { MetricLockGuard lockGuard(mm.getMetricLock()); @@ -575,7 +572,6 @@ TEST_F(MetricManagerTest, test_snapshots) TEST_F(MetricManagerTest, test_xml_output) { - FastOS_ThreadPool pool; FakeTimer* timer = new FakeTimer(1000); std::unique_ptr timerImpl(timer); MetricManager mm(std::move(timerImpl)); @@ -593,8 +589,7 @@ TEST_F(MetricManagerTest, test_xml_output) "consumer[0].tags[0] snaptest\n" "consumer[1].name log\n" "consumer[1].tags[1]\n" - "consumer[1].tags[0] snaptest\n"), - pool); + "consumer[1].tags[0] snaptest\n")); takeSnapshots(mm, 1000); @@ -653,7 +648,6 @@ TEST_F(MetricManagerTest, test_xml_output) TEST_F(MetricManagerTest, test_json_output) { - FastOS_ThreadPool pool; FakeTimer* timer = new FakeTimer(1000); std::unique_ptr timerImpl(timer); MetricManager mm(std::move(timerImpl)); @@ -668,8 +662,7 @@ TEST_F(MetricManagerTest, test_json_output) "consumer[1]\n" "consumer[0].name snapper\n" "consumer[0].tags[1]\n" - "consumer[0].tags[0] snaptest\n"), - pool); + "consumer[0].tags[0] snaptest\n")); takeSnapshots(mm, 1000); @@ -743,14 +736,12 @@ namespace { struct MetricSnapshotTestFixture { MetricManagerTest& test; - FastOS_ThreadPool pool; FakeTimer* timer; MetricManager manager; MetricSet& mset; MetricSnapshotTestFixture(MetricManagerTest& callerTest, MetricSet& metricSet) : test(callerTest), - pool(), timer(new FakeTimer(1000)), manager(std::unique_ptr(timer)), mset(metricSet) @@ -765,8 +756,7 @@ struct MetricSnapshotTestFixture "consumer[1]\n" "consumer[0].name snapper\n" "consumer[0].addedmetrics[1]\n" - "consumer[0].addedmetrics[0] *\n"), - pool); + "consumer[0].addedmetrics[0] *\n")); test.takeSnapshots(manager, 1000); } @@ -986,7 +976,6 @@ TEST_F(MetricManagerTest, json_output_can_have_multiple_sets_with_same_name) TEST_F(MetricManagerTest, test_text_output) { - FastOS_ThreadPool pool; FakeTimer* timer = new FakeTimer(1000); std::unique_ptr timerImpl(timer); MetricManager mm(std::move(timerImpl)); @@ -1010,8 +999,7 @@ TEST_F(MetricManagerTest, test_text_output) "consumer[0].tags[0] snaptest\n" "consumer[1].name log\n" "consumer[1].tags[1]\n" - "consumer[1].tags[0] snaptest\n"), - pool); + "consumer[1].tags[0] snaptest\n")); std::string expected( "snapshot \"Active metrics showing updates since last snapshot\" from 1000 to 0 period 0\n" "temp.val6 average=2 last=2 min=2 max=2 count=1 total=2\n" @@ -1085,7 +1073,6 @@ TEST_F(MetricManagerTest, test_update_hooks) { std::mutex output_mutex; std::ostringstream output; - FastOS_ThreadPool pool; FakeTimer* timer = new FakeTimer(1000); std::unique_ptr timerImpl(timer); // Add a metric set just so one exist @@ -1114,8 +1101,7 @@ TEST_F(MetricManagerTest, test_update_hooks) "consumer[0].tags[0] snaptest\n" "consumer[1].name log\n" "consumer[1].tags[1]\n" - "consumer[1].tags[0] snaptest\n"), - pool); + "consumer[1].tags[0] snaptest\n")); output << "Init done\n"; MyUpdateHook postInitShort(output, output_mutex, "AIS", *timer); diff --git a/metrics/src/tests/snapshottest.cpp b/metrics/src/tests/snapshottest.cpp index 22eb3587eff..b4eb4a1353c 100644 --- a/metrics/src/tests/snapshottest.cpp +++ b/metrics/src/tests/snapshottest.cpp @@ -176,7 +176,6 @@ TEST_F(SnapshotTest, test_snapshot_two_days) TestMetricSet set("test"); FakeTimer* timer; - FastOS_ThreadPool threadPool; MetricManager mm( std::unique_ptr(timer = new FakeTimer)); { @@ -185,7 +184,7 @@ TEST_F(SnapshotTest, test_snapshot_two_days) } mm.init(config::ConfigUri("raw:consumer[1]\n" "consumer[0].name \"log\""), - threadPool, false); + false); tick(mm, timer->_timeInSecs * 1000); for (uint32_t days=0; days<2; ++days) { diff --git a/metrics/src/tests/stresstest.cpp b/metrics/src/tests/stresstest.cpp index e942d47b9de..afabf91d5c9 100644 --- a/metrics/src/tests/stresstest.cpp +++ b/metrics/src/tests/stresstest.cpp @@ -74,25 +74,29 @@ OuterMetricSet::OuterMetricSet(MetricSet* owner) OuterMetricSet::~OuterMetricSet() = default; -struct Hammer : public document::Runnable { +struct Hammer { using UP = std::unique_ptr; OuterMetricSet& _metrics; - - Hammer(OuterMetricSet& metrics,FastOS_ThreadPool& threadPool) - : _metrics(metrics) + std::atomic _stop_requested; + std::thread _thread; + + Hammer(OuterMetricSet& metrics) + : _metrics(metrics), + _stop_requested(false), + _thread() { - start(threadPool); + _thread = std::thread([this](){run();}); } ~Hammer() { - stop(); - join(); + _stop_requested = true; + _thread.join(); //std::cerr << "Loadgiver thread joined\n"; } - void run() override { + void run() { uint64_t i = 0; - while (running()) { + while (!_stop_requested.load(std::memory_order_relaxed)) { ++i; setMetrics(i, _metrics._inner1); setMetrics(i + 3, _metrics._inner2); @@ -114,10 +118,9 @@ TEST(StressTest, test_stress) OuterMetricSet metrics; LOG(info, "Starting load givers"); - FastOS_ThreadPool threadPool; std::vector hammers; for (uint32_t i=0; i<10; ++i) { - hammers.push_back(std::make_unique(metrics, threadPool)); + hammers.push_back(std::make_unique(metrics)); } LOG(info, "Waiting to let loadgivers hammer a while"); std::this_thread::sleep_for(5s); diff --git a/metrics/src/vespa/metrics/metricmanager.cpp b/metrics/src/vespa/metrics/metricmanager.cpp index ae75968e605..a0e44ddbeac 100644 --- a/metrics/src/vespa/metrics/metricmanager.cpp +++ b/metrics/src/vespa/metrics/metricmanager.cpp @@ -82,7 +82,9 @@ MetricManager::MetricManager(std::unique_ptr timer) _snapshotHookLatency("snapshothooklatency", {}, "Time in ms used to update a single snapshot hook", &_metricManagerMetrics), _resetLatency("resetlatency", {}, "Time in ms used to reset all metrics.", &_metricManagerMetrics), _snapshotLatency("snapshotlatency", {}, "Time in ms used to take a snapshot", &_metricManagerMetrics), - _sleepTimes("sleeptime", {}, "Time in ms worker thread is sleeping", &_metricManagerMetrics) + _sleepTimes("sleeptime", {}, "Time in ms worker thread is sleeping", &_metricManagerMetrics), + _stop_requested(false), + _thread() { registerMetric(getMetricLock(), _metricManagerMetrics); } @@ -95,15 +97,14 @@ MetricManager::~MetricManager() void MetricManager::stop() { - if (!running()) { - return; // Let stop() be idempotent. - } - Runnable::stop(); + request_stop(); { MetricLockGuard sync(_waiter); _cond.notify_all(); } - join(); + if (_thread.joinable()) { + _thread.join(); + } } void @@ -161,7 +162,7 @@ MetricManager::isInitialized() const { } void -MetricManager::init(const config::ConfigUri & uri, FastOS_ThreadPool& pool, bool startThread) +MetricManager::init(const config::ConfigUri & uri, bool startThread) { if (isInitialized()) { throw vespalib::IllegalStateException( @@ -175,7 +176,7 @@ MetricManager::init(const config::ConfigUri & uri, FastOS_ThreadPool& pool, bool configure(getMetricLock(), _configHandle->getConfig()); LOG(debug, "Starting worker thread, waiting for first iteration to complete."); if (startThread) { - Runnable::start(pool); + _thread = std::thread([this](){run();}); // Wait for first iteration to have completed, such that it is safe // to access snapshots afterwards. MetricLockGuard sync(_waiter); @@ -763,7 +764,7 @@ MetricManager::run() } // Ensure correct time for first snapshot _snapshots[0]->getSnapshot().setToTime(currentTime); - while (!stopping()) { + while (!stop_requested()) { currentTime = _timer->getTime(); time_t next = tick(sync, currentTime); if (currentTime < next) { diff --git a/metrics/src/vespa/metrics/metricmanager.h b/metrics/src/vespa/metrics/metricmanager.h index 5f35c349f7f..b1777a1d228 100644 --- a/metrics/src/vespa/metrics/metricmanager.h +++ b/metrics/src/vespa/metrics/metricmanager.h @@ -49,7 +49,6 @@ #include "valuemetric.h" #include "updatehook.h" #include -#include #include #include #include @@ -61,7 +60,7 @@ template class vespalib::hash_set; namespace metrics { -class MetricManager : private document::Runnable +class MetricManager { public: @@ -119,10 +118,15 @@ private: LongAverageMetric _resetLatency; LongAverageMetric _snapshotLatency; LongAverageMetric _sleepTimes; + std::atomic _stop_requested; + std::thread _thread; + void request_stop() { _stop_requested.store(true, std::memory_order_relaxed); } + bool stop_requested() const { return _stop_requested.load(std::memory_order_relaxed); } + public: MetricManager(std::unique_ptr timer = std::make_unique()); - ~MetricManager() override; + ~MetricManager(); void stop(); @@ -194,7 +198,7 @@ public: * of consumers. readConfig() will start a config subscription. It should * not be called multiple times. */ - void init(const config::ConfigUri & uri, FastOS_ThreadPool&, bool startThread = true); + void init(const config::ConfigUri & uri, bool startThread = true); /** * Visit a given snapshot for a given consumer. (Empty consumer name means @@ -271,7 +275,7 @@ private: friend struct SnapshotTest; void configure(const MetricLockGuard & guard, std::unique_ptr conf); - void run() override; + void run(); time_t tick(const MetricLockGuard & guard, time_t currentTime); /** * Utility function for updating periodic metrics. -- cgit v1.2.3