aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-02-20 11:39:12 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-02-20 12:33:29 +0000
commitd9b2cc5dd6aa9210241efeac249be84772377b35 (patch)
treea78022b7588a4deba71544c79dd6efaa53792f94
parenta5d5a7dd7bab499554691fa59e08b3771b5e32d3 (diff)
remove document::Runnable
use std::thread directly instead
-rw-r--r--metrics/src/tests/metricmanagertest.cpp28
-rw-r--r--metrics/src/tests/snapshottest.cpp3
-rw-r--r--metrics/src/tests/stresstest.cpp25
-rw-r--r--metrics/src/vespa/metrics/metricmanager.cpp19
-rw-r--r--metrics/src/vespa/metrics/metricmanager.h14
-rw-r--r--searchcore/src/apps/proton/proton.cpp2
-rw-r--r--searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp1
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp1
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp15
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h11
-rw-r--r--searchlib/src/vespa/searchlib/util/runnable.h1
-rw-r--r--storage/src/tests/bucketdb/lockablemaptest.cpp1
-rw-r--r--storage/src/tests/common/metricstest.cpp2
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp44
-rw-r--r--storage/src/tests/storageserver/statereportertest.cpp2
-rw-r--r--storage/src/vespa/storage/common/storagelinkqueued.h1
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h1
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h1
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h1
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/storagenodecontext.h6
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.h1
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.h1
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h1
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp12
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h9
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp4
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h2
-rw-r--r--vespalib/src/vespa/vespalib/util/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/util/document_runnable.cpp103
-rw-r--r--vespalib/src/vespa/vespalib/util/document_runnable.h96
31 files changed, 97 insertions, 314 deletions
diff --git a/metrics/src/tests/metricmanagertest.cpp b/metrics/src/tests/metricmanagertest.cpp
index 604e9c46b80..98d03514de0 100644
--- a/metrics/src/tests/metricmanagertest.cpp
+++ b/metrics/src/tests/metricmanagertest.cpp
@@ -152,11 +152,10 @@ namespace {
std::pair<std::string, std::string>
getMatchedMetrics(const vespalib::string& config)
{
- FastOS_ThreadPool pool;
TestMetricSet mySet;
MetricManager mm;
mm.registerMetric(mm.getMetricLock(), mySet.set);
- mm.init(ConfigUri(config), pool);
+ mm.init(ConfigUri(config));
MetricNameVisitor visitor;
/** Take a copy to verify clone works.
@@ -475,7 +474,6 @@ std::string dumpAllSnapshots(const MetricManager& mm,
TEST_F(MetricManagerTest, test_snapshots)
{
- FastOS_ThreadPool pool;
FakeTimer* timer = new FakeTimer(1000);
std::unique_ptr<MetricManager::Timer> timerImpl(timer);
TestMetricSet mySet;
@@ -491,8 +489,7 @@ TEST_F(MetricManagerTest, test_snapshots)
"consumer[0].tags[0] snaptest\n"
"consumer[1].name log\n"
"consumer[1].tags[1]\n"
- "consumer[1].tags[0] snaptest\n"),
- pool);
+ "consumer[1].tags[0] snaptest\n"));
MetricNameVisitor visitor;
{
MetricLockGuard lockGuard(mm.getMetricLock());
@@ -575,7 +572,6 @@ TEST_F(MetricManagerTest, test_snapshots)
TEST_F(MetricManagerTest, test_xml_output)
{
- FastOS_ThreadPool pool;
FakeTimer* timer = new FakeTimer(1000);
std::unique_ptr<MetricManager::Timer> timerImpl(timer);
MetricManager mm(std::move(timerImpl));
@@ -593,8 +589,7 @@ TEST_F(MetricManagerTest, test_xml_output)
"consumer[0].tags[0] snaptest\n"
"consumer[1].name log\n"
"consumer[1].tags[1]\n"
- "consumer[1].tags[0] snaptest\n"),
- pool);
+ "consumer[1].tags[0] snaptest\n"));
takeSnapshots(mm, 1000);
@@ -653,7 +648,6 @@ TEST_F(MetricManagerTest, test_xml_output)
TEST_F(MetricManagerTest, test_json_output)
{
- FastOS_ThreadPool pool;
FakeTimer* timer = new FakeTimer(1000);
std::unique_ptr<MetricManager::Timer> timerImpl(timer);
MetricManager mm(std::move(timerImpl));
@@ -668,8 +662,7 @@ TEST_F(MetricManagerTest, test_json_output)
"consumer[1]\n"
"consumer[0].name snapper\n"
"consumer[0].tags[1]\n"
- "consumer[0].tags[0] snaptest\n"),
- pool);
+ "consumer[0].tags[0] snaptest\n"));
takeSnapshots(mm, 1000);
@@ -743,14 +736,12 @@ namespace {
struct MetricSnapshotTestFixture
{
MetricManagerTest& test;
- FastOS_ThreadPool pool;
FakeTimer* timer;
MetricManager manager;
MetricSet& mset;
MetricSnapshotTestFixture(MetricManagerTest& callerTest, MetricSet& metricSet)
: test(callerTest),
- pool(),
timer(new FakeTimer(1000)),
manager(std::unique_ptr<MetricManager::Timer>(timer)),
mset(metricSet)
@@ -765,8 +756,7 @@ struct MetricSnapshotTestFixture
"consumer[1]\n"
"consumer[0].name snapper\n"
"consumer[0].addedmetrics[1]\n"
- "consumer[0].addedmetrics[0] *\n"),
- pool);
+ "consumer[0].addedmetrics[0] *\n"));
test.takeSnapshots(manager, 1000);
}
@@ -986,7 +976,6 @@ TEST_F(MetricManagerTest, json_output_can_have_multiple_sets_with_same_name)
TEST_F(MetricManagerTest, test_text_output)
{
- FastOS_ThreadPool pool;
FakeTimer* timer = new FakeTimer(1000);
std::unique_ptr<MetricManager::Timer> timerImpl(timer);
MetricManager mm(std::move(timerImpl));
@@ -1010,8 +999,7 @@ TEST_F(MetricManagerTest, test_text_output)
"consumer[0].tags[0] snaptest\n"
"consumer[1].name log\n"
"consumer[1].tags[1]\n"
- "consumer[1].tags[0] snaptest\n"),
- pool);
+ "consumer[1].tags[0] snaptest\n"));
std::string expected(
"snapshot \"Active metrics showing updates since last snapshot\" from 1000 to 0 period 0\n"
"temp.val6 average=2 last=2 min=2 max=2 count=1 total=2\n"
@@ -1085,7 +1073,6 @@ TEST_F(MetricManagerTest, test_update_hooks)
{
std::mutex output_mutex;
std::ostringstream output;
- FastOS_ThreadPool pool;
FakeTimer* timer = new FakeTimer(1000);
std::unique_ptr<MetricManager::Timer> timerImpl(timer);
// Add a metric set just so one exist
@@ -1114,8 +1101,7 @@ TEST_F(MetricManagerTest, test_update_hooks)
"consumer[0].tags[0] snaptest\n"
"consumer[1].name log\n"
"consumer[1].tags[1]\n"
- "consumer[1].tags[0] snaptest\n"),
- pool);
+ "consumer[1].tags[0] snaptest\n"));
output << "Init done\n";
MyUpdateHook postInitShort(output, output_mutex, "AIS", *timer);
diff --git a/metrics/src/tests/snapshottest.cpp b/metrics/src/tests/snapshottest.cpp
index 22eb3587eff..b4eb4a1353c 100644
--- a/metrics/src/tests/snapshottest.cpp
+++ b/metrics/src/tests/snapshottest.cpp
@@ -176,7 +176,6 @@ TEST_F(SnapshotTest, test_snapshot_two_days)
TestMetricSet set("test");
FakeTimer* timer;
- FastOS_ThreadPool threadPool;
MetricManager mm(
std::unique_ptr<MetricManager::Timer>(timer = new FakeTimer));
{
@@ -185,7 +184,7 @@ TEST_F(SnapshotTest, test_snapshot_two_days)
}
mm.init(config::ConfigUri("raw:consumer[1]\n"
"consumer[0].name \"log\""),
- threadPool, false);
+ false);
tick(mm, timer->_timeInSecs * 1000);
for (uint32_t days=0; days<2; ++days) {
diff --git a/metrics/src/tests/stresstest.cpp b/metrics/src/tests/stresstest.cpp
index e942d47b9de..afabf91d5c9 100644
--- a/metrics/src/tests/stresstest.cpp
+++ b/metrics/src/tests/stresstest.cpp
@@ -74,25 +74,29 @@ OuterMetricSet::OuterMetricSet(MetricSet* owner)
OuterMetricSet::~OuterMetricSet() = default;
-struct Hammer : public document::Runnable {
+struct Hammer {
using UP = std::unique_ptr<Hammer>;
OuterMetricSet& _metrics;
-
- Hammer(OuterMetricSet& metrics,FastOS_ThreadPool& threadPool)
- : _metrics(metrics)
+ std::atomic<bool> _stop_requested;
+ std::thread _thread;
+
+ Hammer(OuterMetricSet& metrics)
+ : _metrics(metrics),
+ _stop_requested(false),
+ _thread()
{
- start(threadPool);
+ _thread = std::thread([this](){run();});
}
~Hammer() {
- stop();
- join();
+ _stop_requested = true;
+ _thread.join();
//std::cerr << "Loadgiver thread joined\n";
}
- void run() override {
+ void run() {
uint64_t i = 0;
- while (running()) {
+ while (!_stop_requested.load(std::memory_order_relaxed)) {
++i;
setMetrics(i, _metrics._inner1);
setMetrics(i + 3, _metrics._inner2);
@@ -114,10 +118,9 @@ TEST(StressTest, test_stress)
OuterMetricSet metrics;
LOG(info, "Starting load givers");
- FastOS_ThreadPool threadPool;
std::vector<Hammer::UP> hammers;
for (uint32_t i=0; i<10; ++i) {
- hammers.push_back(std::make_unique<Hammer>(metrics, threadPool));
+ hammers.push_back(std::make_unique<Hammer>(metrics));
}
LOG(info, "Waiting to let loadgivers hammer a while");
std::this_thread::sleep_for(5s);
diff --git a/metrics/src/vespa/metrics/metricmanager.cpp b/metrics/src/vespa/metrics/metricmanager.cpp
index ae75968e605..a0e44ddbeac 100644
--- a/metrics/src/vespa/metrics/metricmanager.cpp
+++ b/metrics/src/vespa/metrics/metricmanager.cpp
@@ -82,7 +82,9 @@ MetricManager::MetricManager(std::unique_ptr<Timer> timer)
_snapshotHookLatency("snapshothooklatency", {}, "Time in ms used to update a single snapshot hook", &_metricManagerMetrics),
_resetLatency("resetlatency", {}, "Time in ms used to reset all metrics.", &_metricManagerMetrics),
_snapshotLatency("snapshotlatency", {}, "Time in ms used to take a snapshot", &_metricManagerMetrics),
- _sleepTimes("sleeptime", {}, "Time in ms worker thread is sleeping", &_metricManagerMetrics)
+ _sleepTimes("sleeptime", {}, "Time in ms worker thread is sleeping", &_metricManagerMetrics),
+ _stop_requested(false),
+ _thread()
{
registerMetric(getMetricLock(), _metricManagerMetrics);
}
@@ -95,15 +97,14 @@ MetricManager::~MetricManager()
void
MetricManager::stop()
{
- if (!running()) {
- return; // Let stop() be idempotent.
- }
- Runnable::stop();
+ request_stop();
{
MetricLockGuard sync(_waiter);
_cond.notify_all();
}
- join();
+ if (_thread.joinable()) {
+ _thread.join();
+ }
}
void
@@ -161,7 +162,7 @@ MetricManager::isInitialized() const {
}
void
-MetricManager::init(const config::ConfigUri & uri, FastOS_ThreadPool& pool, bool startThread)
+MetricManager::init(const config::ConfigUri & uri, bool startThread)
{
if (isInitialized()) {
throw vespalib::IllegalStateException(
@@ -175,7 +176,7 @@ MetricManager::init(const config::ConfigUri & uri, FastOS_ThreadPool& pool, bool
configure(getMetricLock(), _configHandle->getConfig());
LOG(debug, "Starting worker thread, waiting for first iteration to complete.");
if (startThread) {
- Runnable::start(pool);
+ _thread = std::thread([this](){run();});
// Wait for first iteration to have completed, such that it is safe
// to access snapshots afterwards.
MetricLockGuard sync(_waiter);
@@ -763,7 +764,7 @@ MetricManager::run()
}
// Ensure correct time for first snapshot
_snapshots[0]->getSnapshot().setToTime(currentTime);
- while (!stopping()) {
+ while (!stop_requested()) {
currentTime = _timer->getTime();
time_t next = tick(sync, currentTime);
if (currentTime < next) {
diff --git a/metrics/src/vespa/metrics/metricmanager.h b/metrics/src/vespa/metrics/metricmanager.h
index 5f35c349f7f..b1777a1d228 100644
--- a/metrics/src/vespa/metrics/metricmanager.h
+++ b/metrics/src/vespa/metrics/metricmanager.h
@@ -49,7 +49,6 @@
#include "valuemetric.h"
#include "updatehook.h"
#include <vespa/vespalib/stllike/hash_set.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/vespalib/util/jsonwriter.h>
#include <vespa/metrics/config-metricsmanager.h>
#include <vespa/config/subscription/configsubscriber.h>
@@ -61,7 +60,7 @@ template class vespalib::hash_set<metrics::Metric::String>;
namespace metrics {
-class MetricManager : private document::Runnable
+class MetricManager
{
public:
@@ -119,10 +118,15 @@ private:
LongAverageMetric _resetLatency;
LongAverageMetric _snapshotLatency;
LongAverageMetric _sleepTimes;
+ std::atomic<bool> _stop_requested;
+ std::thread _thread;
+ void request_stop() { _stop_requested.store(true, std::memory_order_relaxed); }
+ bool stop_requested() const { return _stop_requested.load(std::memory_order_relaxed); }
+
public:
MetricManager(std::unique_ptr<Timer> timer = std::make_unique<Timer>());
- ~MetricManager() override;
+ ~MetricManager();
void stop();
@@ -194,7 +198,7 @@ public:
* of consumers. readConfig() will start a config subscription. It should
* not be called multiple times.
*/
- void init(const config::ConfigUri & uri, FastOS_ThreadPool&, bool startThread = true);
+ void init(const config::ConfigUri & uri, bool startThread = true);
/**
* Visit a given snapshot for a given consumer. (Empty consumer name means
@@ -271,7 +275,7 @@ private:
friend struct SnapshotTest;
void configure(const MetricLockGuard & guard, std::unique_ptr<MetricsmanagerConfig> conf);
- void run() override;
+ void run();
time_t tick(const MetricLockGuard & guard, time_t currentTime);
/**
* Utility function for updating periodic metrics.
diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp
index 06df12f73b7..2c47b5162ca 100644
--- a/searchcore/src/apps/proton/proton.cpp
+++ b/searchcore/src/apps/proton/proton.cpp
@@ -254,7 +254,7 @@ App::startAndRun(FastOS_ThreadPool & threadPool, FNET_Transport & transport, int
spiProton->createNode();
EV_STARTED("servicelayer");
} else {
- proton.getMetricManager().init(identityUri, threadPool);
+ proton.getMetricManager().init(identityUri);
}
EV_STARTED("proton");
while (!(SIG::INT.check() || SIG::TERM.check() || (spiProton && spiProton->getNode().attemptedStopped()))) {
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
index 00f62aefc28..76fc875b209 100644
--- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
+++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
@@ -19,6 +19,7 @@
#include <vespa/vespalib/util/signalhandler.h>
#include <iostream>
#include <thread>
+#include <vespa/fastos/thread.h>
#include <vespa/log/log.h>
LOG_SETUP("vespa-transactionlog-inspect");
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index af214c34be8..a1a42b592b2 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -11,6 +11,7 @@
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/fnet/transport.h>
#include <vespa/fastos/file.h>
+#include <vespa/fastos/thread.h>
#include <thread>
#include <vespa/log/log.h>
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index 98a9568e4e8..c96b0cdcd61 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -99,7 +99,7 @@ TransLogServer::TransLogServer(FNET_Transport & transport, const vespalib::strin
_baseDir(baseDir),
_domainConfig(cfg),
_executor(maxThreads, CpuUsage::wrap(tls_executor, CpuUsage::Category::WRITE)),
- _threadPool(std::make_unique<FastOS_ThreadPool>()),
+ _thread(),
_supervisor(std::make_unique<FRT_Supervisor>(&transport)),
_domains(),
_reqQ(),
@@ -143,25 +143,24 @@ TransLogServer::TransLogServer(FNET_Transport & transport, const vespalib::strin
} else {
throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno));
}
- start(*_threadPool);
+ _thread = std::thread([this](){run();});
}
TransLogServer::~TransLogServer()
{
- _closed = true;
- stop();
- join();
+ request_stop();
+ _thread.join();
_executor.sync();
_executor.shutdown();
_executor.sync();
}
-bool
-TransLogServer::onStop()
+void
+TransLogServer::request_stop()
{
+ _closed = true;
LOG(info, "Stopping TLS");
_reqQ.push(nullptr);
- return true;
}
void
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index f7ea80c9248..2c5fbf51a08 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -2,7 +2,6 @@
#pragma once
#include "domainconfig.h"
-#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/document/util/queue.h>
#include <vespa/fnet/frt/invokable.h>
@@ -18,7 +17,7 @@ namespace search::transactionlog {
class TransLogServerExplorer;
class Domain;
-class TransLogServer : private FRT_Invokable, public document::Runnable, public WriterFactory
+class TransLogServer : private FRT_Invokable, public WriterFactory
{
public:
friend class TransLogServerExplorer;
@@ -36,8 +35,8 @@ public:
TransLogServer & setDomainConfig(const DomainConfig & cfg);
private:
- bool onStop() override;
- void run() override;
+ void request_stop();
+ void run();
void exportRPC(FRT_Supervisor & supervisor);
void relayToThreadRPC(FRT_RPCRequest *req);
@@ -63,11 +62,13 @@ private:
using ReadGuard = std::shared_lock<std::shared_mutex>;
using WriteGuard = std::unique_lock<std::shared_mutex>;
+ bool running() const { return !_closed.load(std::memory_order_relaxed); }
+
vespalib::string _name;
vespalib::string _baseDir;
DomainConfig _domainConfig;
vespalib::ThreadStackExecutor _executor;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::thread _thread;
std::unique_ptr<FRT_Supervisor> _supervisor;
DomainList _domains;
mutable std::shared_mutex _domainMutex;; // Protects _domains
diff --git a/searchlib/src/vespa/searchlib/util/runnable.h b/searchlib/src/vespa/searchlib/util/runnable.h
index e268b13e09a..4b353209762 100644
--- a/searchlib/src/vespa/searchlib/util/runnable.h
+++ b/searchlib/src/vespa/searchlib/util/runnable.h
@@ -4,6 +4,7 @@
#include <mutex>
#include <condition_variable>
+#include <vespa/fastos/thread.h>
namespace search {
diff --git a/storage/src/tests/bucketdb/lockablemaptest.cpp b/storage/src/tests/bucketdb/lockablemaptest.cpp
index 582e6957c22..3a16ee170fe 100644
--- a/storage/src/tests/bucketdb/lockablemaptest.cpp
+++ b/storage/src/tests/bucketdb/lockablemaptest.cpp
@@ -1,6 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/storage/bucketdb/btree_lockable_map.hpp>
#include <vespa/storage/bucketdb/striped_btree_lockable_map.hpp>
#include <vespa/vespalib/datastore/buffer_type.hpp>
diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp
index 97d1c22364f..78fa32e24e5 100644
--- a/storage/src/tests/common/metricstest.cpp
+++ b/storage/src/tests/common/metricstest.cpp
@@ -100,7 +100,7 @@ void MetricsTest::SetUp() {
_visitorMetrics = std::make_shared<VisitorMetrics>();
_visitorMetrics->initThreads(4);
_topSet->registerMetric(*_visitorMetrics);
- _metricManager->init(config::ConfigUri(_config->getConfigId()), _node->getThreadPool());
+ _metricManager->init(config::ConfigUri(_config->getConfigId()));
}
void MetricsTest::TearDown() {
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 50ad7b54382..7f3fe06fc29 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -533,17 +533,18 @@ TEST_F(FileStorManagerTest, handler_priority) {
ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
-class MessagePusherThread : public document::Runnable {
+class MessagePusherThread {
public:
FileStorHandler& _handler;
Document::SP _doc;
std::atomic<bool> _done;
std::atomic<bool> _threadDone;
-
+ std::thread _thread;
+
MessagePusherThread(FileStorHandler& handler, Document::SP doc);
- ~MessagePusherThread() override;
+ ~MessagePusherThread();
- void run() override {
+ void run() {
while (!_done) {
document::BucketIdFactory factory;
document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId());
@@ -558,11 +559,16 @@ public:
};
MessagePusherThread::MessagePusherThread(FileStorHandler& handler, Document::SP doc)
- : _handler(handler), _doc(std::move(doc)), _done(false), _threadDone(false)
-{}
-MessagePusherThread::~MessagePusherThread() = default;
+ : _handler(handler), _doc(std::move(doc)), _done(false), _threadDone(false), _thread()
+{
+ _thread = std::thread([this](){run();});
+}
+MessagePusherThread::~MessagePusherThread()
+{
+ _thread.join();
+}
-class MessageFetchingThread : public document::Runnable {
+class MessageFetchingThread {
public:
const uint32_t _threadId;
FileStorHandler& _handler;
@@ -571,13 +577,17 @@ public:
std::atomic<bool> _done;
std::atomic<bool> _failed;
std::atomic<bool> _threadDone;
-
+ std::thread _thread;
+
explicit MessageFetchingThread(FileStorHandler& handler)
: _threadId(0), _handler(handler), _config(0), _fetchedCount(0), _done(false),
- _failed(false), _threadDone(false)
- {}
-
- void run() override {
+ _failed(false), _threadDone(false), _thread()
+ {
+ _thread = std::thread([this](){run();});
+ }
+ ~MessageFetchingThread();
+
+ void run() {
while (!_done) {
FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId);
if (msg.msg.get()) {
@@ -596,6 +606,10 @@ public:
_threadDone = true;
};
};
+MessageFetchingThread::~MessageFetchingThread()
+{
+ _thread.join();
+}
TEST_F(FileStorManagerTest, handler_paused_multi_thread) {
FileStorHandlerComponents c(*this);
@@ -606,12 +620,8 @@ TEST_F(FileStorManagerTest, handler_paused_multi_thread) {
Document::SP doc(createDocument(content, "id:footype:testdoctype1:n=1234:bar").release());
- FastOS_ThreadPool pool;
MessagePusherThread pushthread(filestorHandler, doc);
- pushthread.start(pool);
-
MessageFetchingThread thread(filestorHandler);
- thread.start(pool);
for (uint32_t i = 0; i < 50; ++i) {
std::this_thread::sleep_for(2ms);
diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp
index b7903de0fe2..1fb5a9730c4 100644
--- a/storage/src/tests/storageserver/statereportertest.cpp
+++ b/storage/src/tests/storageserver/statereportertest.cpp
@@ -97,7 +97,7 @@ void StateReporterTest::SetUp() {
_filestorMetrics->initDiskMetrics(1, 1);
_topSet->registerMetric(*_filestorMetrics);
- _metricManager->init(config::ConfigUri(_config->getConfigId()), _node->getThreadPool());
+ _metricManager->init(config::ConfigUri(_config->getConfigId()));
}
void StateReporterTest::TearDown() {
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h
index 74434c0116b..17a344a368a 100644
--- a/storage/src/vespa/storage/common/storagelinkqueued.h
+++ b/storage/src/vespa/storage/common/storagelinkqueued.h
@@ -16,7 +16,6 @@
#include "storagelink.h"
#include <vespa/storageframework/generic/thread/runnable.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <deque>
#include <limits>
#include <mutex>
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 08a48cc2d8a..99f61c62cd1 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -10,7 +10,6 @@
#include "filestorhandler.h"
#include "service_layer_host_info_reporter.h"
-#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/document/bucket/bucketid.h>
#include <vespa/persistence/spi/bucketexecutor.h>
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 593640e7d03..156ec8bc031 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -23,7 +23,6 @@
#include <vespa/messagebus/imessagehandler.h>
#include <vespa/messagebus/ireplyhandler.h>
#include <vespa/config/helper/ifetchercallback.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/config/subscription/configuri.h>
#include <vespa/config-bucketspaces.h>
#include <map>
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 0adfb209e66..dddcd42aad7 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -15,7 +15,6 @@
#include <vespa/storageframework/generic/thread/runnable.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/bucket/bucket.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/metrics/metricset.h>
#include <vespa/metrics/summetric.h>
#include <vespa/metrics/countmetric.h>
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 2836ab80acf..a09abb25f7a 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -201,7 +201,7 @@ StorageNode::initialize()
// have been created, such that we don't need to pay the extra cost of
// reinitializing metric manager often.
if ( ! _context.getComponentRegister().getMetricManager().isInitialized() ) {
- _context.getComponentRegister().getMetricManager().init(_configUri, _context.getThreadPool());
+ _context.getComponentRegister().getMetricManager().init(_configUri);
}
if (_chain) {
diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.h b/storage/src/vespa/storage/storageserver/storagenodecontext.h
index f07bdd37cd4..52709fb1d9b 100644
--- a/storage/src/vespa/storage/storageserver/storagenodecontext.h
+++ b/storage/src/vespa/storage/storageserver/storagenodecontext.h
@@ -28,12 +28,6 @@ struct StorageNodeContext {
*/
ComponentRegister& getComponentRegister() { return *_componentRegister; }
- /**
- * There currently exist threads that doesn't use the component model.
- * Let the backend threadpool be accessible for now.
- */
- FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); }
-
~StorageNodeContext();
protected:
diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h
index 0c5ec08fb4c..02bb37db59f 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.h
+++ b/storage/src/vespa/storage/visiting/visitormanager.h
@@ -30,7 +30,6 @@
#include <vespa/storageapi/message/internal.h>
#include <vespa/storageapi/message/visitor.h>
#include <vespa/config/helper/ifetchercallback.h>
-#include <vespa/vespalib/util/document_runnable.h>
namespace config {
class ConfigUri;
diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h
index 729b675df3a..f6204fed438 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.h
+++ b/storage/src/vespa/storage/visiting/visitorthread.h
@@ -22,7 +22,6 @@
#include <vespa/storageframework/generic/thread/runnable.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
#include <vespa/metrics/metrictimer.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <atomic>
#include <deque>
diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h
index d228dace1ed..1aede4d12e8 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h
+++ b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h
@@ -31,7 +31,6 @@ public:
virtual ComponentRegisterImpl& getComponentRegister() { return *_compReg; }
FakeClock& getClock() { return _clock; }
ThreadPoolImpl& getThreadPoolImpl() { return _threadPool; }
- FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); }
};
}
diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
index 925c9cda248..314434a4c1a 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
+++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
@@ -28,11 +28,11 @@ ThreadImpl::ThreadImpl(ThreadPoolImpl& pool,
_tickDataPtr(0),
_interrupted(false),
_joined(false),
- _thread(*this),
+ _thread(),
_cpu_category(cpu_category)
{
_tickData[load_relaxed(_tickDataPtr)]._lastTick = pool.getClock().getMonotonicTime();
- _thread.start(_pool.getThreadPool());
+ _thread = std::thread([this](){run();});
}
ThreadImpl::~ThreadImpl()
@@ -70,19 +70,21 @@ void
ThreadImpl::interrupt()
{
_interrupted.store(true, std::memory_order_relaxed);
- _thread.stop();
}
void
ThreadImpl::join()
{
- _thread.join();
+ if (_thread.joinable()) {
+ _thread.join();
+ }
}
vespalib::string
ThreadImpl::get_live_thread_stack_trace() const
{
- return vespalib::SignalHandler::get_cross_thread_stack_trace(_thread.native_thread_id());
+ auto native_handle = const_cast<std::thread&>(_thread).native_handle();
+ return vespalib::SignalHandler::get_cross_thread_stack_trace(native_handle);
}
void
diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
index d95ba2a37ef..68ed63ea17c 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
+++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
@@ -4,7 +4,6 @@
#include <vespa/storageframework/generic/thread/thread.h>
#include <vespa/vespalib/util/cpu_usage.h>
-#include <vespa/vespalib/util/document_runnable.h>
#include <array>
#include <atomic>
#include <optional>
@@ -15,12 +14,6 @@ struct ThreadPoolImpl;
class ThreadImpl final : public Thread
{
- struct BackendThread : public document::Runnable {
- ThreadImpl& _impl;
- explicit BackendThread(ThreadImpl& impl) : _impl(impl) {}
- void run() override { _impl.run(); }
- };
-
/**
* Internal data race free implementation of tick data that maps to and
* from ThreadTickData. We hide the atomicity of this since atomic vars
@@ -52,7 +45,7 @@ class ThreadImpl final : public Thread
std::atomic<uint32_t> _tickDataPtr;
std::atomic<bool> _interrupted;
bool _joined;
- BackendThread _thread;
+ std::thread _thread;
std::optional<vespalib::CpuUsage::Category> _cpu_category;
void run();
diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
index 95959d06b54..068de8f5880 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
+++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
@@ -15,8 +15,7 @@ using vespalib::IllegalStateException;
namespace storage::framework::defaultimplementation {
ThreadPoolImpl::ThreadPoolImpl(Clock& clock)
- : _backendThreadPool(std::make_unique<FastOS_ThreadPool>()),
- _clock(clock),
+ : _clock(clock),
_stopping(false)
{ }
@@ -44,7 +43,6 @@ ThreadPoolImpl::~ThreadPoolImpl()
}
std::this_thread::sleep_for(10ms);
}
- _backendThreadPool->Close();
}
Thread::UP
diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
index b788a3eed78..07b2dd78ed9 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
+++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
@@ -15,7 +15,6 @@ class ThreadImpl;
struct ThreadPoolImpl final : public ThreadPool
{
- std::unique_ptr<FastOS_ThreadPool> _backendThreadPool;
std::vector<ThreadImpl*> _threads;
mutable std::mutex _threadVectorLock;
Clock & _clock;
@@ -30,7 +29,6 @@ public:
std::optional<vespalib::CpuUsage::Category> cpu_category) override;
void visitThreads(ThreadVisitor&) const override;
void unregisterThread(ThreadImpl&);
- FastOS_ThreadPool& getThreadPool() { return *_backendThreadPool; }
Clock& getClock() { return _clock; }
};
diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
index 73e8b93a2ff..c8536fc68c1 100644
--- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -24,7 +24,6 @@ vespa_add_library(vespalib_vespalib_util OBJECT
cpu_usage.cpp
crc.cpp
destructor_callbacks.cpp
- document_runnable.cpp
doom.cpp
dual_merge_director.cpp
error.cpp
diff --git a/vespalib/src/vespa/vespalib/util/document_runnable.cpp b/vespalib/src/vespa/vespalib/util/document_runnable.cpp
deleted file mode 100644
index c0af72dbbb1..00000000000
--- a/vespalib/src/vespa/vespalib/util/document_runnable.cpp
+++ /dev/null
@@ -1,103 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "document_runnable.h"
-#include <vespa/vespalib/util/exceptions.h>
-#include <cassert>
-
-namespace document {
-
-Runnable::Runnable()
- : _stateLock(),
- _stateCond(),
- _state(NOT_RUNNING)
-{
-}
-
-Runnable::~Runnable() {
- std::lock_guard monitorGuard(_stateLock);
- assert(getState() == NOT_RUNNING);
-}
-
-bool Runnable::start(FastOS_ThreadPool& pool)
-{
- std::unique_lock guard(_stateLock);
- _stateCond.wait(guard, [&](){ return (getState() != STOPPING);});
-
- if (getState() != NOT_RUNNING) return false;
- set_state(STARTING);
- if (pool.NewThread(this) == nullptr) {
- throw vespalib::IllegalStateException("Failed starting a new thread", VESPA_STRLOC);
- }
- return true;
-}
-
-void Runnable::set_state(State new_state) noexcept
-{
- _state.store(new_state, std::memory_order_relaxed);
-}
-
-bool Runnable::stopping() const noexcept
-{
- State s(getState());
- return (s == STOPPING) || (s == RUNNING && GetThread()->GetBreakFlag());
-}
-
-bool Runnable::running() const noexcept
-{
- State s(getState());
- // Must check break-flag too, as threadpool will use that to close
- // down.
- return (s == STARTING || (s == RUNNING && !GetThread()->GetBreakFlag()));
-}
-
-bool Runnable::stop()
-{
- std::lock_guard monitor(_stateLock);
- if (getState() == STOPPING || getState() == NOT_RUNNING) return false;
- GetThread()->SetBreakFlag();
- set_state(STOPPING);
- return onStop();
-}
-
-bool Runnable::onStop()
-{
- return true;
-}
-
-bool Runnable::join() const
-{
- std::unique_lock guard(_stateLock);
- assert ((getState() != STARTING) && (getState() != RUNNING));
- _stateCond.wait(guard, [&](){ return (getState() == NOT_RUNNING);});
- return true;
-}
-
-FastOS_ThreadId Runnable::native_thread_id() const noexcept
-{
- return GetThread()->GetThreadId();
-}
-
-void Runnable::Run(FastOS_ThreadInterface*, void*)
-{
- {
- std::lock_guard guard(_stateLock);
- // Don't set state if its already at stopping. (And let run() be
- // called even though about to stop for consistency)
- if (getState() == STARTING) {
- set_state(RUNNING);
- }
- }
-
- // By not catching exceptions, they should abort whole application.
- // We should thus not need to have a catch-all to set state to not
- // running.
- run();
-
- {
- std::lock_guard guard(_stateLock);
- set_state(NOT_RUNNING);
- _stateCond.notify_all();
- }
-}
-
-}
diff --git a/vespalib/src/vespa/vespalib/util/document_runnable.h b/vespalib/src/vespa/vespalib/util/document_runnable.h
deleted file mode 100644
index 89388bac34c..00000000000
--- a/vespalib/src/vespa/vespalib/util/document_runnable.h
+++ /dev/null
@@ -1,96 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * @class document::Runnable
- * @ingroup util
- *
- * @brief Implementation of FastOS_Runnable that implements threadsafe stop.
- *
- * FastOS_Runnable can easily be used unsafe. If you use the thread pointer for
- * anything after your runnable had returned from Run(), it could affect another
- * runnable now using that thread.
- *
- * Using this class should be foolproof to avoid synchronization issues during
- * thread starting and stopping :)
- *
- * @author H�kon Humberset
- * @date 2005-09-19
- */
-
-#pragma once
-
-#include <vespa/fastos/thread.h>
-#include <atomic>
-
-namespace document {
-
-class Runnable : private FastOS_Runnable {
-public:
- enum State { NOT_RUNNING, STARTING, RUNNING, STOPPING };
-
-private:
- mutable std::mutex _stateLock;
- mutable std::condition_variable _stateCond;
- std::atomic<State> _state;
-
- void Run(FastOS_ThreadInterface*, void*) override;
- void set_state(State new_state) noexcept; // _stateLock must be held
-public:
- /**
- * Create a runnable.
- * @param pool If set, runnable will be started in constructor.
- */
- Runnable();
- ~Runnable() override;
-
- /**
- * Start this runnable.
- * @param pool The threadpool from which a thread is acquired.
- * @return True if thread was started, false if thread was already running.
- */
- bool start(FastOS_ThreadPool& pool);
-
- /**
- * Stop this runnable.
- * @return True if thread was stopped, false if thread was not running.
- */
- bool stop();
-
- /**
- * Called in stop(). Implement, to for instance notify any monitors that
- * can be waiting.
- */
- virtual bool onStop();
-
- /**
- * Wait for this thread to finish, if it is in the process of stopping.
- * @return True if thread finished (or not running), false if thread is
- * running normally and no stop is scheduled.
- */
- bool join() const;
-
- /**
- * Implement this to make the runnable actually do something.
- */
- virtual void run() = 0;
-
- /**
- * Get the current state of this runnable.
- * Thread safe (but relaxed) read; may be stale if done outside _stateLock.
- */
- [[nodiscard]] State getState() const noexcept {
- return _state.load(std::memory_order_relaxed);
- }
-
- /** Check if system is in the process of stopping. */
- [[nodiscard]] bool stopping() const noexcept;
-
- /**
- * Checks if runnable is running or not. (Started is considered running)
- */
- [[nodiscard]] bool running() const noexcept;
-
- FastOS_ThreadId native_thread_id() const noexcept;
-};
-
-}
-