From d030fc60e38ff893ec01267211ab00890bcd2159 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 13 Oct 2020 11:01:30 +0000 Subject: - Use std::mutex/std::condition_varaible over vespalib::Monitor. - use vespa::duration over storage::framework::XXXTime. --- .../src/tests/thread/tickingthreadtest.cpp | 34 +++------ .../component/componentregisterimpl.cpp | 6 -- .../component/componentregisterimpl.h | 4 +- .../defaultimplementation/thread/threadimpl.cpp | 45 ++++++----- .../defaultimplementation/thread/threadimpl.h | 25 ++++--- .../thread/threadpoolimpl.cpp | 11 ++- .../defaultimplementation/thread/threadpoolimpl.h | 4 +- .../vespa/storageframework/generic/clock/time.h | 2 +- .../generic/component/component.cpp | 42 ++++------- .../storageframework/generic/component/component.h | 11 +-- .../generic/metric/metricregistrator.h | 2 - .../storageframework/generic/thread/runnable.h | 6 +- .../vespa/storageframework/generic/thread/thread.h | 4 +- .../storageframework/generic/thread/threadpool.cpp | 24 +++--- .../storageframework/generic/thread/threadpool.h | 34 ++++----- .../generic/thread/tickingthread.cpp | 86 +++++++++++----------- .../generic/thread/tickingthread.h | 8 +- 17 files changed, 157 insertions(+), 191 deletions(-) (limited to 'storageframework') diff --git a/storageframework/src/tests/thread/tickingthreadtest.cpp b/storageframework/src/tests/thread/tickingthreadtest.cpp index 8e3d7b29cd6..b877f08474d 100644 --- a/storageframework/src/tests/thread/tickingthreadtest.cpp +++ b/storageframework/src/tests/thread/tickingthreadtest.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include namespace storage::framework::defaultimplementation { @@ -28,7 +27,7 @@ struct MyApp : public TickingThread { TickingThreadPool::UP _threadPool; MyApp(int threadCount, bool doCritOverlapTest = false); - ~MyApp(); + ~MyApp() override; void start(ThreadPool& p) { _threadPool->start(p); } @@ -95,7 +94,7 @@ MyApp::MyApp(int threadCount, bool doCritOverlapTest) } } -MyApp::~MyApp() { } +MyApp::~MyApp() = default; } @@ -120,17 +119,14 @@ TEST(TickingThreadTest, test_ticks_before_wait_basic) TEST(TickingThreadTest, test_ticks_before_wait_live_update) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg = std::make_unique(); int threadCount = 1; MyApp app(threadCount); // Configure thread pool to send bulks of 5000 ticks each second. long unsigned int ticksBeforeWaitMs = 5000; - MilliSecTime waitTimeMs(1000); - MilliSecTime maxProcessingTime(234234); app.start(testReg.getThreadPoolImpl()); app._threadPool->updateParametersAllThreads( - waitTimeMs, maxProcessingTime, ticksBeforeWaitMs); + 1s, 234234ms, ticksBeforeWaitMs); // Check that 5000 ticks are received instantly (usually <2 ms) // (if live update is broken it will take more than an hour). @@ -146,16 +142,14 @@ TEST(TickingThreadTest, test_ticks_before_wait_live_update) TEST(TickingThreadTest, test_destroy_without_starting) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique()); int threadCount = 5; MyApp app(threadCount, true); } TEST(TickingThreadTest, test_verbose_stopping) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique()); int threadCount = 5; MyApp app(threadCount, true); app.start(testReg.getThreadPoolImpl()); @@ -167,8 +161,7 @@ TEST(TickingThreadTest, test_verbose_stopping) TEST(TickingThreadTest, test_stop_on_deletion) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique()); int threadCount = 5; MyApp app(threadCount, true); app.start(testReg.getThreadPoolImpl()); @@ -179,8 +172,7 @@ TEST(TickingThreadTest, test_stop_on_deletion) TEST(TickingThreadTest, test_lock_all_ticks) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique()); int threadCount = 5; MyApp app1(threadCount); MyApp app2(threadCount); @@ -207,8 +199,7 @@ TEST(TickingThreadTest, test_lock_all_ticks) TEST(TickingThreadTest, test_lock_critical_ticks) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique()); int threadCount = 5; uint64_t iterationsBeforeOverlap = 0; { @@ -290,18 +281,17 @@ struct BroadcastApp : public TickingThread { }; BroadcastApp::BroadcastApp() - : _threadPool(TickingThreadPool::createDefault("testApp", MilliSecTime(300000))) + : _threadPool(TickingThreadPool::createDefault("testApp", 300s)) { _threadPool->addThread(*this); } -BroadcastApp::~BroadcastApp() {} +BroadcastApp::~BroadcastApp() = default; } TEST(TickingThreadTest, test_broadcast) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique()); BroadcastApp app; app.start(testReg.getThreadPoolImpl()); app.doTask("foo"); diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp index e4560370a01..e0f441089ff 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp +++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp @@ -159,12 +159,6 @@ ComponentRegisterImpl::registerUpdateHook(vespalib::stringref name, _hooks.emplace_back(std::move(hookPtr)); } -metrics::MetricLockGuard -ComponentRegisterImpl::getMetricManagerLock() -{ - return _metricManager->getMetricLock(); -} - void ComponentRegisterImpl::registerShutdownListener(ShutdownListener& listener) { diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h index 3adef5f4838..c7b74ec631a 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h +++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h @@ -22,6 +22,7 @@ #include #include #include +#include namespace metrics { @@ -57,7 +58,7 @@ public: typedef std::unique_ptr UP; ComponentRegisterImpl(); - ~ComponentRegisterImpl(); + ~ComponentRegisterImpl() override; bool hasMetricManager() const { return (_metricManager != 0); } metrics::MetricManager& getMetricManager() { return *_metricManager; } @@ -75,7 +76,6 @@ public: void registerMetric(metrics::Metric&) override; void registerUpdateHook(vespalib::stringref name, MetricUpdateHook& hook, SecondTime period) override; - vespalib::MonitorGuard getMetricManagerLock() override; void registerShutdownListener(ShutdownListener&); }; diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp index 4b9b2639d3b..ba3d52fb8f5 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp @@ -12,8 +12,8 @@ namespace storage::framework::defaultimplementation { ThreadImpl::ThreadImpl(ThreadPoolImpl& pool, Runnable& runnable, vespalib::stringref id, - uint64_t waitTimeMs, - uint64_t maxProcessTimeMs, + vespalib::duration waitTimeMs, + vespalib::duration maxProcessTimeMs, int ticksBeforeWait) : Thread(id), _pool(pool), @@ -25,7 +25,7 @@ ThreadImpl::ThreadImpl(ThreadPoolImpl& pool, _joined(false), _thread(*this) { - _tickData[_tickDataPtr]._lastTickMs = pool.getClock().getTimeInMillis().getTime(); + _tickData[_tickDataPtr]._lastTickMs = pool.getClock().getMonotonicTime(); _thread.start(_pool.getThreadPool()); } @@ -69,30 +69,29 @@ ThreadImpl::join() } void -ThreadImpl::registerTick(CycleType cycleType, MilliSecTime time) +ThreadImpl::registerTick(CycleType cycleType, vespalib::steady_time now) { - if (!time.isSet()) time = _pool.getClock().getTimeInMillis(); + if (now.time_since_epoch() == vespalib::duration::zero()) now = _pool.getClock().getMonotonicTime(); ThreadTickData data(getTickData()); - uint64_t previousTickMs = data._lastTickMs; - uint64_t nowMs = time.getTime(); - data._lastTickMs = nowMs; + vespalib::steady_clock::time_point previousTick = data._lastTick; + data._lastTick = now; data._lastTickType = cycleType; setTickData(data); - if (data._lastTickMs == 0) { return; } + if (data._lastTick.time_since_epoch() == vespalib::duration::zero()) { return; } - if (previousTickMs > nowMs) { + if (previousTick > now) { LOGBP(warning, "Thread is registering tick at time %" PRIu64 ", but " "last time it registered a tick, the time was %" PRIu64 ". Assuming clock has been adjusted backwards", - nowMs, previousTickMs); + vespalib::count_ms(now.time_since_epoch()), vespalib::count_ms(previousTick.time_since_epoch())); return; } - uint64_t cycleTimeMs = nowMs - previousTickMs; + vespalib::duration cycleTimeMs = now - previousTick; if (cycleType == WAIT_CYCLE) { - data._maxWaitTimeSeenMs = std::max(data._maxWaitTimeSeenMs, cycleTimeMs); + data._maxWaitTimeSeen = std::max(data._maxWaitTimeSeen, cycleTimeMs); } else { - data._maxProcessingTimeSeenMs = std::max(data._maxProcessingTimeSeenMs, cycleTimeMs); + data._maxProcessingTimeSeen = std::max(data._maxProcessingTimeSeen, cycleTimeMs); } } @@ -111,9 +110,9 @@ ThreadImpl::setTickData(const ThreadTickData& tickData) } void -ThreadImpl::updateParameters(uint64_t waitTimeMs, - uint64_t maxProcessTimeMs, - int ticksBeforeWait) { +ThreadImpl::updateParameters(vespalib::duration waitTimeMs, + vespalib::duration maxProcessTimeMs, + int ticksBeforeWait) { _properties.setWaitTime(waitTimeMs); _properties.setMaxProcessTime(maxProcessTimeMs); _properties.setTicksBeforeWait(ticksBeforeWait); @@ -125,9 +124,9 @@ ThreadImpl::AtomicThreadTickData::loadRelaxed() const noexcept ThreadTickData result; constexpr auto relaxed = std::memory_order_relaxed; result._lastTickType = _lastTickType.load(relaxed); - result._lastTickMs = _lastTickMs.load(relaxed); - result._maxProcessingTimeSeenMs = _maxProcessingTimeSeenMs.load(relaxed); - result._maxWaitTimeSeenMs = _maxWaitTimeSeenMs.load(relaxed); + result._lastTick = _lastTickMs.load(relaxed); + result._maxProcessingTimeSeen = _maxProcessingTimeSeenMs.load(relaxed); + result._maxWaitTimeSeen = _maxWaitTimeSeenMs.load(relaxed); return result; } @@ -137,9 +136,9 @@ ThreadImpl::AtomicThreadTickData::storeRelaxed( { constexpr auto relaxed = std::memory_order_relaxed; _lastTickType.store(newState._lastTickType, relaxed); - _lastTickMs.store(newState._lastTickMs, relaxed); - _maxProcessingTimeSeenMs.store(newState._maxProcessingTimeSeenMs, relaxed); - _maxWaitTimeSeenMs.store(newState._maxWaitTimeSeenMs, relaxed); + _lastTickMs.store(newState._lastTick, relaxed); + _maxProcessingTimeSeenMs.store(newState._maxProcessingTimeSeen, relaxed); + _maxWaitTimeSeenMs.store(newState._maxWaitTimeSeen, relaxed); } } diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h index 17967c66bdf..077e508a609 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h @@ -26,10 +26,16 @@ class ThreadImpl : public Thread * on code using it. */ struct AtomicThreadTickData { + AtomicThreadTickData() noexcept + : _lastTickType(), + _lastTickMs(vespalib::steady_time(vespalib::duration::zero())), + _maxProcessingTimeSeenMs(), + _maxWaitTimeSeenMs() + {} std::atomic _lastTickType; - std::atomic _lastTickMs; - std::atomic _maxProcessingTimeSeenMs; - std::atomic _maxWaitTimeSeenMs; + std::atomic _lastTickMs; + std::atomic _maxProcessingTimeSeenMs; + std::atomic _maxWaitTimeSeenMs; // struct stores and loads are both data race free with relaxed // memory semantics. This means it's possible to observe stale/partial // state in a case with concurrent readers/writers. @@ -49,26 +55,23 @@ class ThreadImpl : public Thread void run(); public: - ThreadImpl(ThreadPoolImpl&, Runnable&, vespalib::stringref id, uint64_t waitTimeMs, - uint64_t maxProcessTimeMs, int ticksBeforeWait); + ThreadImpl(ThreadPoolImpl&, Runnable&, vespalib::stringref id, vespalib::duration waitTimeMs, + vespalib::duration maxProcessTimeMs, int ticksBeforeWait); ~ThreadImpl(); bool interrupted() const override; bool joined() const override; void interrupt() override; void join() override; - void registerTick(CycleType, MilliSecTime) override; - uint64_t getWaitTime() const override { + void registerTick(CycleType, vespalib::steady_time) override; + vespalib::duration getWaitTime() const override { return _properties.getWaitTime(); } int getTicksBeforeWait() const override { return _properties.getTicksBeforeWait(); } - uint64_t getMaxProcessTime() const override { - return _properties.getMaxProcessTime(); - } - void updateParameters(uint64_t waitTime, uint64_t maxProcessTime, int ticksBeforeWait) override; + void updateParameters(vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait) override; void setTickData(const ThreadTickData&); ThreadTickData getTickData() const; diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp index e4a1188030c..e5698f0cbc2 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp @@ -46,17 +46,16 @@ ThreadPoolImpl::~ThreadPoolImpl() } Thread::UP -ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, uint64_t waitTimeMs, - uint64_t maxProcessTime, int ticksBeforeWait) +ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, vespalib::duration waitTimeMs, + vespalib::duration maxProcessTime, int ticksBeforeWait) { std::lock_guard lock(_threadVectorLock); if (_stopping) { throw IllegalStateException("Threadpool is stopping", VESPA_STRLOC); } - ThreadImpl* ti; - Thread::UP t(ti = new ThreadImpl(*this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait)); - _threads.push_back(ti); - return t; + auto thread = std::make_unique(*this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait); + _threads.push_back(thread.get()); + return thread; } void diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h index 5f0ee523eff..5a025ac3ad3 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h @@ -21,8 +21,8 @@ public: ThreadPoolImpl(Clock&); ~ThreadPoolImpl() override; - Thread::UP startThread(Runnable&, vespalib::stringref id, uint64_t waitTimeMs, - uint64_t maxProcessTime, int ticksBeforeWait) override; + Thread::UP startThread(Runnable&, vespalib::stringref id, vespalib::duration waitTimeMs, + vespalib::duration maxProcessTime, int ticksBeforeWait) override; void visitThreads(ThreadVisitor&) const override; void registerThread(ThreadImpl&); diff --git a/storageframework/src/vespa/storageframework/generic/clock/time.h b/storageframework/src/vespa/storageframework/generic/clock/time.h index 4c578f9b33d..811c6cfceb0 100644 --- a/storageframework/src/vespa/storageframework/generic/clock/time.h +++ b/storageframework/src/vespa/storageframework/generic/clock/time.h @@ -3,7 +3,7 @@ #include #include -#include +#include namespace vespalib { class asciistream; diff --git a/storageframework/src/vespa/storageframework/generic/component/component.cpp b/storageframework/src/vespa/storageframework/generic/component/component.cpp index 3818a2865d9..44468bfead3 100644 --- a/storageframework/src/vespa/storageframework/generic/component/component.cpp +++ b/storageframework/src/vespa/storageframework/generic/component/component.cpp @@ -23,12 +23,12 @@ Component::close() Component::Component(ComponentRegister& cr, vespalib::stringref name) : _componentRegister(&cr), _name(name), - _status(0), - _metric(0), - _threadPool(0), - _metricReg(0), - _clock(0), - _listener(0) + _status(nullptr), + _metric(nullptr), + _threadPool(nullptr), + _metricReg(nullptr), + _clock(nullptr), + _listener(nullptr) { cr.registerComponent(*this); } @@ -38,23 +38,23 @@ Component::~Component() = default; void Component::registerComponentStateListener(ComponentStateListener& l) { - assert(_listener == 0); + assert(_listener == nullptr); _listener = &l; } void Component::registerStatusPage(const StatusReporter& sr) { - assert(_status == 0); + assert(_status == nullptr); _status = &sr; } void Component::registerMetric(metrics::Metric& m) { - assert(_metric == 0); + assert(_metric == nullptr); _metric = &m; - if (_metricReg != 0) { + if (_metricReg != nullptr) { _metricReg->registerMetric(m); } } @@ -64,28 +64,18 @@ Component::registerMetricUpdateHook(MetricUpdateHook& hook, SecondTime period) { assert(_metricUpdateHook.first == 0); _metricUpdateHook = std::make_pair(&hook, period); - if (_metricReg != 0) { + if (_metricReg != nullptr) { _metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second); } } -vespalib::MonitorGuard -Component::getMetricManagerLock() -{ - if (_metricReg != 0) { - return _metricReg->getMetricManagerLock(); - } else { - return vespalib::MonitorGuard(); - } -} - void Component::setMetricRegistrator(MetricRegistrator& mr) { _metricReg = &mr; if (_metricUpdateHook.first != 0) { _metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second); } - if (_metric != 0) { + if (_metric != nullptr) { _metricReg->registerMetric(*_metric); } } @@ -93,16 +83,16 @@ Component::setMetricRegistrator(MetricRegistrator& mr) { ThreadPool& Component::getThreadPool() const { - assert(_threadPool != 0); + assert(_threadPool != nullptr); return *_threadPool; } // Helper functions for components wanting to start a single thread. Thread::UP -Component::startThread(Runnable& runnable, MilliSecTime waitTime, MilliSecTime maxProcessTime, int ticksBeforeWait) +Component::startThread(Runnable& runnable, vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait) { - return getThreadPool().startThread(runnable, getName(), waitTime.getTime(), - maxProcessTime.getTime(), ticksBeforeWait); + return getThreadPool().startThread(runnable, getName(), waitTime, + maxProcessTime, ticksBeforeWait); } void diff --git a/storageframework/src/vespa/storageframework/generic/component/component.h b/storageframework/src/vespa/storageframework/generic/component/component.h index c91d7feb532..bfc80f524b5 100644 --- a/storageframework/src/vespa/storageframework/generic/component/component.h +++ b/storageframework/src/vespa/storageframework/generic/component/component.h @@ -71,7 +71,6 @@ #include #include #include -#include #include namespace storage::framework { @@ -153,12 +152,6 @@ public: */ void registerMetricUpdateHook(MetricUpdateHook&, SecondTime period); - /** - * If you need to modify the metric sets that have been registered, you need - * to hold the metric manager lock while you do it. - */ - vespalib::MonitorGuard getMetricManagerLock(); - /** Get the name of the component. Must be a unique name. */ const vespalib::string& getName() const override { return _name; } @@ -185,8 +178,8 @@ public: * in this thread. (Thus one is not required to call registerTick()) */ Thread::UP startThread(Runnable&, - MilliSecTime maxProcessTime = MilliSecTime(0), - MilliSecTime waitTime = MilliSecTime(0), + vespalib::duration maxProcessTime = vespalib::duration::zero(), + vespalib::duration waitTime = vespalib::duration::zero(), int ticksBeforeWait = 1); // Check upgrade flag settings. Note that this flag may change at any time. diff --git a/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h b/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h index 6cdbf58d555..53988651cbc 100644 --- a/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h +++ b/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h @@ -11,7 +11,6 @@ #pragma once #include -#include namespace metrics { class Metric; @@ -26,7 +25,6 @@ struct MetricRegistrator { virtual void registerMetric(metrics::Metric&) = 0; virtual void registerUpdateHook(vespalib::stringref name, MetricUpdateHook& hook, SecondTime period) = 0; - virtual vespalib::MonitorGuard getMetricManagerLock() = 0; }; } diff --git a/storageframework/src/vespa/storageframework/generic/thread/runnable.h b/storageframework/src/vespa/storageframework/generic/thread/runnable.h index 245522a7421..a2615694e7c 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/runnable.h +++ b/storageframework/src/vespa/storageframework/generic/thread/runnable.h @@ -43,11 +43,9 @@ struct ThreadHandle { * clock fetches if client already knows current time) */ virtual void registerTick(CycleType = UNKNOWN_CYCLE, - MilliSecTime currentTime = MilliSecTime(0)) = 0; + vespalib::steady_time = vespalib::steady_time()) = 0; - virtual uint64_t getWaitTime() const = 0; - - virtual uint64_t getMaxProcessTime() const = 0; + virtual vespalib::duration getWaitTime() const = 0; /** * The number of ticks done before wait is called when no more work is diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.h b/storageframework/src/vespa/storageframework/generic/thread/thread.h index 814686eb4fa..e37bba4c723 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/thread.h +++ b/storageframework/src/vespa/storageframework/generic/thread/thread.h @@ -50,8 +50,8 @@ public: */ virtual void join() = 0; - virtual void updateParameters(uint64_t waitTime, - uint64_t maxProcessTime, + virtual void updateParameters(vespalib::duration waitTime, + vespalib::duration maxProcessTime, int ticksBeforeWait) = 0; /** diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp index d72b0c54544..3c2ebba42f7 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp +++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp @@ -2,23 +2,22 @@ #include "threadpool.h" -namespace storage { -namespace framework { +namespace storage::framework { -ThreadProperties::ThreadProperties(uint64_t waitTimeMs, - uint64_t maxProcessTimeMs, +ThreadProperties::ThreadProperties(vespalib::duration waitTimeMs, + vespalib::duration maxProcessTimeMs, int ticksBeforeWait) { - _waitTimeMs.store(waitTimeMs); - _maxProcessTimeMs.store(maxProcessTimeMs); - _ticksBeforeWait.store(ticksBeforeWait); + setWaitTime(waitTimeMs); + setMaxProcessTime(maxProcessTimeMs); + setTicksBeforeWait(ticksBeforeWait); } -uint64_t ThreadProperties::getMaxProcessTime() const { + vespalib::duration ThreadProperties::getMaxProcessTime() const { return _maxProcessTimeMs.load(std::memory_order_relaxed); } -uint64_t ThreadProperties::getWaitTime() const { +vespalib::duration ThreadProperties::getWaitTime() const { return _waitTimeMs.load(std::memory_order_relaxed); } @@ -26,11 +25,11 @@ int ThreadProperties::getTicksBeforeWait() const { return _ticksBeforeWait.load(std::memory_order_relaxed); } -void ThreadProperties::setMaxProcessTime(uint64_t maxProcessingTimeMs) { +void ThreadProperties::setMaxProcessTime(vespalib::duration maxProcessingTimeMs) { _maxProcessTimeMs.store(maxProcessingTimeMs); } -void ThreadProperties::setWaitTime(uint64_t waitTimeMs) { +void ThreadProperties::setWaitTime(vespalib::duration waitTimeMs) { _waitTimeMs.store(waitTimeMs); } @@ -38,5 +37,4 @@ void ThreadProperties::setTicksBeforeWait(int ticksBeforeWait) { _ticksBeforeWait.store(ticksBeforeWait); } -} // framework -} // storage +} diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.h b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h index 8ebe382bddf..0ee800c43ff 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/threadpool.h +++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h @@ -27,12 +27,12 @@ namespace storage::framework { * thread. */ class ThreadProperties { - private: +private: /** * Time this thread should maximum use to process before a tick is * registered. (Including wait time if wait time is not set) */ - std::atomic_uint_least64_t _maxProcessTimeMs; + std::atomic _maxProcessTimeMs; /** * Time this thread will wait in a non-interrupted wait cycle. * Used in cases where a wait cycle is registered. As long as no other @@ -40,26 +40,26 @@ class ThreadProperties { * wait time here. The deadlock detector should add a configurable * global time period before flagging deadlock anyways. */ - std::atomic_uint_least64_t _waitTimeMs; + std::atomic _waitTimeMs; /** * Number of ticks to be done before a wait. */ std::atomic_uint _ticksBeforeWait; public: - ThreadProperties(uint64_t waitTimeMs, - uint64_t maxProcessTimeMs, + ThreadProperties(vespalib::duration waitTime, + vespalib::duration maxProcessTimeMs, int ticksBeforeWait); - void setMaxProcessTime(uint64_t); - void setWaitTime(uint64_t); + void setMaxProcessTime(vespalib::duration); + void setWaitTime(vespalib::duration); void setTicksBeforeWait(int); - uint64_t getMaxProcessTime() const; - uint64_t getWaitTime() const; + vespalib::duration getMaxProcessTime() const; + vespalib::duration getWaitTime() const; int getTicksBeforeWait() const; - uint64_t getMaxCycleTime() const { + vespalib::duration getMaxCycleTime() const { return std::max(_maxProcessTimeMs.load(std::memory_order_relaxed), _waitTimeMs.load(std::memory_order_relaxed)); } @@ -68,26 +68,26 @@ class ThreadProperties { /** Data kept on each thread due to the registerTick functinality. */ struct ThreadTickData { CycleType _lastTickType; - uint64_t _lastTickMs; - uint64_t _maxProcessingTimeSeenMs; - uint64_t _maxWaitTimeSeenMs; + vespalib::steady_time _lastTick; + vespalib::duration _maxProcessingTimeSeen; + vespalib::duration _maxWaitTimeSeen; }; /** Interface used to access data for the existing threads. */ struct ThreadVisitor { - virtual ~ThreadVisitor() {} + virtual ~ThreadVisitor() = default; virtual void visitThread(const vespalib::string& id, const ThreadProperties&, const ThreadTickData&) = 0; }; struct ThreadPool { - virtual ~ThreadPool() {} + virtual ~ThreadPool() = default; virtual Thread::UP startThread(Runnable&, vespalib::stringref id, - uint64_t waitTimeMs, - uint64_t maxProcessTime, + vespalib::duration waitTimeMs, + vespalib::duration maxProcessTime, int ticksBeforeWait) = 0; virtual void visitThreads(ThreadVisitor&) const = 0; diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp index 3178220654e..7913cdcfe84 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp +++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp @@ -22,20 +22,22 @@ ThreadWaitInfo::merge(const ThreadWaitInfo& other) { * global synchronization point where no thread is currently running. */ class TickingThreadRunner final : public Runnable { - vespalib::Monitor& _monitor; - TickingThread& _tickingThread; - uint32_t _threadIndex; - bool _wantToFreeze; - bool _frozen; - char _state; + std::mutex & _monitor; + std::condition_variable & _cond; + TickingThread & _tickingThread; + uint32_t _threadIndex; + bool _wantToFreeze; + bool _frozen; + char _state; public: typedef std::shared_ptr SP; - TickingThreadRunner(vespalib::Monitor& m, + TickingThreadRunner(std::mutex& m, + std::condition_variable & cond, TickingThread& ticker, uint32_t threadIndex) noexcept - : _monitor(m), _tickingThread(ticker), + : _monitor(m), _cond(cond), _tickingThread(ticker), _threadIndex(threadIndex), _wantToFreeze(false), _frozen(false) {} /** @@ -43,10 +45,10 @@ public: * tick and has frozen. */ void freeze() { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_monitor); _wantToFreeze = true; while (!_frozen) { - guard.wait(); + _cond.wait(guard); } } @@ -54,9 +56,11 @@ public: * Call to thaw up a frozen thread so it can continue. */ void thaw() { - vespalib::MonitorGuard guard(_monitor); - _wantToFreeze = false; - guard.broadcast(); + { + std::lock_guard guard(_monitor); + _wantToFreeze = false; + } + _cond.notify_all(); } char getState() const { return _state; } @@ -68,18 +72,18 @@ private: int ticksExecutedAfterWait = 0; while (!handle.interrupted()) { { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_monitor); if (info.waitWanted()) { _state = 'w'; cycle = WAIT_CYCLE; if (ticksExecutedAfterWait >= handle.getTicksBeforeWait()) { - guard.wait(handle.getWaitTime()); + _cond.wait_for(guard, handle.getWaitTime()); ticksExecutedAfterWait = 0; } } if (_wantToFreeze) { _state = 'f'; - doFreeze(guard); + doFreeze(guard, _cond); ticksExecutedAfterWait = 0; } _state = 'c'; @@ -93,22 +97,23 @@ private: } _state = 's'; } - void doFreeze(vespalib::MonitorGuard& guard) { + void doFreeze(std::unique_lock & guard, std::condition_variable & cond) { _frozen = true; - guard.broadcast(); + cond.notify_all(); while (_wantToFreeze) { - guard.wait(); + _cond.wait(guard); } _frozen = false; } }; class TickingThreadPoolImpl final : public TickingThreadPool { - vespalib::string _name; - vespalib::Monitor _monitor; - std::atomic_uint_least64_t _waitTime; - std::atomic_uint _ticksBeforeWait; - std::atomic_uint_least64_t _maxProcessTime; + vespalib::string _name; + std::mutex _lock; + std::condition_variable _cond; + std::atomic _waitTime; + std::atomic_uint _ticksBeforeWait; + std::atomic _maxProcessTime; std::vector _tickers; std::vector> _threads; @@ -120,40 +125,40 @@ class TickingThreadPoolImpl final : public TickingThreadPool { void broadcast() override {} }; struct CriticalGuard final : public TickingLockGuard::Impl { - vespalib::MonitorGuard _guard; + std::condition_variable &_cond; - explicit CriticalGuard(vespalib::Monitor& m) : _guard(m) {} + explicit CriticalGuard(std::condition_variable & cond) : _cond(cond) {} - void broadcast() override { _guard.broadcast(); } + void broadcast() override { _cond.notify_all(); } }; public: - TickingThreadPoolImpl(vespalib::stringref name, MilliSecTime waitTime, - int ticksBeforeWait, MilliSecTime maxProcessTime) + TickingThreadPoolImpl(vespalib::stringref name, vespalib::duration waitTime, + int ticksBeforeWait, vespalib::duration maxProcessTime) : _name(name), - _waitTime(waitTime.getTime()), + _waitTime(waitTime), _ticksBeforeWait(ticksBeforeWait), - _maxProcessTime(maxProcessTime.getTime()) {} + _maxProcessTime(maxProcessTime) {} ~TickingThreadPoolImpl() override { stop(); } - void updateParametersAllThreads(MilliSecTime waitTime, MilliSecTime maxProcessTime, + void updateParametersAllThreads(vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait) override { - _waitTime.store(waitTime.getTime()); - _maxProcessTime.store(maxProcessTime.getTime()); + _waitTime.store(waitTime); + _maxProcessTime.store(maxProcessTime); _ticksBeforeWait.store(ticksBeforeWait); // TODO: Add locking so threads not deleted while updating for (uint32_t i=0; i<_threads.size(); ++i) { - _threads[i]->updateParameters(waitTime.getTime(), maxProcessTime.getTime(), ticksBeforeWait); + _threads[i]->updateParameters(waitTime, maxProcessTime, ticksBeforeWait); } } void addThread(TickingThread& ticker) override { ThreadIndex index = _tickers.size(); ticker.newThreadCreated(index); - _tickers.emplace_back(std::make_shared(_monitor, ticker, index)); + _tickers.emplace_back(std::make_shared(_lock, _cond, ticker, index)); } void start(ThreadPool& pool) override { @@ -175,7 +180,7 @@ public: } TickingLockGuard freezeCriticalTicks() override { - return TickingLockGuard(std::make_unique(_monitor)); + return TickingLockGuard(std::make_unique(_cond)); } void stop() override { @@ -183,8 +188,7 @@ public: t->interrupt(); } { - vespalib::MonitorGuard guard(_monitor); - guard.broadcast(); + _cond.notify_all(); } for (auto& t : _threads) { t->join(); @@ -216,9 +220,9 @@ private: TickingThreadPool::UP TickingThreadPool::createDefault( vespalib::stringref name, - MilliSecTime waitTime, + vespalib::duration waitTime, int ticksBeforeWait, - MilliSecTime maxProcessTime) + vespalib::duration maxProcessTime) { return std::make_unique(name, waitTime, ticksBeforeWait, maxProcessTime); } diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h index 1ed02d0f6fd..0649d914c75 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h +++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h @@ -80,13 +80,13 @@ struct TickingThreadPool : public ThreadLock { static TickingThreadPool::UP createDefault( vespalib::stringref name, - MilliSecTime waitTime = MilliSecTime(5), + vespalib::duration waitTime = 5ms, int ticksBeforeWait = 1, - MilliSecTime maxProcessTime = SecondTime(5).getMillis()); + vespalib::duration maxProcessTime = 5s); virtual void updateParametersAllThreads( - MilliSecTime waitTime, - MilliSecTime maxProcessTime, + vespalib::duration waitTime, + vespalib::duration maxProcessTime, int ticksBeforeWait) = 0; ~TickingThreadPool() override = default; -- cgit v1.2.3