diff options
author | Håvard Pettersen <[email protected]> | 2023-02-20 11:39:12 +0000 |
---|---|---|
committer | Håvard Pettersen <[email protected]> | 2023-02-20 12:33:29 +0000 |
commit | d9b2cc5dd6aa9210241efeac249be84772377b35 (patch) | |
tree | a78022b7588a4deba71544c79dd6efaa53792f94 /storage | |
parent | a5d5a7dd7bab499554691fa59e08b3771b5e32d3 (diff) |
remove document::Runnable
use std::thread directly instead
Diffstat (limited to 'storage')
17 files changed, 39 insertions, 52 deletions
diff --git a/storage/src/tests/bucketdb/lockablemaptest.cpp b/storage/src/tests/bucketdb/lockablemaptest.cpp index 582e6957c22..3a16ee170fe 100644 --- a/storage/src/tests/bucketdb/lockablemaptest.cpp +++ b/storage/src/tests/bucketdb/lockablemaptest.cpp @@ -1,6 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/vespalib/util/document_runnable.h> #include <vespa/storage/bucketdb/btree_lockable_map.hpp> #include <vespa/storage/bucketdb/striped_btree_lockable_map.hpp> #include <vespa/vespalib/datastore/buffer_type.hpp> diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp index 97d1c22364f..78fa32e24e5 100644 --- a/storage/src/tests/common/metricstest.cpp +++ b/storage/src/tests/common/metricstest.cpp @@ -100,7 +100,7 @@ void MetricsTest::SetUp() { _visitorMetrics = std::make_shared<VisitorMetrics>(); _visitorMetrics->initThreads(4); _topSet->registerMetric(*_visitorMetrics); - _metricManager->init(config::ConfigUri(_config->getConfigId()), _node->getThreadPool()); + _metricManager->init(config::ConfigUri(_config->getConfigId())); } void MetricsTest::TearDown() { diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 50ad7b54382..7f3fe06fc29 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -533,17 +533,18 @@ TEST_F(FileStorManagerTest, handler_priority) { ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority()); } -class MessagePusherThread : public document::Runnable { +class MessagePusherThread { public: FileStorHandler& _handler; Document::SP _doc; std::atomic<bool> _done; std::atomic<bool> _threadDone; - + std::thread _thread; + MessagePusherThread(FileStorHandler& handler, Document::SP doc); - ~MessagePusherThread() override; + ~MessagePusherThread(); - void run() override { + void run() { while (!_done) { document::BucketIdFactory factory; document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId()); @@ -558,11 +559,16 @@ public: }; MessagePusherThread::MessagePusherThread(FileStorHandler& handler, Document::SP doc) - : _handler(handler), _doc(std::move(doc)), _done(false), _threadDone(false) -{} -MessagePusherThread::~MessagePusherThread() = default; + : _handler(handler), _doc(std::move(doc)), _done(false), _threadDone(false), _thread() +{ + _thread = std::thread([this](){run();}); +} +MessagePusherThread::~MessagePusherThread() +{ + _thread.join(); +} -class MessageFetchingThread : public document::Runnable { +class MessageFetchingThread { public: const uint32_t _threadId; FileStorHandler& _handler; @@ -571,13 +577,17 @@ public: std::atomic<bool> _done; std::atomic<bool> _failed; std::atomic<bool> _threadDone; - + std::thread _thread; + explicit MessageFetchingThread(FileStorHandler& handler) : _threadId(0), _handler(handler), _config(0), _fetchedCount(0), _done(false), - _failed(false), _threadDone(false) - {} - - void run() override { + _failed(false), _threadDone(false), _thread() + { + _thread = std::thread([this](){run();}); + } + ~MessageFetchingThread(); + + void run() { while (!_done) { FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId); if (msg.msg.get()) { @@ -596,6 +606,10 @@ public: _threadDone = true; }; }; +MessageFetchingThread::~MessageFetchingThread() +{ + _thread.join(); +} TEST_F(FileStorManagerTest, handler_paused_multi_thread) { FileStorHandlerComponents c(*this); @@ -606,12 +620,8 @@ TEST_F(FileStorManagerTest, handler_paused_multi_thread) { Document::SP doc(createDocument(content, "id:footype:testdoctype1:n=1234:bar").release()); - FastOS_ThreadPool pool; MessagePusherThread pushthread(filestorHandler, doc); - pushthread.start(pool); - MessageFetchingThread thread(filestorHandler); - thread.start(pool); for (uint32_t i = 0; i < 50; ++i) { std::this_thread::sleep_for(2ms); diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp index b7903de0fe2..1fb5a9730c4 100644 --- a/storage/src/tests/storageserver/statereportertest.cpp +++ b/storage/src/tests/storageserver/statereportertest.cpp @@ -97,7 +97,7 @@ void StateReporterTest::SetUp() { _filestorMetrics->initDiskMetrics(1, 1); _topSet->registerMetric(*_filestorMetrics); - _metricManager->init(config::ConfigUri(_config->getConfigId()), _node->getThreadPool()); + _metricManager->init(config::ConfigUri(_config->getConfigId())); } void StateReporterTest::TearDown() { diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h index 74434c0116b..17a344a368a 100644 --- a/storage/src/vespa/storage/common/storagelinkqueued.h +++ b/storage/src/vespa/storage/common/storagelinkqueued.h @@ -16,7 +16,6 @@ #include "storagelink.h" #include <vespa/storageframework/generic/thread/runnable.h> -#include <vespa/vespalib/util/document_runnable.h> #include <deque> #include <limits> #include <mutex> diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 08a48cc2d8a..99f61c62cd1 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -10,7 +10,6 @@ #include "filestorhandler.h" #include "service_layer_host_info_reporter.h" -#include <vespa/vespalib/util/document_runnable.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/document/bucket/bucketid.h> #include <vespa/persistence/spi/bucketexecutor.h> diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 593640e7d03..156ec8bc031 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -23,7 +23,6 @@ #include <vespa/messagebus/imessagehandler.h> #include <vespa/messagebus/ireplyhandler.h> #include <vespa/config/helper/ifetchercallback.h> -#include <vespa/vespalib/util/document_runnable.h> #include <vespa/config/subscription/configuri.h> #include <vespa/config-bucketspaces.h> #include <map> diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 0adfb209e66..dddcd42aad7 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -15,7 +15,6 @@ #include <vespa/storageframework/generic/thread/runnable.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/document/bucket/bucket.h> -#include <vespa/vespalib/util/document_runnable.h> #include <vespa/metrics/metricset.h> #include <vespa/metrics/summetric.h> #include <vespa/metrics/countmetric.h> diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 2836ab80acf..a09abb25f7a 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -201,7 +201,7 @@ StorageNode::initialize() // have been created, such that we don't need to pay the extra cost of // reinitializing metric manager often. if ( ! _context.getComponentRegister().getMetricManager().isInitialized() ) { - _context.getComponentRegister().getMetricManager().init(_configUri, _context.getThreadPool()); + _context.getComponentRegister().getMetricManager().init(_configUri); } if (_chain) { diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.h b/storage/src/vespa/storage/storageserver/storagenodecontext.h index f07bdd37cd4..52709fb1d9b 100644 --- a/storage/src/vespa/storage/storageserver/storagenodecontext.h +++ b/storage/src/vespa/storage/storageserver/storagenodecontext.h @@ -28,12 +28,6 @@ struct StorageNodeContext { */ ComponentRegister& getComponentRegister() { return *_componentRegister; } - /** - * There currently exist threads that doesn't use the component model. - * Let the backend threadpool be accessible for now. - */ - FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); } - ~StorageNodeContext(); protected: diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h index 0c5ec08fb4c..02bb37db59f 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.h +++ b/storage/src/vespa/storage/visiting/visitormanager.h @@ -30,7 +30,6 @@ #include <vespa/storageapi/message/internal.h> #include <vespa/storageapi/message/visitor.h> #include <vespa/config/helper/ifetchercallback.h> -#include <vespa/vespalib/util/document_runnable.h> namespace config { class ConfigUri; diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index 729b675df3a..f6204fed438 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -22,7 +22,6 @@ #include <vespa/storageframework/generic/thread/runnable.h> #include <vespa/storageapi/messageapi/messagehandler.h> #include <vespa/metrics/metrictimer.h> -#include <vespa/vespalib/util/document_runnable.h> #include <atomic> #include <deque> diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h index d228dace1ed..1aede4d12e8 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h +++ b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h @@ -31,7 +31,6 @@ public: virtual ComponentRegisterImpl& getComponentRegister() { return *_compReg; } FakeClock& getClock() { return _clock; } ThreadPoolImpl& getThreadPoolImpl() { return _threadPool; } - FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); } }; } diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp index 925c9cda248..314434a4c1a 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp @@ -28,11 +28,11 @@ ThreadImpl::ThreadImpl(ThreadPoolImpl& pool, _tickDataPtr(0), _interrupted(false), _joined(false), - _thread(*this), + _thread(), _cpu_category(cpu_category) { _tickData[load_relaxed(_tickDataPtr)]._lastTick = pool.getClock().getMonotonicTime(); - _thread.start(_pool.getThreadPool()); + _thread = std::thread([this](){run();}); } ThreadImpl::~ThreadImpl() @@ -70,19 +70,21 @@ void ThreadImpl::interrupt() { _interrupted.store(true, std::memory_order_relaxed); - _thread.stop(); } void ThreadImpl::join() { - _thread.join(); + if (_thread.joinable()) { + _thread.join(); + } } vespalib::string ThreadImpl::get_live_thread_stack_trace() const { - return vespalib::SignalHandler::get_cross_thread_stack_trace(_thread.native_thread_id()); + auto native_handle = const_cast<std::thread&>(_thread).native_handle(); + return vespalib::SignalHandler::get_cross_thread_stack_trace(native_handle); } void diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h index d95ba2a37ef..68ed63ea17c 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h @@ -4,7 +4,6 @@ #include <vespa/storageframework/generic/thread/thread.h> #include <vespa/vespalib/util/cpu_usage.h> -#include <vespa/vespalib/util/document_runnable.h> #include <array> #include <atomic> #include <optional> @@ -15,12 +14,6 @@ struct ThreadPoolImpl; class ThreadImpl final : public Thread { - struct BackendThread : public document::Runnable { - ThreadImpl& _impl; - explicit BackendThread(ThreadImpl& impl) : _impl(impl) {} - void run() override { _impl.run(); } - }; - /** * Internal data race free implementation of tick data that maps to and * from ThreadTickData. We hide the atomicity of this since atomic vars @@ -52,7 +45,7 @@ class ThreadImpl final : public Thread std::atomic<uint32_t> _tickDataPtr; std::atomic<bool> _interrupted; bool _joined; - BackendThread _thread; + std::thread _thread; std::optional<vespalib::CpuUsage::Category> _cpu_category; void run(); diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp index 95959d06b54..068de8f5880 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp @@ -15,8 +15,7 @@ using vespalib::IllegalStateException; namespace storage::framework::defaultimplementation { ThreadPoolImpl::ThreadPoolImpl(Clock& clock) - : _backendThreadPool(std::make_unique<FastOS_ThreadPool>()), - _clock(clock), + : _clock(clock), _stopping(false) { } @@ -44,7 +43,6 @@ ThreadPoolImpl::~ThreadPoolImpl() } std::this_thread::sleep_for(10ms); } - _backendThreadPool->Close(); } Thread::UP diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h index b788a3eed78..07b2dd78ed9 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h @@ -15,7 +15,6 @@ class ThreadImpl; struct ThreadPoolImpl final : public ThreadPool { - std::unique_ptr<FastOS_ThreadPool> _backendThreadPool; std::vector<ThreadImpl*> _threads; mutable std::mutex _threadVectorLock; Clock & _clock; @@ -30,7 +29,6 @@ public: std::optional<vespalib::CpuUsage::Category> cpu_category) override; void visitThreads(ThreadVisitor&) const override; void unregisterThread(ThreadImpl&); - FastOS_ThreadPool& getThreadPool() { return *_backendThreadPool; } Clock& getClock() { return _clock; } }; |