aboutsummaryrefslogtreecommitdiffstats
path: root/storageframework
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-13 11:01:30 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-13 11:02:53 +0000
commitd030fc60e38ff893ec01267211ab00890bcd2159 (patch)
treed76ecf42b873ab86312468b4dff77beab979ab87 /storageframework
parent6f31b8fe84694406d4e42ca0ca9f1b62ab5c8ee6 (diff)
- Use std::mutex/std::condition_varaible over vespalib::Monitor.
- use vespa::duration over storage::framework::XXXTime.
Diffstat (limited to 'storageframework')
-rw-r--r--storageframework/src/tests/thread/tickingthreadtest.cpp34
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp6
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h4
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp45
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h25
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp11
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h4
-rw-r--r--storageframework/src/vespa/storageframework/generic/clock/time.h2
-rw-r--r--storageframework/src/vespa/storageframework/generic/component/component.cpp42
-rw-r--r--storageframework/src/vespa/storageframework/generic/component/component.h11
-rw-r--r--storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h2
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/runnable.h6
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/thread.h4
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp24
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/threadpool.h34
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp86
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/tickingthread.h8
17 files changed, 157 insertions, 191 deletions
diff --git a/storageframework/src/tests/thread/tickingthreadtest.cpp b/storageframework/src/tests/thread/tickingthreadtest.cpp
index 8e3d7b29cd6..b877f08474d 100644
--- a/storageframework/src/tests/thread/tickingthreadtest.cpp
+++ b/storageframework/src/tests/thread/tickingthreadtest.cpp
@@ -6,7 +6,6 @@
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/exception.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/util/time.h>
#include <thread>
namespace storage::framework::defaultimplementation {
@@ -28,7 +27,7 @@ struct MyApp : public TickingThread {
TickingThreadPool::UP _threadPool;
MyApp(int threadCount, bool doCritOverlapTest = false);
- ~MyApp();
+ ~MyApp() override;
void start(ThreadPool& p) { _threadPool->start(p); }
@@ -95,7 +94,7 @@ MyApp::MyApp(int threadCount, bool doCritOverlapTest)
}
}
-MyApp::~MyApp() { }
+MyApp::~MyApp() = default;
}
@@ -120,17 +119,14 @@ TEST(TickingThreadTest, test_ticks_before_wait_basic)
TEST(TickingThreadTest, test_ticks_before_wait_live_update)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg = std::make_unique<ComponentRegisterImpl>();
int threadCount = 1;
MyApp app(threadCount);
// Configure thread pool to send bulks of 5000 ticks each second.
long unsigned int ticksBeforeWaitMs = 5000;
- MilliSecTime waitTimeMs(1000);
- MilliSecTime maxProcessingTime(234234);
app.start(testReg.getThreadPoolImpl());
app._threadPool->updateParametersAllThreads(
- waitTimeMs, maxProcessingTime, ticksBeforeWaitMs);
+ 1s, 234234ms, ticksBeforeWaitMs);
// Check that 5000 ticks are received instantly (usually <2 ms)
// (if live update is broken it will take more than an hour).
@@ -146,16 +142,14 @@ TEST(TickingThreadTest, test_ticks_before_wait_live_update)
TEST(TickingThreadTest, test_destroy_without_starting)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
int threadCount = 5;
MyApp app(threadCount, true);
}
TEST(TickingThreadTest, test_verbose_stopping)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
int threadCount = 5;
MyApp app(threadCount, true);
app.start(testReg.getThreadPoolImpl());
@@ -167,8 +161,7 @@ TEST(TickingThreadTest, test_verbose_stopping)
TEST(TickingThreadTest, test_stop_on_deletion)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
int threadCount = 5;
MyApp app(threadCount, true);
app.start(testReg.getThreadPoolImpl());
@@ -179,8 +172,7 @@ TEST(TickingThreadTest, test_stop_on_deletion)
TEST(TickingThreadTest, test_lock_all_ticks)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
int threadCount = 5;
MyApp app1(threadCount);
MyApp app2(threadCount);
@@ -207,8 +199,7 @@ TEST(TickingThreadTest, test_lock_all_ticks)
TEST(TickingThreadTest, test_lock_critical_ticks)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
int threadCount = 5;
uint64_t iterationsBeforeOverlap = 0;
{
@@ -290,18 +281,17 @@ struct BroadcastApp : public TickingThread {
};
BroadcastApp::BroadcastApp()
- : _threadPool(TickingThreadPool::createDefault("testApp", MilliSecTime(300000)))
+ : _threadPool(TickingThreadPool::createDefault("testApp", 300s))
{
_threadPool->addThread(*this);
}
-BroadcastApp::~BroadcastApp() {}
+BroadcastApp::~BroadcastApp() = default;
}
TEST(TickingThreadTest, test_broadcast)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
BroadcastApp app;
app.start(testReg.getThreadPoolImpl());
app.doTask("foo");
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp
index e4560370a01..e0f441089ff 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp
@@ -159,12 +159,6 @@ ComponentRegisterImpl::registerUpdateHook(vespalib::stringref name,
_hooks.emplace_back(std::move(hookPtr));
}
-metrics::MetricLockGuard
-ComponentRegisterImpl::getMetricManagerLock()
-{
- return _metricManager->getMetricLock();
-}
-
void
ComponentRegisterImpl::registerShutdownListener(ShutdownListener& listener)
{
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h
index 3adef5f4838..c7b74ec631a 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h
@@ -22,6 +22,7 @@
#include <vespa/storageframework/generic/metric/metricregistrator.h>
#include <vespa/storageframework/generic/status/statusreportermap.h>
#include <vespa/metrics/metricset.h>
+#include <mutex>
namespace metrics {
@@ -57,7 +58,7 @@ public:
typedef std::unique_ptr<ComponentRegisterImpl> UP;
ComponentRegisterImpl();
- ~ComponentRegisterImpl();
+ ~ComponentRegisterImpl() override;
bool hasMetricManager() const { return (_metricManager != 0); }
metrics::MetricManager& getMetricManager() { return *_metricManager; }
@@ -75,7 +76,6 @@ public:
void registerMetric(metrics::Metric&) override;
void registerUpdateHook(vespalib::stringref name, MetricUpdateHook& hook, SecondTime period) override;
- vespalib::MonitorGuard getMetricManagerLock() override;
void registerShutdownListener(ShutdownListener&);
};
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
index 4b9b2639d3b..ba3d52fb8f5 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
@@ -12,8 +12,8 @@ namespace storage::framework::defaultimplementation {
ThreadImpl::ThreadImpl(ThreadPoolImpl& pool,
Runnable& runnable,
vespalib::stringref id,
- uint64_t waitTimeMs,
- uint64_t maxProcessTimeMs,
+ vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTimeMs,
int ticksBeforeWait)
: Thread(id),
_pool(pool),
@@ -25,7 +25,7 @@ ThreadImpl::ThreadImpl(ThreadPoolImpl& pool,
_joined(false),
_thread(*this)
{
- _tickData[_tickDataPtr]._lastTickMs = pool.getClock().getTimeInMillis().getTime();
+ _tickData[_tickDataPtr]._lastTickMs = pool.getClock().getMonotonicTime();
_thread.start(_pool.getThreadPool());
}
@@ -69,30 +69,29 @@ ThreadImpl::join()
}
void
-ThreadImpl::registerTick(CycleType cycleType, MilliSecTime time)
+ThreadImpl::registerTick(CycleType cycleType, vespalib::steady_time now)
{
- if (!time.isSet()) time = _pool.getClock().getTimeInMillis();
+ if (now.time_since_epoch() == vespalib::duration::zero()) now = _pool.getClock().getMonotonicTime();
ThreadTickData data(getTickData());
- uint64_t previousTickMs = data._lastTickMs;
- uint64_t nowMs = time.getTime();
- data._lastTickMs = nowMs;
+ vespalib::steady_clock::time_point previousTick = data._lastTick;
+ data._lastTick = now;
data._lastTickType = cycleType;
setTickData(data);
- if (data._lastTickMs == 0) { return; }
+ if (data._lastTick.time_since_epoch() == vespalib::duration::zero()) { return; }
- if (previousTickMs > nowMs) {
+ if (previousTick > now) {
LOGBP(warning, "Thread is registering tick at time %" PRIu64 ", but "
"last time it registered a tick, the time was %" PRIu64
". Assuming clock has been adjusted backwards",
- nowMs, previousTickMs);
+ vespalib::count_ms(now.time_since_epoch()), vespalib::count_ms(previousTick.time_since_epoch()));
return;
}
- uint64_t cycleTimeMs = nowMs - previousTickMs;
+ vespalib::duration cycleTimeMs = now - previousTick;
if (cycleType == WAIT_CYCLE) {
- data._maxWaitTimeSeenMs = std::max(data._maxWaitTimeSeenMs, cycleTimeMs);
+ data._maxWaitTimeSeen = std::max(data._maxWaitTimeSeen, cycleTimeMs);
} else {
- data._maxProcessingTimeSeenMs = std::max(data._maxProcessingTimeSeenMs, cycleTimeMs);
+ data._maxProcessingTimeSeen = std::max(data._maxProcessingTimeSeen, cycleTimeMs);
}
}
@@ -111,9 +110,9 @@ ThreadImpl::setTickData(const ThreadTickData& tickData)
}
void
-ThreadImpl::updateParameters(uint64_t waitTimeMs,
- uint64_t maxProcessTimeMs,
- int ticksBeforeWait) {
+ThreadImpl::updateParameters(vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTimeMs,
+ int ticksBeforeWait) {
_properties.setWaitTime(waitTimeMs);
_properties.setMaxProcessTime(maxProcessTimeMs);
_properties.setTicksBeforeWait(ticksBeforeWait);
@@ -125,9 +124,9 @@ ThreadImpl::AtomicThreadTickData::loadRelaxed() const noexcept
ThreadTickData result;
constexpr auto relaxed = std::memory_order_relaxed;
result._lastTickType = _lastTickType.load(relaxed);
- result._lastTickMs = _lastTickMs.load(relaxed);
- result._maxProcessingTimeSeenMs = _maxProcessingTimeSeenMs.load(relaxed);
- result._maxWaitTimeSeenMs = _maxWaitTimeSeenMs.load(relaxed);
+ result._lastTick = _lastTickMs.load(relaxed);
+ result._maxProcessingTimeSeen = _maxProcessingTimeSeenMs.load(relaxed);
+ result._maxWaitTimeSeen = _maxWaitTimeSeenMs.load(relaxed);
return result;
}
@@ -137,9 +136,9 @@ ThreadImpl::AtomicThreadTickData::storeRelaxed(
{
constexpr auto relaxed = std::memory_order_relaxed;
_lastTickType.store(newState._lastTickType, relaxed);
- _lastTickMs.store(newState._lastTickMs, relaxed);
- _maxProcessingTimeSeenMs.store(newState._maxProcessingTimeSeenMs, relaxed);
- _maxWaitTimeSeenMs.store(newState._maxWaitTimeSeenMs, relaxed);
+ _lastTickMs.store(newState._lastTick, relaxed);
+ _maxProcessingTimeSeenMs.store(newState._maxProcessingTimeSeen, relaxed);
+ _maxWaitTimeSeenMs.store(newState._maxWaitTimeSeen, relaxed);
}
}
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
index 17967c66bdf..077e508a609 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
@@ -26,10 +26,16 @@ class ThreadImpl : public Thread
* on code using it.
*/
struct AtomicThreadTickData {
+ AtomicThreadTickData() noexcept
+ : _lastTickType(),
+ _lastTickMs(vespalib::steady_time(vespalib::duration::zero())),
+ _maxProcessingTimeSeenMs(),
+ _maxWaitTimeSeenMs()
+ {}
std::atomic<CycleType> _lastTickType;
- std::atomic<uint64_t> _lastTickMs;
- std::atomic<uint64_t> _maxProcessingTimeSeenMs;
- std::atomic<uint64_t> _maxWaitTimeSeenMs;
+ std::atomic<vespalib::steady_time> _lastTickMs;
+ std::atomic<vespalib::duration> _maxProcessingTimeSeenMs;
+ std::atomic<vespalib::duration> _maxWaitTimeSeenMs;
// struct stores and loads are both data race free with relaxed
// memory semantics. This means it's possible to observe stale/partial
// state in a case with concurrent readers/writers.
@@ -49,26 +55,23 @@ class ThreadImpl : public Thread
void run();
public:
- ThreadImpl(ThreadPoolImpl&, Runnable&, vespalib::stringref id, uint64_t waitTimeMs,
- uint64_t maxProcessTimeMs, int ticksBeforeWait);
+ ThreadImpl(ThreadPoolImpl&, Runnable&, vespalib::stringref id, vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTimeMs, int ticksBeforeWait);
~ThreadImpl();
bool interrupted() const override;
bool joined() const override;
void interrupt() override;
void join() override;
- void registerTick(CycleType, MilliSecTime) override;
- uint64_t getWaitTime() const override {
+ void registerTick(CycleType, vespalib::steady_time) override;
+ vespalib::duration getWaitTime() const override {
return _properties.getWaitTime();
}
int getTicksBeforeWait() const override {
return _properties.getTicksBeforeWait();
}
- uint64_t getMaxProcessTime() const override {
- return _properties.getMaxProcessTime();
- }
- void updateParameters(uint64_t waitTime, uint64_t maxProcessTime, int ticksBeforeWait) override;
+ void updateParameters(vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait) override;
void setTickData(const ThreadTickData&);
ThreadTickData getTickData() const;
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
index e4a1188030c..e5698f0cbc2 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
@@ -46,17 +46,16 @@ ThreadPoolImpl::~ThreadPoolImpl()
}
Thread::UP
-ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, uint64_t waitTimeMs,
- uint64_t maxProcessTime, int ticksBeforeWait)
+ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTime, int ticksBeforeWait)
{
std::lock_guard lock(_threadVectorLock);
if (_stopping) {
throw IllegalStateException("Threadpool is stopping", VESPA_STRLOC);
}
- ThreadImpl* ti;
- Thread::UP t(ti = new ThreadImpl(*this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait));
- _threads.push_back(ti);
- return t;
+ auto thread = std::make_unique<ThreadImpl>(*this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait);
+ _threads.push_back(thread.get());
+ return thread;
}
void
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
index 5f0ee523eff..5a025ac3ad3 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
@@ -21,8 +21,8 @@ public:
ThreadPoolImpl(Clock&);
~ThreadPoolImpl() override;
- Thread::UP startThread(Runnable&, vespalib::stringref id, uint64_t waitTimeMs,
- uint64_t maxProcessTime, int ticksBeforeWait) override;
+ Thread::UP startThread(Runnable&, vespalib::stringref id, vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTime, int ticksBeforeWait) override;
void visitThreads(ThreadVisitor&) const override;
void registerThread(ThreadImpl&);
diff --git a/storageframework/src/vespa/storageframework/generic/clock/time.h b/storageframework/src/vespa/storageframework/generic/clock/time.h
index 4c578f9b33d..811c6cfceb0 100644
--- a/storageframework/src/vespa/storageframework/generic/clock/time.h
+++ b/storageframework/src/vespa/storageframework/generic/clock/time.h
@@ -3,7 +3,7 @@
#include <boost/operators.hpp>
#include <vespa/vespalib/stllike/string.h>
-#include <chrono>
+#include <vespa/vespalib/util/time.h>
namespace vespalib {
class asciistream;
diff --git a/storageframework/src/vespa/storageframework/generic/component/component.cpp b/storageframework/src/vespa/storageframework/generic/component/component.cpp
index 3818a2865d9..44468bfead3 100644
--- a/storageframework/src/vespa/storageframework/generic/component/component.cpp
+++ b/storageframework/src/vespa/storageframework/generic/component/component.cpp
@@ -23,12 +23,12 @@ Component::close()
Component::Component(ComponentRegister& cr, vespalib::stringref name)
: _componentRegister(&cr),
_name(name),
- _status(0),
- _metric(0),
- _threadPool(0),
- _metricReg(0),
- _clock(0),
- _listener(0)
+ _status(nullptr),
+ _metric(nullptr),
+ _threadPool(nullptr),
+ _metricReg(nullptr),
+ _clock(nullptr),
+ _listener(nullptr)
{
cr.registerComponent(*this);
}
@@ -38,23 +38,23 @@ Component::~Component() = default;
void
Component::registerComponentStateListener(ComponentStateListener& l)
{
- assert(_listener == 0);
+ assert(_listener == nullptr);
_listener = &l;
}
void
Component::registerStatusPage(const StatusReporter& sr)
{
- assert(_status == 0);
+ assert(_status == nullptr);
_status = &sr;
}
void
Component::registerMetric(metrics::Metric& m)
{
- assert(_metric == 0);
+ assert(_metric == nullptr);
_metric = &m;
- if (_metricReg != 0) {
+ if (_metricReg != nullptr) {
_metricReg->registerMetric(m);
}
}
@@ -64,28 +64,18 @@ Component::registerMetricUpdateHook(MetricUpdateHook& hook, SecondTime period)
{
assert(_metricUpdateHook.first == 0);
_metricUpdateHook = std::make_pair(&hook, period);
- if (_metricReg != 0) {
+ if (_metricReg != nullptr) {
_metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second);
}
}
-vespalib::MonitorGuard
-Component::getMetricManagerLock()
-{
- if (_metricReg != 0) {
- return _metricReg->getMetricManagerLock();
- } else {
- return vespalib::MonitorGuard();
- }
-}
-
void
Component::setMetricRegistrator(MetricRegistrator& mr) {
_metricReg = &mr;
if (_metricUpdateHook.first != 0) {
_metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second);
}
- if (_metric != 0) {
+ if (_metric != nullptr) {
_metricReg->registerMetric(*_metric);
}
}
@@ -93,16 +83,16 @@ Component::setMetricRegistrator(MetricRegistrator& mr) {
ThreadPool&
Component::getThreadPool() const
{
- assert(_threadPool != 0);
+ assert(_threadPool != nullptr);
return *_threadPool;
}
// Helper functions for components wanting to start a single thread.
Thread::UP
-Component::startThread(Runnable& runnable, MilliSecTime waitTime, MilliSecTime maxProcessTime, int ticksBeforeWait)
+Component::startThread(Runnable& runnable, vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait)
{
- return getThreadPool().startThread(runnable, getName(), waitTime.getTime(),
- maxProcessTime.getTime(), ticksBeforeWait);
+ return getThreadPool().startThread(runnable, getName(), waitTime,
+ maxProcessTime, ticksBeforeWait);
}
void
diff --git a/storageframework/src/vespa/storageframework/generic/component/component.h b/storageframework/src/vespa/storageframework/generic/component/component.h
index c91d7feb532..bfc80f524b5 100644
--- a/storageframework/src/vespa/storageframework/generic/component/component.h
+++ b/storageframework/src/vespa/storageframework/generic/component/component.h
@@ -71,7 +71,6 @@
#include <vespa/storageframework/generic/thread/runnable.h>
#include <vespa/storageframework/generic/thread/thread.h>
#include <vespa/storageframework/generic/clock/clock.h>
-#include <vespa/vespalib/util/sync.h>
#include <atomic>
namespace storage::framework {
@@ -153,12 +152,6 @@ public:
*/
void registerMetricUpdateHook(MetricUpdateHook&, SecondTime period);
- /**
- * If you need to modify the metric sets that have been registered, you need
- * to hold the metric manager lock while you do it.
- */
- vespalib::MonitorGuard getMetricManagerLock();
-
/** Get the name of the component. Must be a unique name. */
const vespalib::string& getName() const override { return _name; }
@@ -185,8 +178,8 @@ public:
* in this thread. (Thus one is not required to call registerTick())
*/
Thread::UP startThread(Runnable&,
- MilliSecTime maxProcessTime = MilliSecTime(0),
- MilliSecTime waitTime = MilliSecTime(0),
+ vespalib::duration maxProcessTime = vespalib::duration::zero(),
+ vespalib::duration waitTime = vespalib::duration::zero(),
int ticksBeforeWait = 1);
// Check upgrade flag settings. Note that this flag may change at any time.
diff --git a/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h b/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h
index 6cdbf58d555..53988651cbc 100644
--- a/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h
+++ b/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h
@@ -11,7 +11,6 @@
#pragma once
#include <vespa/storageframework/generic/clock/time.h>
-#include <vespa/vespalib/util/sync.h>
namespace metrics {
class Metric;
@@ -26,7 +25,6 @@ struct MetricRegistrator {
virtual void registerMetric(metrics::Metric&) = 0;
virtual void registerUpdateHook(vespalib::stringref name, MetricUpdateHook& hook, SecondTime period) = 0;
- virtual vespalib::MonitorGuard getMetricManagerLock() = 0;
};
}
diff --git a/storageframework/src/vespa/storageframework/generic/thread/runnable.h b/storageframework/src/vespa/storageframework/generic/thread/runnable.h
index 245522a7421..a2615694e7c 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/runnable.h
+++ b/storageframework/src/vespa/storageframework/generic/thread/runnable.h
@@ -43,11 +43,9 @@ struct ThreadHandle {
* clock fetches if client already knows current time)
*/
virtual void registerTick(CycleType = UNKNOWN_CYCLE,
- MilliSecTime currentTime = MilliSecTime(0)) = 0;
+ vespalib::steady_time = vespalib::steady_time()) = 0;
- virtual uint64_t getWaitTime() const = 0;
-
- virtual uint64_t getMaxProcessTime() const = 0;
+ virtual vespalib::duration getWaitTime() const = 0;
/**
* The number of ticks done before wait is called when no more work is
diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.h b/storageframework/src/vespa/storageframework/generic/thread/thread.h
index 814686eb4fa..e37bba4c723 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/thread.h
+++ b/storageframework/src/vespa/storageframework/generic/thread/thread.h
@@ -50,8 +50,8 @@ public:
*/
virtual void join() = 0;
- virtual void updateParameters(uint64_t waitTime,
- uint64_t maxProcessTime,
+ virtual void updateParameters(vespalib::duration waitTime,
+ vespalib::duration maxProcessTime,
int ticksBeforeWait) = 0;
/**
diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp
index d72b0c54544..3c2ebba42f7 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp
+++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp
@@ -2,23 +2,22 @@
#include "threadpool.h"
-namespace storage {
-namespace framework {
+namespace storage::framework {
-ThreadProperties::ThreadProperties(uint64_t waitTimeMs,
- uint64_t maxProcessTimeMs,
+ThreadProperties::ThreadProperties(vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTimeMs,
int ticksBeforeWait)
{
- _waitTimeMs.store(waitTimeMs);
- _maxProcessTimeMs.store(maxProcessTimeMs);
- _ticksBeforeWait.store(ticksBeforeWait);
+ setWaitTime(waitTimeMs);
+ setMaxProcessTime(maxProcessTimeMs);
+ setTicksBeforeWait(ticksBeforeWait);
}
-uint64_t ThreadProperties::getMaxProcessTime() const {
+ vespalib::duration ThreadProperties::getMaxProcessTime() const {
return _maxProcessTimeMs.load(std::memory_order_relaxed);
}
-uint64_t ThreadProperties::getWaitTime() const {
+vespalib::duration ThreadProperties::getWaitTime() const {
return _waitTimeMs.load(std::memory_order_relaxed);
}
@@ -26,11 +25,11 @@ int ThreadProperties::getTicksBeforeWait() const {
return _ticksBeforeWait.load(std::memory_order_relaxed);
}
-void ThreadProperties::setMaxProcessTime(uint64_t maxProcessingTimeMs) {
+void ThreadProperties::setMaxProcessTime(vespalib::duration maxProcessingTimeMs) {
_maxProcessTimeMs.store(maxProcessingTimeMs);
}
-void ThreadProperties::setWaitTime(uint64_t waitTimeMs) {
+void ThreadProperties::setWaitTime(vespalib::duration waitTimeMs) {
_waitTimeMs.store(waitTimeMs);
}
@@ -38,5 +37,4 @@ void ThreadProperties::setTicksBeforeWait(int ticksBeforeWait) {
_ticksBeforeWait.store(ticksBeforeWait);
}
-} // framework
-} // storage
+}
diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.h b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h
index 8ebe382bddf..0ee800c43ff 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/threadpool.h
+++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h
@@ -27,12 +27,12 @@ namespace storage::framework {
* thread.
*/
class ThreadProperties {
- private:
+private:
/**
* Time this thread should maximum use to process before a tick is
* registered. (Including wait time if wait time is not set)
*/
- std::atomic_uint_least64_t _maxProcessTimeMs;
+ std::atomic<vespalib::duration> _maxProcessTimeMs;
/**
* Time this thread will wait in a non-interrupted wait cycle.
* Used in cases where a wait cycle is registered. As long as no other
@@ -40,26 +40,26 @@ class ThreadProperties {
* wait time here. The deadlock detector should add a configurable
* global time period before flagging deadlock anyways.
*/
- std::atomic_uint_least64_t _waitTimeMs;
+ std::atomic<vespalib::duration> _waitTimeMs;
/**
* Number of ticks to be done before a wait.
*/
std::atomic_uint _ticksBeforeWait;
public:
- ThreadProperties(uint64_t waitTimeMs,
- uint64_t maxProcessTimeMs,
+ ThreadProperties(vespalib::duration waitTime,
+ vespalib::duration maxProcessTimeMs,
int ticksBeforeWait);
- void setMaxProcessTime(uint64_t);
- void setWaitTime(uint64_t);
+ void setMaxProcessTime(vespalib::duration);
+ void setWaitTime(vespalib::duration);
void setTicksBeforeWait(int);
- uint64_t getMaxProcessTime() const;
- uint64_t getWaitTime() const;
+ vespalib::duration getMaxProcessTime() const;
+ vespalib::duration getWaitTime() const;
int getTicksBeforeWait() const;
- uint64_t getMaxCycleTime() const {
+ vespalib::duration getMaxCycleTime() const {
return std::max(_maxProcessTimeMs.load(std::memory_order_relaxed),
_waitTimeMs.load(std::memory_order_relaxed));
}
@@ -68,26 +68,26 @@ class ThreadProperties {
/** Data kept on each thread due to the registerTick functinality. */
struct ThreadTickData {
CycleType _lastTickType;
- uint64_t _lastTickMs;
- uint64_t _maxProcessingTimeSeenMs;
- uint64_t _maxWaitTimeSeenMs;
+ vespalib::steady_time _lastTick;
+ vespalib::duration _maxProcessingTimeSeen;
+ vespalib::duration _maxWaitTimeSeen;
};
/** Interface used to access data for the existing threads. */
struct ThreadVisitor {
- virtual ~ThreadVisitor() {}
+ virtual ~ThreadVisitor() = default;
virtual void visitThread(const vespalib::string& id,
const ThreadProperties&,
const ThreadTickData&) = 0;
};
struct ThreadPool {
- virtual ~ThreadPool() {}
+ virtual ~ThreadPool() = default;
virtual Thread::UP startThread(Runnable&,
vespalib::stringref id,
- uint64_t waitTimeMs,
- uint64_t maxProcessTime,
+ vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTime,
int ticksBeforeWait) = 0;
virtual void visitThreads(ThreadVisitor&) const = 0;
diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp
index 3178220654e..7913cdcfe84 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp
+++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp
@@ -22,20 +22,22 @@ ThreadWaitInfo::merge(const ThreadWaitInfo& other) {
* global synchronization point where no thread is currently running.
*/
class TickingThreadRunner final : public Runnable {
- vespalib::Monitor& _monitor;
- TickingThread& _tickingThread;
- uint32_t _threadIndex;
- bool _wantToFreeze;
- bool _frozen;
- char _state;
+ std::mutex & _monitor;
+ std::condition_variable & _cond;
+ TickingThread & _tickingThread;
+ uint32_t _threadIndex;
+ bool _wantToFreeze;
+ bool _frozen;
+ char _state;
public:
typedef std::shared_ptr<TickingThreadRunner> SP;
- TickingThreadRunner(vespalib::Monitor& m,
+ TickingThreadRunner(std::mutex& m,
+ std::condition_variable & cond,
TickingThread& ticker,
uint32_t threadIndex) noexcept
- : _monitor(m), _tickingThread(ticker),
+ : _monitor(m), _cond(cond), _tickingThread(ticker),
_threadIndex(threadIndex), _wantToFreeze(false), _frozen(false) {}
/**
@@ -43,10 +45,10 @@ public:
* tick and has frozen.
*/
void freeze() {
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_monitor);
_wantToFreeze = true;
while (!_frozen) {
- guard.wait();
+ _cond.wait(guard);
}
}
@@ -54,9 +56,11 @@ public:
* Call to thaw up a frozen thread so it can continue.
*/
void thaw() {
- vespalib::MonitorGuard guard(_monitor);
- _wantToFreeze = false;
- guard.broadcast();
+ {
+ std::lock_guard guard(_monitor);
+ _wantToFreeze = false;
+ }
+ _cond.notify_all();
}
char getState() const { return _state; }
@@ -68,18 +72,18 @@ private:
int ticksExecutedAfterWait = 0;
while (!handle.interrupted()) {
{
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_monitor);
if (info.waitWanted()) {
_state = 'w';
cycle = WAIT_CYCLE;
if (ticksExecutedAfterWait >= handle.getTicksBeforeWait()) {
- guard.wait(handle.getWaitTime());
+ _cond.wait_for(guard, handle.getWaitTime());
ticksExecutedAfterWait = 0;
}
}
if (_wantToFreeze) {
_state = 'f';
- doFreeze(guard);
+ doFreeze(guard, _cond);
ticksExecutedAfterWait = 0;
}
_state = 'c';
@@ -93,22 +97,23 @@ private:
}
_state = 's';
}
- void doFreeze(vespalib::MonitorGuard& guard) {
+ void doFreeze(std::unique_lock<std::mutex> & guard, std::condition_variable & cond) {
_frozen = true;
- guard.broadcast();
+ cond.notify_all();
while (_wantToFreeze) {
- guard.wait();
+ _cond.wait(guard);
}
_frozen = false;
}
};
class TickingThreadPoolImpl final : public TickingThreadPool {
- vespalib::string _name;
- vespalib::Monitor _monitor;
- std::atomic_uint_least64_t _waitTime;
- std::atomic_uint _ticksBeforeWait;
- std::atomic_uint_least64_t _maxProcessTime;
+ vespalib::string _name;
+ std::mutex _lock;
+ std::condition_variable _cond;
+ std::atomic<vespalib::duration> _waitTime;
+ std::atomic_uint _ticksBeforeWait;
+ std::atomic<vespalib::duration> _maxProcessTime;
std::vector<TickingThreadRunner::SP> _tickers;
std::vector<std::shared_ptr<Thread>> _threads;
@@ -120,40 +125,40 @@ class TickingThreadPoolImpl final : public TickingThreadPool {
void broadcast() override {}
};
struct CriticalGuard final : public TickingLockGuard::Impl {
- vespalib::MonitorGuard _guard;
+ std::condition_variable &_cond;
- explicit CriticalGuard(vespalib::Monitor& m) : _guard(m) {}
+ explicit CriticalGuard(std::condition_variable & cond) : _cond(cond) {}
- void broadcast() override { _guard.broadcast(); }
+ void broadcast() override { _cond.notify_all(); }
};
public:
- TickingThreadPoolImpl(vespalib::stringref name, MilliSecTime waitTime,
- int ticksBeforeWait, MilliSecTime maxProcessTime)
+ TickingThreadPoolImpl(vespalib::stringref name, vespalib::duration waitTime,
+ int ticksBeforeWait, vespalib::duration maxProcessTime)
: _name(name),
- _waitTime(waitTime.getTime()),
+ _waitTime(waitTime),
_ticksBeforeWait(ticksBeforeWait),
- _maxProcessTime(maxProcessTime.getTime()) {}
+ _maxProcessTime(maxProcessTime) {}
~TickingThreadPoolImpl() override {
stop();
}
- void updateParametersAllThreads(MilliSecTime waitTime, MilliSecTime maxProcessTime,
+ void updateParametersAllThreads(vespalib::duration waitTime, vespalib::duration maxProcessTime,
int ticksBeforeWait) override {
- _waitTime.store(waitTime.getTime());
- _maxProcessTime.store(maxProcessTime.getTime());
+ _waitTime.store(waitTime);
+ _maxProcessTime.store(maxProcessTime);
_ticksBeforeWait.store(ticksBeforeWait);
// TODO: Add locking so threads not deleted while updating
for (uint32_t i=0; i<_threads.size(); ++i) {
- _threads[i]->updateParameters(waitTime.getTime(), maxProcessTime.getTime(), ticksBeforeWait);
+ _threads[i]->updateParameters(waitTime, maxProcessTime, ticksBeforeWait);
}
}
void addThread(TickingThread& ticker) override {
ThreadIndex index = _tickers.size();
ticker.newThreadCreated(index);
- _tickers.emplace_back(std::make_shared<TickingThreadRunner>(_monitor, ticker, index));
+ _tickers.emplace_back(std::make_shared<TickingThreadRunner>(_lock, _cond, ticker, index));
}
void start(ThreadPool& pool) override {
@@ -175,7 +180,7 @@ public:
}
TickingLockGuard freezeCriticalTicks() override {
- return TickingLockGuard(std::make_unique<CriticalGuard>(_monitor));
+ return TickingLockGuard(std::make_unique<CriticalGuard>(_cond));
}
void stop() override {
@@ -183,8 +188,7 @@ public:
t->interrupt();
}
{
- vespalib::MonitorGuard guard(_monitor);
- guard.broadcast();
+ _cond.notify_all();
}
for (auto& t : _threads) {
t->join();
@@ -216,9 +220,9 @@ private:
TickingThreadPool::UP
TickingThreadPool::createDefault(
vespalib::stringref name,
- MilliSecTime waitTime,
+ vespalib::duration waitTime,
int ticksBeforeWait,
- MilliSecTime maxProcessTime)
+ vespalib::duration maxProcessTime)
{
return std::make_unique<TickingThreadPoolImpl>(name, waitTime, ticksBeforeWait, maxProcessTime);
}
diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h
index 1ed02d0f6fd..0649d914c75 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h
+++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h
@@ -80,13 +80,13 @@ struct TickingThreadPool : public ThreadLock {
static TickingThreadPool::UP createDefault(
vespalib::stringref name,
- MilliSecTime waitTime = MilliSecTime(5),
+ vespalib::duration waitTime = 5ms,
int ticksBeforeWait = 1,
- MilliSecTime maxProcessTime = SecondTime(5).getMillis());
+ vespalib::duration maxProcessTime = 5s);
virtual void updateParametersAllThreads(
- MilliSecTime waitTime,
- MilliSecTime maxProcessTime,
+ vespalib::duration waitTime,
+ vespalib::duration maxProcessTime,
int ticksBeforeWait) = 0;
~TickingThreadPool() override = default;