summaryrefslogtreecommitdiffstats
path: root/metrics
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-02-20 11:39:12 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-02-20 12:33:29 +0000
commitd9b2cc5dd6aa9210241efeac249be84772377b35 (patch)
treea78022b7588a4deba71544c79dd6efaa53792f94 /metrics
parenta5d5a7dd7bab499554691fa59e08b3771b5e32d3 (diff)
remove document::Runnable
use std::thread directly instead
Diffstat (limited to 'metrics')
-rw-r--r--metrics/src/tests/metricmanagertest.cpp28
-rw-r--r--metrics/src/tests/snapshottest.cpp3
-rw-r--r--metrics/src/tests/stresstest.cpp25
-rw-r--r--metrics/src/vespa/metrics/metricmanager.cpp19
-rw-r--r--metrics/src/vespa/metrics/metricmanager.h14
5 files changed, 41 insertions, 48 deletions
diff --git a/metrics/src/tests/metricmanagertest.cpp b/metrics/src/tests/metricmanagertest.cpp
index 604e9c46b80..98d03514de0 100644
--- a/metrics/src/tests/metricmanagertest.cpp
+++ b/metrics/src/tests/metricmanagertest.cpp
@@ -152,11 +152,10 @@ namespace {
std::pair<std::string, std::string>
getMatchedMetrics(const vespalib::string& config)
{
- FastOS_ThreadPool pool;
TestMetricSet mySet;
MetricManager mm;
mm.registerMetric(mm.getMetricLock(), mySet.set);
- mm.init(ConfigUri(config), pool);
+ mm.init(ConfigUri(config));
MetricNameVisitor visitor;
/** Take a copy to verify clone works.
@@ -475,7 +474,6 @@ std::string dumpAllSnapshots(const MetricManager& mm,
TEST_F(MetricManagerTest, test_snapshots)
{
- FastOS_ThreadPool pool;
FakeTimer* timer = new FakeTimer(1000);
std::unique_ptr<MetricManager::Timer> timerImpl(timer);
TestMetricSet mySet;
@@ -491,8 +489,7 @@ TEST_F(MetricManagerTest, test_snapshots)
"consumer[0].tags[0] snaptest\n"
"consumer[1].name log\n"
"consumer[1].tags[1]\n"
- "consumer[1].tags[0] snaptest\n"),
- pool);
+ "consumer[1].tags[0] snaptest\n"));
MetricNameVisitor visitor;
{
MetricLockGuard lockGuard(mm.getMetricLock());
@@ -575,7 +572,6 @@ TEST_F(MetricManagerTest, test_snapshots)
TEST_F(MetricManagerTest, test_xml_output)
{
- FastOS_ThreadPool pool;
FakeTimer* timer = new FakeTimer(1000);
std::unique_ptr<MetricManager::Timer> timerImpl(timer);
MetricManager mm(std::move(timerImpl));
@@ -593,8 +589,7 @@ TEST_F(MetricManagerTest, test_xml_output)
"consumer[0].tags[0] snaptest\n"
"consumer[1].name log\n"
"consumer[1].tags[1]\n"
- "consumer[1].tags[0] snaptest\n"),
- pool);
+ "consumer[1].tags[0] snaptest\n"));
takeSnapshots(mm, 1000);
@@ -653,7 +648,6 @@ TEST_F(MetricManagerTest, test_xml_output)
TEST_F(MetricManagerTest, test_json_output)
{
- FastOS_ThreadPool pool;
FakeTimer* timer = new FakeTimer(1000);
std::unique_ptr<MetricManager::Timer> timerImpl(timer);
MetricManager mm(std::move(timerImpl));
@@ -668,8 +662,7 @@ TEST_F(MetricManagerTest, test_json_output)
"consumer[1]\n"
"consumer[0].name snapper\n"
"consumer[0].tags[1]\n"
- "consumer[0].tags[0] snaptest\n"),
- pool);
+ "consumer[0].tags[0] snaptest\n"));
takeSnapshots(mm, 1000);
@@ -743,14 +736,12 @@ namespace {
struct MetricSnapshotTestFixture
{
MetricManagerTest& test;
- FastOS_ThreadPool pool;
FakeTimer* timer;
MetricManager manager;
MetricSet& mset;
MetricSnapshotTestFixture(MetricManagerTest& callerTest, MetricSet& metricSet)
: test(callerTest),
- pool(),
timer(new FakeTimer(1000)),
manager(std::unique_ptr<MetricManager::Timer>(timer)),
mset(metricSet)
@@ -765,8 +756,7 @@ struct MetricSnapshotTestFixture
"consumer[1]\n"
"consumer[0].name snapper\n"
"consumer[0].addedmetrics[1]\n"
- "consumer[0].addedmetrics[0] *\n"),
- pool);
+ "consumer[0].addedmetrics[0] *\n"));
test.takeSnapshots(manager, 1000);
}
@@ -986,7 +976,6 @@ TEST_F(MetricManagerTest, json_output_can_have_multiple_sets_with_same_name)
TEST_F(MetricManagerTest, test_text_output)
{
- FastOS_ThreadPool pool;
FakeTimer* timer = new FakeTimer(1000);
std::unique_ptr<MetricManager::Timer> timerImpl(timer);
MetricManager mm(std::move(timerImpl));
@@ -1010,8 +999,7 @@ TEST_F(MetricManagerTest, test_text_output)
"consumer[0].tags[0] snaptest\n"
"consumer[1].name log\n"
"consumer[1].tags[1]\n"
- "consumer[1].tags[0] snaptest\n"),
- pool);
+ "consumer[1].tags[0] snaptest\n"));
std::string expected(
"snapshot \"Active metrics showing updates since last snapshot\" from 1000 to 0 period 0\n"
"temp.val6 average=2 last=2 min=2 max=2 count=1 total=2\n"
@@ -1085,7 +1073,6 @@ TEST_F(MetricManagerTest, test_update_hooks)
{
std::mutex output_mutex;
std::ostringstream output;
- FastOS_ThreadPool pool;
FakeTimer* timer = new FakeTimer(1000);
std::unique_ptr<MetricManager::Timer> timerImpl(timer);
// Add a metric set just so one exist
@@ -1114,8 +1101,7 @@ TEST_F(MetricManagerTest, test_update_hooks)
"consumer[0].tags[0] snaptest\n"
"consumer[1].name log\n"
"consumer[1].tags[1]\n"
- "consumer[1].tags[0] snaptest\n"),
- pool);
+ "consumer[1].tags[0] snaptest\n"));
output << "Init done\n";
MyUpdateHook postInitShort(output, output_mutex, "AIS", *timer);
diff --git a/metrics/src/tests/snapshottest.cpp b/metrics/src/tests/snapshottest.cpp
index 22eb3587eff..b4eb4a1353c 100644
--- a/metrics/src/tests/snapshottest.cpp
+++ b/metrics/src/tests/snapshottest.cpp
@@ -176,7 +176,6 @@ TEST_F(SnapshotTest, test_snapshot_two_days)
TestMetricSet set("test");
FakeTimer* timer;
- FastOS_ThreadPool threadPool;
MetricManager mm(
std::unique_ptr<MetricManager::Timer>(timer = new FakeTimer));
{
@@ -185,7 +184,7 @@ TEST_F(SnapshotTest, test_snapshot_two_days)
}
mm.init(config::ConfigUri("raw:consumer[1]\n"
"consumer[0].name \"log\""),
- threadPool, false);
+ false);
tick(mm, timer->_timeInSecs * 1000);
for (uint32_t days=0; days<2; ++days) {
diff --git a/metrics/src/tests/stresstest.cpp b/metrics/src/tests/stresstest.cpp
index e942d47b9de..afabf91d5c9 100644
--- a/metrics/src/tests/stresstest.cpp
+++ b/metrics/src/tests/stresstest.cpp
@@ -74,25 +74,29 @@ OuterMetricSet::OuterMetricSet(MetricSet* owner)
OuterMetricSet::~OuterMetricSet() = default;
-struct Hammer : public document::Runnable {
+struct Hammer {
using UP = std::unique_ptr<Hammer>;
OuterMetricSet& _metrics;
-
- Hammer(OuterMetricSet& metrics,FastOS_ThreadPool& threadPool)
- : _metrics(metrics)
+ std::atomic<bool> _stop_requested;
+ std::thread _thread;
+
+ Hammer(OuterMetricSet& metrics)
+ : _metrics(metrics),
+ _stop_requested(false),
+ _thread()
{
- start(threadPool);
+ _thread = std::thread([this](){run();});
}
~Hammer() {
- stop();
- join();
+ _stop_requested = true;
+ _thread.join();
//std::cerr << "Loadgiver thread joined\n";
}
- void run() override {
+ void run() {
uint64_t i = 0;
- while (running()) {
+ while (!_stop_requested.load(std::memory_order_relaxed)) {
++i;
setMetrics(i, _metrics._inner1);
setMetrics(i + 3, _metrics._inner2);
@@ -114,10 +118,9 @@ TEST(StressTest, test_stress)
OuterMetricSet metrics;
LOG(info, "Starting load givers");
- FastOS_ThreadPool threadPool;
std::vector<Hammer::UP> hammers;
for (uint32_t i=0; i<10; ++i) {
- hammers.push_back(std::make_unique<Hammer>(metrics, threadPool));
+ hammers.push_back(std::make_unique<Hammer>(metrics));
}
LOG(info, "Waiting to let loadgivers hammer a while");
std::this_thread::sleep_for(5s);
diff --git a/metrics/src/vespa/metrics/metricmanager.cpp b/metrics/src/vespa/metrics/metricmanager.cpp
index ae75968e605..a0e44ddbeac 100644
--- a/metrics/src/vespa/metrics/metricmanager.cpp
+++ b/metrics/src/vespa/metrics/metricmanager.cpp
@@ -82,7 +82,9 @@ MetricManager::MetricManager(std::unique_ptr<Timer> timer)
_snapshotHookLatency("snapshothooklatency", {}, "Time in ms used to update a single snapshot hook", &_metricManagerMetrics),
_resetLatency("resetlatency", {}, "Time in ms used to reset all metrics.", &_metricManagerMetrics),
_snapshotLatency("snapshotlatency", {}, "Time in ms used to take a snapshot", &_metricManagerMetrics),
- _sleepTimes("sleeptime", {}, "Time in ms worker thread is sleeping", &_metricManagerMetrics)
+ _sleepTimes("sleeptime", {}, "Time in ms worker thread is sleeping", &_metricManagerMetrics),
+ _stop_requested(false),
+ _thread()
{
registerMetric(getMetricLock(), _metricManagerMetrics);
}
@@ -95,15 +97,14 @@ MetricManager::~MetricManager()
void
MetricManager::stop()
{
- if (!running()) {
- return; // Let stop() be idempotent.
- }
- Runnable::stop();
+ request_stop();
{
MetricLockGuard sync(_waiter);
_cond.notify_all();
}
- join();
+ if (_thread.joinable()) {
+ _thread.join();
+ }
}
void
@@ -161,7 +162,7 @@ MetricManager::isInitialized() const {
}
void
-MetricManager::init(const config::ConfigUri & uri, FastOS_ThreadPool& pool, bool startThread)
+MetricManager::init(const config::ConfigUri & uri, bool startThread)
{
if (isInitialized()) {
throw vespalib::IllegalStateException(
@@ -175,7 +176,7 @@ MetricManager::init(const config::ConfigUri & uri, FastOS_ThreadPool& pool, bool
configure(getMetricLock(), _configHandle->getConfig());
LOG(debug, "Starting worker thread, waiting for first iteration to complete.");
if (startThread) {
- Runnable::start(pool);
+ _thread = std::thread([this](){run();});
// Wait for first iteration to have completed, such that it is safe
// to access snapshots afterwards.
MetricLockGuard sync(_waiter);
@@ -763,7 +764,7 @@ MetricManager::run()
}
// Ensure correct time for first snapshot
_snapshots[0]->getSnapshot().setToTime(currentTime);
- while (!stopping()) {
+ while (!stop_requested()) {
currentTime = _timer->getTime();
time_t next = tick(sync, currentTime);
if (currentTime < next) {
diff --git a/metrics/src/vespa/metrics/metricmanager.h b/metrics/src/vespa/metrics/metricmanager.h
index 5f35c349f7f..b1777a1d228 100644
--- a/metrics/src/vespa/metrics/metricmanager.h
+++ b/metrics/src/vespa/metrics/metricmanager.h
@@ -49,7 +49,6 @@
#include "valuemetric.h"
#include "updatehook.h"
#include <vespa/vespalib/stllike/hash_set.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/vespalib/util/jsonwriter.h>
#include <vespa/metrics/config-metricsmanager.h>
#include <vespa/config/subscription/configsubscriber.h>
@@ -61,7 +60,7 @@ template class vespalib::hash_set<metrics::Metric::String>;
namespace metrics {
-class MetricManager : private document::Runnable
+class MetricManager
{
public:
@@ -119,10 +118,15 @@ private:
LongAverageMetric _resetLatency;
LongAverageMetric _snapshotLatency;
LongAverageMetric _sleepTimes;
+ std::atomic<bool> _stop_requested;
+ std::thread _thread;
+ void request_stop() { _stop_requested.store(true, std::memory_order_relaxed); }
+ bool stop_requested() const { return _stop_requested.load(std::memory_order_relaxed); }
+
public:
MetricManager(std::unique_ptr<Timer> timer = std::make_unique<Timer>());
- ~MetricManager() override;
+ ~MetricManager();
void stop();
@@ -194,7 +198,7 @@ public:
* of consumers. readConfig() will start a config subscription. It should
* not be called multiple times.
*/
- void init(const config::ConfigUri & uri, FastOS_ThreadPool&, bool startThread = true);
+ void init(const config::ConfigUri & uri, bool startThread = true);
/**
* Visit a given snapshot for a given consumer. (Empty consumer name means
@@ -271,7 +275,7 @@ private:
friend struct SnapshotTest;
void configure(const MetricLockGuard & guard, std::unique_ptr<MetricsmanagerConfig> conf);
- void run() override;
+ void run();
time_t tick(const MetricLockGuard & guard, time_t currentTime);
/**
* Utility function for updating periodic metrics.