From d9b2cc5dd6aa9210241efeac249be84772377b35 Mon Sep 17 00:00:00 2001 From: Håvard Pettersen Date: Mon, 20 Feb 2023 11:39:12 +0000 Subject: remove document::Runnable use std::thread directly instead --- metrics/src/tests/metricmanagertest.cpp | 28 ++---- metrics/src/tests/snapshottest.cpp | 3 +- metrics/src/tests/stresstest.cpp | 25 ++--- metrics/src/vespa/metrics/metricmanager.cpp | 19 ++-- metrics/src/vespa/metrics/metricmanager.h | 14 ++- searchcore/src/apps/proton/proton.cpp | 2 +- .../vespa-transactionlog-inspect.cpp | 1 + .../tests/transactionlog/translogclient_test.cpp | 1 + .../searchlib/transactionlog/translogserver.cpp | 15 ++- .../searchlib/transactionlog/translogserver.h | 11 ++- searchlib/src/vespa/searchlib/util/runnable.h | 1 + storage/src/tests/bucketdb/lockablemaptest.cpp | 1 - storage/src/tests/common/metricstest.cpp | 2 +- .../filestorage/filestormanagertest.cpp | 44 +++++---- .../src/tests/storageserver/statereportertest.cpp | 2 +- .../src/vespa/storage/common/storagelinkqueued.h | 1 - .../persistence/filestorage/filestormanager.h | 1 - .../storage/storageserver/communicationmanager.h | 1 - .../vespa/storage/storageserver/mergethrottler.h | 1 - .../vespa/storage/storageserver/storagenode.cpp | 2 +- .../storage/storageserver/storagenodecontext.h | 6 -- .../src/vespa/storage/visiting/visitormanager.h | 1 - storage/src/vespa/storage/visiting/visitorthread.h | 1 - .../component/testcomponentregister.h | 1 - .../defaultimplementation/thread/threadimpl.cpp | 12 ++- .../defaultimplementation/thread/threadimpl.h | 9 +- .../thread/threadpoolimpl.cpp | 4 +- .../defaultimplementation/thread/threadpoolimpl.h | 2 - vespalib/src/vespa/vespalib/util/CMakeLists.txt | 1 - .../src/vespa/vespalib/util/document_runnable.cpp | 103 --------------------- .../src/vespa/vespalib/util/document_runnable.h | 96 ------------------- 31 files changed, 97 insertions(+), 314 deletions(-) delete mode 100644 vespalib/src/vespa/vespalib/util/document_runnable.cpp delete mode 100644 vespalib/src/vespa/vespalib/util/document_runnable.h diff --git a/metrics/src/tests/metricmanagertest.cpp b/metrics/src/tests/metricmanagertest.cpp index 604e9c46b80..98d03514de0 100644 --- a/metrics/src/tests/metricmanagertest.cpp +++ b/metrics/src/tests/metricmanagertest.cpp @@ -152,11 +152,10 @@ namespace { std::pair getMatchedMetrics(const vespalib::string& config) { - FastOS_ThreadPool pool; TestMetricSet mySet; MetricManager mm; mm.registerMetric(mm.getMetricLock(), mySet.set); - mm.init(ConfigUri(config), pool); + mm.init(ConfigUri(config)); MetricNameVisitor visitor; /** Take a copy to verify clone works. @@ -475,7 +474,6 @@ std::string dumpAllSnapshots(const MetricManager& mm, TEST_F(MetricManagerTest, test_snapshots) { - FastOS_ThreadPool pool; FakeTimer* timer = new FakeTimer(1000); std::unique_ptr timerImpl(timer); TestMetricSet mySet; @@ -491,8 +489,7 @@ TEST_F(MetricManagerTest, test_snapshots) "consumer[0].tags[0] snaptest\n" "consumer[1].name log\n" "consumer[1].tags[1]\n" - "consumer[1].tags[0] snaptest\n"), - pool); + "consumer[1].tags[0] snaptest\n")); MetricNameVisitor visitor; { MetricLockGuard lockGuard(mm.getMetricLock()); @@ -575,7 +572,6 @@ TEST_F(MetricManagerTest, test_snapshots) TEST_F(MetricManagerTest, test_xml_output) { - FastOS_ThreadPool pool; FakeTimer* timer = new FakeTimer(1000); std::unique_ptr timerImpl(timer); MetricManager mm(std::move(timerImpl)); @@ -593,8 +589,7 @@ TEST_F(MetricManagerTest, test_xml_output) "consumer[0].tags[0] snaptest\n" "consumer[1].name log\n" "consumer[1].tags[1]\n" - "consumer[1].tags[0] snaptest\n"), - pool); + "consumer[1].tags[0] snaptest\n")); takeSnapshots(mm, 1000); @@ -653,7 +648,6 @@ TEST_F(MetricManagerTest, test_xml_output) TEST_F(MetricManagerTest, test_json_output) { - FastOS_ThreadPool pool; FakeTimer* timer = new FakeTimer(1000); std::unique_ptr timerImpl(timer); MetricManager mm(std::move(timerImpl)); @@ -668,8 +662,7 @@ TEST_F(MetricManagerTest, test_json_output) "consumer[1]\n" "consumer[0].name snapper\n" "consumer[0].tags[1]\n" - "consumer[0].tags[0] snaptest\n"), - pool); + "consumer[0].tags[0] snaptest\n")); takeSnapshots(mm, 1000); @@ -743,14 +736,12 @@ namespace { struct MetricSnapshotTestFixture { MetricManagerTest& test; - FastOS_ThreadPool pool; FakeTimer* timer; MetricManager manager; MetricSet& mset; MetricSnapshotTestFixture(MetricManagerTest& callerTest, MetricSet& metricSet) : test(callerTest), - pool(), timer(new FakeTimer(1000)), manager(std::unique_ptr(timer)), mset(metricSet) @@ -765,8 +756,7 @@ struct MetricSnapshotTestFixture "consumer[1]\n" "consumer[0].name snapper\n" "consumer[0].addedmetrics[1]\n" - "consumer[0].addedmetrics[0] *\n"), - pool); + "consumer[0].addedmetrics[0] *\n")); test.takeSnapshots(manager, 1000); } @@ -986,7 +976,6 @@ TEST_F(MetricManagerTest, json_output_can_have_multiple_sets_with_same_name) TEST_F(MetricManagerTest, test_text_output) { - FastOS_ThreadPool pool; FakeTimer* timer = new FakeTimer(1000); std::unique_ptr timerImpl(timer); MetricManager mm(std::move(timerImpl)); @@ -1010,8 +999,7 @@ TEST_F(MetricManagerTest, test_text_output) "consumer[0].tags[0] snaptest\n" "consumer[1].name log\n" "consumer[1].tags[1]\n" - "consumer[1].tags[0] snaptest\n"), - pool); + "consumer[1].tags[0] snaptest\n")); std::string expected( "snapshot \"Active metrics showing updates since last snapshot\" from 1000 to 0 period 0\n" "temp.val6 average=2 last=2 min=2 max=2 count=1 total=2\n" @@ -1085,7 +1073,6 @@ TEST_F(MetricManagerTest, test_update_hooks) { std::mutex output_mutex; std::ostringstream output; - FastOS_ThreadPool pool; FakeTimer* timer = new FakeTimer(1000); std::unique_ptr timerImpl(timer); // Add a metric set just so one exist @@ -1114,8 +1101,7 @@ TEST_F(MetricManagerTest, test_update_hooks) "consumer[0].tags[0] snaptest\n" "consumer[1].name log\n" "consumer[1].tags[1]\n" - "consumer[1].tags[0] snaptest\n"), - pool); + "consumer[1].tags[0] snaptest\n")); output << "Init done\n"; MyUpdateHook postInitShort(output, output_mutex, "AIS", *timer); diff --git a/metrics/src/tests/snapshottest.cpp b/metrics/src/tests/snapshottest.cpp index 22eb3587eff..b4eb4a1353c 100644 --- a/metrics/src/tests/snapshottest.cpp +++ b/metrics/src/tests/snapshottest.cpp @@ -176,7 +176,6 @@ TEST_F(SnapshotTest, test_snapshot_two_days) TestMetricSet set("test"); FakeTimer* timer; - FastOS_ThreadPool threadPool; MetricManager mm( std::unique_ptr(timer = new FakeTimer)); { @@ -185,7 +184,7 @@ TEST_F(SnapshotTest, test_snapshot_two_days) } mm.init(config::ConfigUri("raw:consumer[1]\n" "consumer[0].name \"log\""), - threadPool, false); + false); tick(mm, timer->_timeInSecs * 1000); for (uint32_t days=0; days<2; ++days) { diff --git a/metrics/src/tests/stresstest.cpp b/metrics/src/tests/stresstest.cpp index e942d47b9de..afabf91d5c9 100644 --- a/metrics/src/tests/stresstest.cpp +++ b/metrics/src/tests/stresstest.cpp @@ -74,25 +74,29 @@ OuterMetricSet::OuterMetricSet(MetricSet* owner) OuterMetricSet::~OuterMetricSet() = default; -struct Hammer : public document::Runnable { +struct Hammer { using UP = std::unique_ptr; OuterMetricSet& _metrics; - - Hammer(OuterMetricSet& metrics,FastOS_ThreadPool& threadPool) - : _metrics(metrics) + std::atomic _stop_requested; + std::thread _thread; + + Hammer(OuterMetricSet& metrics) + : _metrics(metrics), + _stop_requested(false), + _thread() { - start(threadPool); + _thread = std::thread([this](){run();}); } ~Hammer() { - stop(); - join(); + _stop_requested = true; + _thread.join(); //std::cerr << "Loadgiver thread joined\n"; } - void run() override { + void run() { uint64_t i = 0; - while (running()) { + while (!_stop_requested.load(std::memory_order_relaxed)) { ++i; setMetrics(i, _metrics._inner1); setMetrics(i + 3, _metrics._inner2); @@ -114,10 +118,9 @@ TEST(StressTest, test_stress) OuterMetricSet metrics; LOG(info, "Starting load givers"); - FastOS_ThreadPool threadPool; std::vector hammers; for (uint32_t i=0; i<10; ++i) { - hammers.push_back(std::make_unique(metrics, threadPool)); + hammers.push_back(std::make_unique(metrics)); } LOG(info, "Waiting to let loadgivers hammer a while"); std::this_thread::sleep_for(5s); diff --git a/metrics/src/vespa/metrics/metricmanager.cpp b/metrics/src/vespa/metrics/metricmanager.cpp index ae75968e605..a0e44ddbeac 100644 --- a/metrics/src/vespa/metrics/metricmanager.cpp +++ b/metrics/src/vespa/metrics/metricmanager.cpp @@ -82,7 +82,9 @@ MetricManager::MetricManager(std::unique_ptr timer) _snapshotHookLatency("snapshothooklatency", {}, "Time in ms used to update a single snapshot hook", &_metricManagerMetrics), _resetLatency("resetlatency", {}, "Time in ms used to reset all metrics.", &_metricManagerMetrics), _snapshotLatency("snapshotlatency", {}, "Time in ms used to take a snapshot", &_metricManagerMetrics), - _sleepTimes("sleeptime", {}, "Time in ms worker thread is sleeping", &_metricManagerMetrics) + _sleepTimes("sleeptime", {}, "Time in ms worker thread is sleeping", &_metricManagerMetrics), + _stop_requested(false), + _thread() { registerMetric(getMetricLock(), _metricManagerMetrics); } @@ -95,15 +97,14 @@ MetricManager::~MetricManager() void MetricManager::stop() { - if (!running()) { - return; // Let stop() be idempotent. - } - Runnable::stop(); + request_stop(); { MetricLockGuard sync(_waiter); _cond.notify_all(); } - join(); + if (_thread.joinable()) { + _thread.join(); + } } void @@ -161,7 +162,7 @@ MetricManager::isInitialized() const { } void -MetricManager::init(const config::ConfigUri & uri, FastOS_ThreadPool& pool, bool startThread) +MetricManager::init(const config::ConfigUri & uri, bool startThread) { if (isInitialized()) { throw vespalib::IllegalStateException( @@ -175,7 +176,7 @@ MetricManager::init(const config::ConfigUri & uri, FastOS_ThreadPool& pool, bool configure(getMetricLock(), _configHandle->getConfig()); LOG(debug, "Starting worker thread, waiting for first iteration to complete."); if (startThread) { - Runnable::start(pool); + _thread = std::thread([this](){run();}); // Wait for first iteration to have completed, such that it is safe // to access snapshots afterwards. MetricLockGuard sync(_waiter); @@ -763,7 +764,7 @@ MetricManager::run() } // Ensure correct time for first snapshot _snapshots[0]->getSnapshot().setToTime(currentTime); - while (!stopping()) { + while (!stop_requested()) { currentTime = _timer->getTime(); time_t next = tick(sync, currentTime); if (currentTime < next) { diff --git a/metrics/src/vespa/metrics/metricmanager.h b/metrics/src/vespa/metrics/metricmanager.h index 5f35c349f7f..b1777a1d228 100644 --- a/metrics/src/vespa/metrics/metricmanager.h +++ b/metrics/src/vespa/metrics/metricmanager.h @@ -49,7 +49,6 @@ #include "valuemetric.h" #include "updatehook.h" #include -#include #include #include #include @@ -61,7 +60,7 @@ template class vespalib::hash_set; namespace metrics { -class MetricManager : private document::Runnable +class MetricManager { public: @@ -119,10 +118,15 @@ private: LongAverageMetric _resetLatency; LongAverageMetric _snapshotLatency; LongAverageMetric _sleepTimes; + std::atomic _stop_requested; + std::thread _thread; + void request_stop() { _stop_requested.store(true, std::memory_order_relaxed); } + bool stop_requested() const { return _stop_requested.load(std::memory_order_relaxed); } + public: MetricManager(std::unique_ptr timer = std::make_unique()); - ~MetricManager() override; + ~MetricManager(); void stop(); @@ -194,7 +198,7 @@ public: * of consumers. readConfig() will start a config subscription. It should * not be called multiple times. */ - void init(const config::ConfigUri & uri, FastOS_ThreadPool&, bool startThread = true); + void init(const config::ConfigUri & uri, bool startThread = true); /** * Visit a given snapshot for a given consumer. (Empty consumer name means @@ -271,7 +275,7 @@ private: friend struct SnapshotTest; void configure(const MetricLockGuard & guard, std::unique_ptr conf); - void run() override; + void run(); time_t tick(const MetricLockGuard & guard, time_t currentTime); /** * Utility function for updating periodic metrics. diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp index 06df12f73b7..2c47b5162ca 100644 --- a/searchcore/src/apps/proton/proton.cpp +++ b/searchcore/src/apps/proton/proton.cpp @@ -254,7 +254,7 @@ App::startAndRun(FastOS_ThreadPool & threadPool, FNET_Transport & transport, int spiProton->createNode(); EV_STARTED("servicelayer"); } else { - proton.getMetricManager().init(identityUri, threadPool); + proton.getMetricManager().init(identityUri); } EV_STARTED("proton"); while (!(SIG::INT.check() || SIG::TERM.check() || (spiProton && spiProton->getNode().attemptedStopped()))) { diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp index 00f62aefc28..76fc875b209 100644 --- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp +++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include LOG_SETUP("vespa-transactionlog-inspect"); diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index af214c34be8..a1a42b592b2 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 98a9568e4e8..c96b0cdcd61 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -99,7 +99,7 @@ TransLogServer::TransLogServer(FNET_Transport & transport, const vespalib::strin _baseDir(baseDir), _domainConfig(cfg), _executor(maxThreads, CpuUsage::wrap(tls_executor, CpuUsage::Category::WRITE)), - _threadPool(std::make_unique()), + _thread(), _supervisor(std::make_unique(&transport)), _domains(), _reqQ(), @@ -143,25 +143,24 @@ TransLogServer::TransLogServer(FNET_Transport & transport, const vespalib::strin } else { throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno)); } - start(*_threadPool); + _thread = std::thread([this](){run();}); } TransLogServer::~TransLogServer() { - _closed = true; - stop(); - join(); + request_stop(); + _thread.join(); _executor.sync(); _executor.shutdown(); _executor.sync(); } -bool -TransLogServer::onStop() +void +TransLogServer::request_stop() { + _closed = true; LOG(info, "Stopping TLS"); _reqQ.push(nullptr); - return true; } void diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index f7ea80c9248..2c5fbf51a08 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -2,7 +2,6 @@ #pragma once #include "domainconfig.h" -#include #include #include #include @@ -18,7 +17,7 @@ namespace search::transactionlog { class TransLogServerExplorer; class Domain; -class TransLogServer : private FRT_Invokable, public document::Runnable, public WriterFactory +class TransLogServer : private FRT_Invokable, public WriterFactory { public: friend class TransLogServerExplorer; @@ -36,8 +35,8 @@ public: TransLogServer & setDomainConfig(const DomainConfig & cfg); private: - bool onStop() override; - void run() override; + void request_stop(); + void run(); void exportRPC(FRT_Supervisor & supervisor); void relayToThreadRPC(FRT_RPCRequest *req); @@ -63,11 +62,13 @@ private: using ReadGuard = std::shared_lock; using WriteGuard = std::unique_lock; + bool running() const { return !_closed.load(std::memory_order_relaxed); } + vespalib::string _name; vespalib::string _baseDir; DomainConfig _domainConfig; vespalib::ThreadStackExecutor _executor; - std::unique_ptr _threadPool; + std::thread _thread; std::unique_ptr _supervisor; DomainList _domains; mutable std::shared_mutex _domainMutex;; // Protects _domains diff --git a/searchlib/src/vespa/searchlib/util/runnable.h b/searchlib/src/vespa/searchlib/util/runnable.h index e268b13e09a..4b353209762 100644 --- a/searchlib/src/vespa/searchlib/util/runnable.h +++ b/searchlib/src/vespa/searchlib/util/runnable.h @@ -4,6 +4,7 @@ #include #include +#include namespace search { diff --git a/storage/src/tests/bucketdb/lockablemaptest.cpp b/storage/src/tests/bucketdb/lockablemaptest.cpp index 582e6957c22..3a16ee170fe 100644 --- a/storage/src/tests/bucketdb/lockablemaptest.cpp +++ b/storage/src/tests/bucketdb/lockablemaptest.cpp @@ -1,6 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include #include #include #include diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp index 97d1c22364f..78fa32e24e5 100644 --- a/storage/src/tests/common/metricstest.cpp +++ b/storage/src/tests/common/metricstest.cpp @@ -100,7 +100,7 @@ void MetricsTest::SetUp() { _visitorMetrics = std::make_shared(); _visitorMetrics->initThreads(4); _topSet->registerMetric(*_visitorMetrics); - _metricManager->init(config::ConfigUri(_config->getConfigId()), _node->getThreadPool()); + _metricManager->init(config::ConfigUri(_config->getConfigId())); } void MetricsTest::TearDown() { diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 50ad7b54382..7f3fe06fc29 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -533,17 +533,18 @@ TEST_F(FileStorManagerTest, handler_priority) { ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority()); } -class MessagePusherThread : public document::Runnable { +class MessagePusherThread { public: FileStorHandler& _handler; Document::SP _doc; std::atomic _done; std::atomic _threadDone; - + std::thread _thread; + MessagePusherThread(FileStorHandler& handler, Document::SP doc); - ~MessagePusherThread() override; + ~MessagePusherThread(); - void run() override { + void run() { while (!_done) { document::BucketIdFactory factory; document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId()); @@ -558,11 +559,16 @@ public: }; MessagePusherThread::MessagePusherThread(FileStorHandler& handler, Document::SP doc) - : _handler(handler), _doc(std::move(doc)), _done(false), _threadDone(false) -{} -MessagePusherThread::~MessagePusherThread() = default; + : _handler(handler), _doc(std::move(doc)), _done(false), _threadDone(false), _thread() +{ + _thread = std::thread([this](){run();}); +} +MessagePusherThread::~MessagePusherThread() +{ + _thread.join(); +} -class MessageFetchingThread : public document::Runnable { +class MessageFetchingThread { public: const uint32_t _threadId; FileStorHandler& _handler; @@ -571,13 +577,17 @@ public: std::atomic _done; std::atomic _failed; std::atomic _threadDone; - + std::thread _thread; + explicit MessageFetchingThread(FileStorHandler& handler) : _threadId(0), _handler(handler), _config(0), _fetchedCount(0), _done(false), - _failed(false), _threadDone(false) - {} - - void run() override { + _failed(false), _threadDone(false), _thread() + { + _thread = std::thread([this](){run();}); + } + ~MessageFetchingThread(); + + void run() { while (!_done) { FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId); if (msg.msg.get()) { @@ -596,6 +606,10 @@ public: _threadDone = true; }; }; +MessageFetchingThread::~MessageFetchingThread() +{ + _thread.join(); +} TEST_F(FileStorManagerTest, handler_paused_multi_thread) { FileStorHandlerComponents c(*this); @@ -606,12 +620,8 @@ TEST_F(FileStorManagerTest, handler_paused_multi_thread) { Document::SP doc(createDocument(content, "id:footype:testdoctype1:n=1234:bar").release()); - FastOS_ThreadPool pool; MessagePusherThread pushthread(filestorHandler, doc); - pushthread.start(pool); - MessageFetchingThread thread(filestorHandler); - thread.start(pool); for (uint32_t i = 0; i < 50; ++i) { std::this_thread::sleep_for(2ms); diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp index b7903de0fe2..1fb5a9730c4 100644 --- a/storage/src/tests/storageserver/statereportertest.cpp +++ b/storage/src/tests/storageserver/statereportertest.cpp @@ -97,7 +97,7 @@ void StateReporterTest::SetUp() { _filestorMetrics->initDiskMetrics(1, 1); _topSet->registerMetric(*_filestorMetrics); - _metricManager->init(config::ConfigUri(_config->getConfigId()), _node->getThreadPool()); + _metricManager->init(config::ConfigUri(_config->getConfigId())); } void StateReporterTest::TearDown() { diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h index 74434c0116b..17a344a368a 100644 --- a/storage/src/vespa/storage/common/storagelinkqueued.h +++ b/storage/src/vespa/storage/common/storagelinkqueued.h @@ -16,7 +16,6 @@ #include "storagelink.h" #include -#include #include #include #include diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 08a48cc2d8a..99f61c62cd1 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -10,7 +10,6 @@ #include "filestorhandler.h" #include "service_layer_host_info_reporter.h" -#include #include #include #include diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 593640e7d03..156ec8bc031 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -23,7 +23,6 @@ #include #include #include -#include #include #include #include diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 0adfb209e66..dddcd42aad7 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 2836ab80acf..a09abb25f7a 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -201,7 +201,7 @@ StorageNode::initialize() // have been created, such that we don't need to pay the extra cost of // reinitializing metric manager often. if ( ! _context.getComponentRegister().getMetricManager().isInitialized() ) { - _context.getComponentRegister().getMetricManager().init(_configUri, _context.getThreadPool()); + _context.getComponentRegister().getMetricManager().init(_configUri); } if (_chain) { diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.h b/storage/src/vespa/storage/storageserver/storagenodecontext.h index f07bdd37cd4..52709fb1d9b 100644 --- a/storage/src/vespa/storage/storageserver/storagenodecontext.h +++ b/storage/src/vespa/storage/storageserver/storagenodecontext.h @@ -28,12 +28,6 @@ struct StorageNodeContext { */ ComponentRegister& getComponentRegister() { return *_componentRegister; } - /** - * There currently exist threads that doesn't use the component model. - * Let the backend threadpool be accessible for now. - */ - FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); } - ~StorageNodeContext(); protected: diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h index 0c5ec08fb4c..02bb37db59f 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.h +++ b/storage/src/vespa/storage/visiting/visitormanager.h @@ -30,7 +30,6 @@ #include #include #include -#include namespace config { class ConfigUri; diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index 729b675df3a..f6204fed438 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -22,7 +22,6 @@ #include #include #include -#include #include #include diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h index d228dace1ed..1aede4d12e8 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h +++ b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h @@ -31,7 +31,6 @@ public: virtual ComponentRegisterImpl& getComponentRegister() { return *_compReg; } FakeClock& getClock() { return _clock; } ThreadPoolImpl& getThreadPoolImpl() { return _threadPool; } - FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); } }; } diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp index 925c9cda248..314434a4c1a 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp @@ -28,11 +28,11 @@ ThreadImpl::ThreadImpl(ThreadPoolImpl& pool, _tickDataPtr(0), _interrupted(false), _joined(false), - _thread(*this), + _thread(), _cpu_category(cpu_category) { _tickData[load_relaxed(_tickDataPtr)]._lastTick = pool.getClock().getMonotonicTime(); - _thread.start(_pool.getThreadPool()); + _thread = std::thread([this](){run();}); } ThreadImpl::~ThreadImpl() @@ -70,19 +70,21 @@ void ThreadImpl::interrupt() { _interrupted.store(true, std::memory_order_relaxed); - _thread.stop(); } void ThreadImpl::join() { - _thread.join(); + if (_thread.joinable()) { + _thread.join(); + } } vespalib::string ThreadImpl::get_live_thread_stack_trace() const { - return vespalib::SignalHandler::get_cross_thread_stack_trace(_thread.native_thread_id()); + auto native_handle = const_cast(_thread).native_handle(); + return vespalib::SignalHandler::get_cross_thread_stack_trace(native_handle); } void diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h index d95ba2a37ef..68ed63ea17c 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h @@ -4,7 +4,6 @@ #include #include -#include #include #include #include @@ -15,12 +14,6 @@ struct ThreadPoolImpl; class ThreadImpl final : public Thread { - struct BackendThread : public document::Runnable { - ThreadImpl& _impl; - explicit BackendThread(ThreadImpl& impl) : _impl(impl) {} - void run() override { _impl.run(); } - }; - /** * Internal data race free implementation of tick data that maps to and * from ThreadTickData. We hide the atomicity of this since atomic vars @@ -52,7 +45,7 @@ class ThreadImpl final : public Thread std::atomic _tickDataPtr; std::atomic _interrupted; bool _joined; - BackendThread _thread; + std::thread _thread; std::optional _cpu_category; void run(); diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp index 95959d06b54..068de8f5880 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp @@ -15,8 +15,7 @@ using vespalib::IllegalStateException; namespace storage::framework::defaultimplementation { ThreadPoolImpl::ThreadPoolImpl(Clock& clock) - : _backendThreadPool(std::make_unique()), - _clock(clock), + : _clock(clock), _stopping(false) { } @@ -44,7 +43,6 @@ ThreadPoolImpl::~ThreadPoolImpl() } std::this_thread::sleep_for(10ms); } - _backendThreadPool->Close(); } Thread::UP diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h index b788a3eed78..07b2dd78ed9 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h @@ -15,7 +15,6 @@ class ThreadImpl; struct ThreadPoolImpl final : public ThreadPool { - std::unique_ptr _backendThreadPool; std::vector _threads; mutable std::mutex _threadVectorLock; Clock & _clock; @@ -30,7 +29,6 @@ public: std::optional cpu_category) override; void visitThreads(ThreadVisitor&) const override; void unregisterThread(ThreadImpl&); - FastOS_ThreadPool& getThreadPool() { return *_backendThreadPool; } Clock& getClock() { return _clock; } }; diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index 73e8b93a2ff..c8536fc68c1 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -24,7 +24,6 @@ vespa_add_library(vespalib_vespalib_util OBJECT cpu_usage.cpp crc.cpp destructor_callbacks.cpp - document_runnable.cpp doom.cpp dual_merge_director.cpp error.cpp diff --git a/vespalib/src/vespa/vespalib/util/document_runnable.cpp b/vespalib/src/vespa/vespalib/util/document_runnable.cpp deleted file mode 100644 index c0af72dbbb1..00000000000 --- a/vespalib/src/vespa/vespalib/util/document_runnable.cpp +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "document_runnable.h" -#include -#include - -namespace document { - -Runnable::Runnable() - : _stateLock(), - _stateCond(), - _state(NOT_RUNNING) -{ -} - -Runnable::~Runnable() { - std::lock_guard monitorGuard(_stateLock); - assert(getState() == NOT_RUNNING); -} - -bool Runnable::start(FastOS_ThreadPool& pool) -{ - std::unique_lock guard(_stateLock); - _stateCond.wait(guard, [&](){ return (getState() != STOPPING);}); - - if (getState() != NOT_RUNNING) return false; - set_state(STARTING); - if (pool.NewThread(this) == nullptr) { - throw vespalib::IllegalStateException("Failed starting a new thread", VESPA_STRLOC); - } - return true; -} - -void Runnable::set_state(State new_state) noexcept -{ - _state.store(new_state, std::memory_order_relaxed); -} - -bool Runnable::stopping() const noexcept -{ - State s(getState()); - return (s == STOPPING) || (s == RUNNING && GetThread()->GetBreakFlag()); -} - -bool Runnable::running() const noexcept -{ - State s(getState()); - // Must check break-flag too, as threadpool will use that to close - // down. - return (s == STARTING || (s == RUNNING && !GetThread()->GetBreakFlag())); -} - -bool Runnable::stop() -{ - std::lock_guard monitor(_stateLock); - if (getState() == STOPPING || getState() == NOT_RUNNING) return false; - GetThread()->SetBreakFlag(); - set_state(STOPPING); - return onStop(); -} - -bool Runnable::onStop() -{ - return true; -} - -bool Runnable::join() const -{ - std::unique_lock guard(_stateLock); - assert ((getState() != STARTING) && (getState() != RUNNING)); - _stateCond.wait(guard, [&](){ return (getState() == NOT_RUNNING);}); - return true; -} - -FastOS_ThreadId Runnable::native_thread_id() const noexcept -{ - return GetThread()->GetThreadId(); -} - -void Runnable::Run(FastOS_ThreadInterface*, void*) -{ - { - std::lock_guard guard(_stateLock); - // Don't set state if its already at stopping. (And let run() be - // called even though about to stop for consistency) - if (getState() == STARTING) { - set_state(RUNNING); - } - } - - // By not catching exceptions, they should abort whole application. - // We should thus not need to have a catch-all to set state to not - // running. - run(); - - { - std::lock_guard guard(_stateLock); - set_state(NOT_RUNNING); - _stateCond.notify_all(); - } -} - -} diff --git a/vespalib/src/vespa/vespalib/util/document_runnable.h b/vespalib/src/vespa/vespalib/util/document_runnable.h deleted file mode 100644 index 89388bac34c..00000000000 --- a/vespalib/src/vespa/vespalib/util/document_runnable.h +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * @class document::Runnable - * @ingroup util - * - * @brief Implementation of FastOS_Runnable that implements threadsafe stop. - * - * FastOS_Runnable can easily be used unsafe. If you use the thread pointer for - * anything after your runnable had returned from Run(), it could affect another - * runnable now using that thread. - * - * Using this class should be foolproof to avoid synchronization issues during - * thread starting and stopping :) - * - * @author H�kon Humberset - * @date 2005-09-19 - */ - -#pragma once - -#include -#include - -namespace document { - -class Runnable : private FastOS_Runnable { -public: - enum State { NOT_RUNNING, STARTING, RUNNING, STOPPING }; - -private: - mutable std::mutex _stateLock; - mutable std::condition_variable _stateCond; - std::atomic _state; - - void Run(FastOS_ThreadInterface*, void*) override; - void set_state(State new_state) noexcept; // _stateLock must be held -public: - /** - * Create a runnable. - * @param pool If set, runnable will be started in constructor. - */ - Runnable(); - ~Runnable() override; - - /** - * Start this runnable. - * @param pool The threadpool from which a thread is acquired. - * @return True if thread was started, false if thread was already running. - */ - bool start(FastOS_ThreadPool& pool); - - /** - * Stop this runnable. - * @return True if thread was stopped, false if thread was not running. - */ - bool stop(); - - /** - * Called in stop(). Implement, to for instance notify any monitors that - * can be waiting. - */ - virtual bool onStop(); - - /** - * Wait for this thread to finish, if it is in the process of stopping. - * @return True if thread finished (or not running), false if thread is - * running normally and no stop is scheduled. - */ - bool join() const; - - /** - * Implement this to make the runnable actually do something. - */ - virtual void run() = 0; - - /** - * Get the current state of this runnable. - * Thread safe (but relaxed) read; may be stale if done outside _stateLock. - */ - [[nodiscard]] State getState() const noexcept { - return _state.load(std::memory_order_relaxed); - } - - /** Check if system is in the process of stopping. */ - [[nodiscard]] bool stopping() const noexcept; - - /** - * Checks if runnable is running or not. (Started is considered running) - */ - [[nodiscard]] bool running() const noexcept; - - FastOS_ThreadId native_thread_id() const noexcept; -}; - -} - -- cgit v1.2.3