summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/tests/distributor/distributortest.cpp9
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp4
-rw-r--r--storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp20
-rw-r--r--storage/src/vespa/storage/common/storagelinkqueued.hpp21
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp1
-rw-r--r--storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp81
-rw-r--r--storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h34
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp7
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h5
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp4
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp3
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp4
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp10
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.cpp14
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.cpp4
-rw-r--r--storageframework/src/tests/thread/tickingthreadtest.cpp34
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp6
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h4
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp45
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h25
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp11
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h4
-rw-r--r--storageframework/src/vespa/storageframework/generic/clock/time.h2
-rw-r--r--storageframework/src/vespa/storageframework/generic/component/component.cpp42
-rw-r--r--storageframework/src/vespa/storageframework/generic/component/component.h11
-rw-r--r--storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h2
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/runnable.h6
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/thread.h4
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp24
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/threadpool.h34
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp86
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/tickingthread.h8
35 files changed, 256 insertions, 335 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 454db5d2404..8f308ec7984 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -396,17 +396,12 @@ TEST_F(DistributorTest, tick_processes_status_requests) {
StatusRequestThread thread(distributor_status_delegate());
FakeClock clock;
ThreadPoolImpl pool(clock);
-
- uint64_t tickWaitMs = 5;
- uint64_t tickMaxProcessTime = 5000;
int ticksBeforeWait = 1;
- framework::Thread::UP tp(pool.startThread(
- thread, "statustest", tickWaitMs, tickMaxProcessTime, ticksBeforeWait));
+ framework::Thread::UP tp(pool.startThread(thread, "statustest", 5ms, 5s, ticksBeforeWait));
while (true) {
std::this_thread::sleep_for(1ms);
- framework::TickingLockGuard guard(
- distributor_thread_pool().freezeCriticalTicks());
+ framework::TickingLockGuard guard(distributor_thread_pool().freezeCriticalTicks());
if (!distributor_status_todos().empty()) {
break;
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 39dae51721a..7b9d5b77c98 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -414,9 +414,7 @@ void BucketManager::onOpen()
void BucketManager::startWorkerThread()
{
- framework::MilliSecTime maxProcessingTime(30 * 1000);
- framework::MilliSecTime waitTime(1000);
- _thread = _component.startThread(*this, maxProcessingTime, waitTime);
+ _thread = _component.startThread(*this, 30s, 1s);
}
// --------- Commands --------- //
diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
index c5d70fda1ad..2c1a149ab4b 100644
--- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
+++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
@@ -178,10 +178,7 @@ StorageBucketDBInitializer::onOpen()
sendDown(msg);
}
}
- framework::MilliSecTime maxProcessingTime(10);
- framework::MilliSecTime sleepTime(1000);
- _system._thread = _system._component.startThread(
- *this, maxProcessingTime, sleepTime);
+ _system._thread = _system._component.startThread(*this, 10ms, 1s);
}
void
@@ -203,19 +200,14 @@ StorageBucketDBInitializer::run(framework::ThreadHandle& thread)
std::lock_guard<std::mutex> replyGuard(_state._replyLock);
_state._replies.swap(replies);
}
- for (std::list<api::StorageMessage::SP>::iterator it = replies.begin();
- it != replies.end(); ++it)
- {
- api::InternalReply& reply(static_cast<api::InternalReply&>(**it));
+ for (api::StorageMessage::SP & msg : replies) {
+ api::InternalReply& reply(static_cast<api::InternalReply&>(*msg));
if (reply.getType() == ReadBucketListReply::ID) {
- handleReadBucketListReply(
- static_cast<ReadBucketListReply&>(reply));
+ handleReadBucketListReply(static_cast<ReadBucketListReply&>(reply));
} else if (reply.getType() == ReadBucketInfoReply::ID) {
- handleReadBucketInfoReply(
- static_cast<ReadBucketInfoReply&>(reply));
+ handleReadBucketInfoReply(static_cast<ReadBucketInfoReply&>(reply));
} else if (reply.getType() == InternalBucketJoinReply::ID) {
- handleInternalBucketJoinReply(
- static_cast<InternalBucketJoinReply&>(reply));
+ handleInternalBucketJoinReply(static_cast<InternalBucketJoinReply&>(reply));
}
}
if (_state._gottenInitProgress) {
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.hpp b/storage/src/vespa/storage/common/storagelinkqueued.hpp
index ea6430f9e5c..d194adabb92 100644
--- a/storage/src/vespa/storage/common/storagelinkqueued.hpp
+++ b/storage/src/vespa/storage/common/storagelinkqueued.hpp
@@ -15,14 +15,14 @@ namespace storage {
template<typename Message>
void
StorageLinkQueued::Dispatcher<Message>::terminate() {
- if (_thread.get()) {
+ if (_thread) {
_thread->interrupt();
{
std::lock_guard<std::mutex> guard(_sync);
_syncCond.notify_one();
}
_thread->join();
- _thread.reset(0);
+ _thread.reset();
}
}
@@ -38,9 +38,7 @@ StorageLinkQueued::Dispatcher<Message>::Dispatcher(StorageLinkQueued& parent, un
std::ostringstream name;
name << "Queued storage " << (_replyDispatcher ? "up" : "down")
<< "link - " << _parent.getName();
- _component.reset(new framework::Component(
- parent.getComponentRegister(),
- name.str()));
+ _component = std::make_unique<framework::Component>(parent.getComponentRegister(), name.str());
}
template<typename Message>
@@ -51,20 +49,16 @@ StorageLinkQueued::Dispatcher<Message>::~Dispatcher() {
template<typename Message>
void StorageLinkQueued::Dispatcher<Message>::start()
{
- assert(_thread.get() == 0);
- framework::MilliSecTime maxProcessTime(5 * 1000);
- framework::MilliSecTime waitTime(100);
- _thread = _component->startThread(*this, maxProcessTime, waitTime);
+ assert( ! _thread);
+ _thread = _component->startThread(*this, 5s, 100ms);
}
template<typename Message>
-void StorageLinkQueued::Dispatcher<Message>::add(
- const std::shared_ptr<Message>& m)
+void StorageLinkQueued::Dispatcher<Message>::add(const std::shared_ptr<Message>& m)
{
- using namespace std::chrono_literals;
std::unique_lock<std::mutex> guard(_sync);
- if (_thread.get() == 0) start();
+ if ( ! _thread) start();
while ((_messages.size() > _maxQueueSize) && !_thread->interrupted()) {
_syncCond.wait_for(guard, 100ms);
}
@@ -75,7 +69,6 @@ void StorageLinkQueued::Dispatcher<Message>::add(
template<typename Message>
void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h)
{
- using namespace std::chrono_literals;
while (!h.interrupted()) {
h.registerTick(framework::PROCESS_CYCLE);
std::shared_ptr<Message> message;
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 352da70d7e4..cb624fb6a7f 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -14,6 +14,7 @@
#include <vespa/storageframework/generic/status/xmlstatusreporter.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/vespalib/util/memoryusage.h>
+#include <vespa/vespalib/util/sync.h>
#include <vespa/log/log.h>
LOG_SETUP(".distributor-main");
diff --git a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp
index 505828b853e..efd72227db1 100644
--- a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp
+++ b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp
@@ -21,8 +21,8 @@ DeadLockDetector::DeadLockDetector(StorageComponentRegister& compReg, AppKiller:
_cond(),
_enableWarning(true),
_enableShutdown(false),
- _processSlackMs(30 * 1000),
- _waitSlackMs(5 * 1000),
+ _processSlackMs(30s),
+ _waitSlackMs(5s),
_reportedBucketDBLocksAtState(OK)
{
auto* dComp(dynamic_cast<DistributorComponentRegister*>(&compReg));
@@ -31,7 +31,7 @@ DeadLockDetector::DeadLockDetector(StorageComponentRegister& compReg, AppKiller:
_component = _dComponent.get();
} else {
auto* slComp(dynamic_cast<ServiceLayerComponentRegister*>(&compReg));
- assert(slComp != 0);
+ assert(slComp != nullptr);
_slComponent = std::make_unique<ServiceLayerComponent>(*slComp, "deadlockdetector");
_component = _slComponent.get();
}
@@ -96,29 +96,27 @@ DeadLockDetector::visitThreads(ThreadVisitor& visitor) const
}
bool
-DeadLockDetector::isAboveFailThreshold(
- const framework::MilliSecTime& time,
- const framework::ThreadProperties& tp,
- const framework::ThreadTickData& tick) const
+DeadLockDetector::isAboveFailThreshold(vespalib::steady_time time,
+ const framework::ThreadProperties& tp,
+ const framework::ThreadTickData& tick) const
{
- if (tp.getMaxCycleTime() == 0) {
+ if (tp.getMaxCycleTime() == vespalib::duration::zero()) {
return false;
}
- uint64_t slack(tick._lastTickType == framework::WAIT_CYCLE
- ? getWaitSlack().getTime() : getProcessSlack().getTime());
- return (tick._lastTickMs + tp.getMaxCycleTime() + slack < time.getTime());
+ vespalib::duration slack(tick._lastTickType == framework::WAIT_CYCLE
+ ? getWaitSlack() : getProcessSlack());
+ return (tick._lastTick + tp.getMaxCycleTime() + slack < time);
}
bool
-DeadLockDetector::isAboveWarnThreshold(
- const framework::MilliSecTime& time,
- const framework::ThreadProperties& tp,
- const framework::ThreadTickData& tick) const
+DeadLockDetector::isAboveWarnThreshold(vespalib::steady_time time,
+ const framework::ThreadProperties& tp,
+ const framework::ThreadTickData& tick) const
{
- if (tp.getMaxCycleTime() == 0) return false;
- uint64_t slack(tick._lastTickType == framework::WAIT_CYCLE
- ? getWaitSlack().getTime() : getProcessSlack().getTime());
- return (tick._lastTickMs + tp.getMaxCycleTime() + slack / 4 < time.getTime());
+ if (tp.getMaxCycleTime() == vespalib::duration::zero()) return false;
+ vespalib::duration slack = tick._lastTickType == framework::WAIT_CYCLE
+ ? getWaitSlack() : getProcessSlack();
+ return (tick._lastTick + tp.getMaxCycleTime() + slack / 4 < time);
}
vespalib::string
@@ -142,9 +140,9 @@ namespace {
struct ThreadChecker : public DeadLockDetector::ThreadVisitor
{
DeadLockDetector& _detector;
- framework::MilliSecTime _currentTime;
+ vespalib::steady_time _currentTime;
- ThreadChecker(DeadLockDetector& d, const framework::MilliSecTime& time)
+ ThreadChecker(DeadLockDetector& d, vespalib::steady_time time)
: _detector(d), _currentTime(time) {}
void visitThread(const vespalib::string& id,
@@ -153,7 +151,7 @@ namespace {
DeadLockDetector::State& state) override
{
// In case we just got a new tick, ignore the thread
- if (tick._lastTickMs > _currentTime.getTime()) return;
+ if (tick._lastTick > _currentTime) return;
// If thread is already in halted state, ignore it.
if (state == DeadLockDetector::HALTED) return;
@@ -174,7 +172,7 @@ namespace {
}
void
-DeadLockDetector::handleDeadlock(const framework::MilliSecTime& currentTime,
+DeadLockDetector::handleDeadlock(vespalib::steady_time currentTime,
const vespalib::string& id,
const framework::ThreadProperties&,
const framework::ThreadTickData& tick,
@@ -182,7 +180,7 @@ DeadLockDetector::handleDeadlock(const framework::MilliSecTime& currentTime,
{
vespalib::asciistream error;
error << "Thread " << id << " has gone "
- << (currentTime.getTime() - tick._lastTickMs)
+ << vespalib::count_ms(currentTime - tick._lastTick)
<< " milliseconds without registering a tick.";
if (!warnOnly) {
if (_enableShutdown && !warnOnly) {
@@ -200,8 +198,7 @@ DeadLockDetector::handleDeadlock(const framework::MilliSecTime& currentTime,
error.str().data());
if (_reportedBucketDBLocksAtState != WARNED) {
_reportedBucketDBLocksAtState = WARNED;
- LOG(info, "Locks in bucket database at deadlock time:"
- "\n%s",
+ LOG(info, "Locks in bucket database at deadlock time:\n%s",
getBucketLockInfo().c_str());
}
}
@@ -228,8 +225,7 @@ DeadLockDetector::run(framework::ThreadHandle& thread)
{
std::unique_lock sync(_lock);
while (!thread.interrupted()) {
- framework::MilliSecTime time(_component->getClock().getTimeInMillis());
- ThreadChecker checker(*this, time);
+ ThreadChecker checker(*this, _component->getClock().getMonotonicTime());
visitThreads(checker);
_cond.wait_for(sync, 1s);
thread.registerTick(framework::WAIT_CYCLE);
@@ -264,14 +260,14 @@ namespace {
struct ThreadStatusWriter : public DeadLockDetector::ThreadVisitor {
ThreadTable& _table;
- framework::MilliSecTime _time;
- framework::MilliSecTime _processSlack;
- framework::MilliSecTime _waitSlack;
+ vespalib::steady_time _time;
+ vespalib::duration _processSlack;
+ vespalib::duration _waitSlack;
ThreadStatusWriter(ThreadTable& table,
- const framework::MilliSecTime& time,
- framework::MilliSecTime processSlack,
- framework::MilliSecTime waitSlack)
+ vespalib::steady_time time,
+ vespalib::duration processSlack,
+ vespalib::duration waitSlack)
: _table(table), _time(time),
_processSlack(processSlack), _waitSlack(waitSlack) {}
@@ -289,11 +285,11 @@ namespace {
{
_table._table.addRow(id);
uint32_t i = _table._table.getRowCount() - 1;
- _table._msSinceLastTick[i] = _time.getTime() - tick._lastTickMs;
- _table._maxProcTickTime[i] = tp.getMaxProcessTime();
- _table._maxWaitTickTime[i] = tp.getWaitTime();
- _table._maxProcTickTimeSeen[i] = tick._maxProcessingTimeSeenMs;
- _table._maxWaitTickTimeSeen[i] = tick._maxWaitTimeSeenMs;
+ _table._msSinceLastTick[i] = vespalib::count_ms(_time - tick._lastTick);
+ _table._maxProcTickTime[i] = vespalib::count_ms(tp.getMaxProcessTime());
+ _table._maxWaitTickTime[i] = vespalib::count_ms(tp.getWaitTime());
+ _table._maxProcTickTimeSeen[i] = vespalib::count_ms(tick._maxProcessingTimeSeen);
+ _table._maxWaitTickTimeSeen[i] = vespalib::count_ms(tick._maxWaitTimeSeen);
}
};
}
@@ -307,14 +303,15 @@ DeadLockDetector::reportHtmlStatus(std::ostream& os,
ThreadTable threads;
std::lock_guard guard(_lock);
framework::MilliSecTime time(_component->getClock().getTimeInMillis());
- ThreadStatusWriter writer(threads, time, getProcessSlack(), getWaitSlack());
+ ThreadStatusWriter writer(threads, _component->getClock().getMonotonicTime(),
+ getProcessSlack(), getWaitSlack());
visitThreads(writer);
std::ostringstream ost;
threads._table.print(ost);
out << ost.str();
out << "<p>\n"
- << "Note that there is a global slack period of " << getProcessSlack()
- << " ms for processing ticks and " << getWaitSlack()
+ << "Note that there is a global slack period of " << vespalib::count_ms(getProcessSlack())
+ << " ms for processing ticks and " << vespalib::count_ms(getWaitSlack())
<< " ms for wait ticks. Actual shutdown or warning logs will not"
<< " appear before this slack time is expendede on top of the per"
<< " thread value.\n"
diff --git a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h
index d438b4ab476..b0bd129f5f3 100644
--- a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h
+++ b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h
@@ -29,32 +29,30 @@ struct DeadLockDetector : private framework::Runnable,
enum State { OK, WARNED, HALTED };
DeadLockDetector(StorageComponentRegister&,
- AppKiller::UP killer = AppKiller::UP(new RealAppKiller));
- ~DeadLockDetector();
+ AppKiller::UP killer = std::make_unique<RealAppKiller>());
+ ~DeadLockDetector() override;
void enableWarning(bool enable);
void enableShutdown(bool enable);
// There are no data read/write dependencies on neither _processSlackMs
// nor _waitSlackMs so relaxed ops suffice.
- void setProcessSlack(framework::MilliSecTime slack) {
- _processSlackMs.store(slack.getTime(), std::memory_order_relaxed);
+ void setProcessSlack(vespalib::duration slack) {
+ _processSlackMs.store(slack, std::memory_order_relaxed);
}
- framework::MilliSecTime getProcessSlack() const {
- return framework::MilliSecTime(
- _processSlackMs.load(std::memory_order_relaxed));
+ vespalib::duration getProcessSlack() const {
+ return _processSlackMs.load(std::memory_order_relaxed);
}
- void setWaitSlack(framework::MilliSecTime slack) {
- _waitSlackMs.store(slack.getTime(), std::memory_order_relaxed);
+ void setWaitSlack(vespalib::duration slack) {
+ _waitSlackMs.store(slack, std::memory_order_relaxed);
}
- framework::MilliSecTime getWaitSlack() const {
- return framework::MilliSecTime(
- _waitSlackMs.load(std::memory_order_relaxed));
+ vespalib::duration getWaitSlack() const {
+ return _waitSlackMs.load(std::memory_order_relaxed);
}
// These utility functions are public as internal anonymous classes are
// using them. Can also be useful for whitebox testing.
struct ThreadVisitor {
- virtual ~ThreadVisitor() {}
+ virtual ~ThreadVisitor() = default;
virtual void visitThread(const vespalib::string& id,
const framework::ThreadProperties&,
const framework::ThreadTickData&,
@@ -62,13 +60,13 @@ struct DeadLockDetector : private framework::Runnable,
};
void visitThreads(ThreadVisitor&) const;
- bool isAboveFailThreshold(const framework::MilliSecTime& time,
+ bool isAboveFailThreshold(vespalib::steady_time time,
const framework::ThreadProperties& tp,
const framework::ThreadTickData& tick) const;
- bool isAboveWarnThreshold(const framework::MilliSecTime& time,
+ bool isAboveWarnThreshold(vespalib::steady_time ,
const framework::ThreadProperties& tp,
const framework::ThreadTickData& tick) const;
- void handleDeadlock(const framework::MilliSecTime& currentTime,
+ void handleDeadlock(vespalib::steady_time currentTime,
const vespalib::string& id,
const framework::ThreadProperties& tp,
const framework::ThreadTickData& tick,
@@ -81,8 +79,8 @@ private:
std::condition_variable _cond;
bool _enableWarning;
bool _enableShutdown;
- std::atomic<uint64_t> _processSlackMs;
- std::atomic<uint64_t> _waitSlackMs;
+ std::atomic<vespalib::duration> _processSlackMs;
+ std::atomic<vespalib::duration> _waitSlackMs;
State _reportedBucketDBLocksAtState;
DistributorComponent::UP _dComponent;
ServiceLayerComponent::UP _slComponent;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index e687a80e08f..09d142d42c0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -326,8 +326,8 @@ FileStorHandlerImpl::tryHandlePause(uint16_t disk) const
if (isPaused()) {
// Wait a single time to see if filestor gets unpaused.
if (!_diskInfo[disk].isClosed()) {
- vespalib::MonitorGuard g(_pauseMonitor);
- g.wait(100);
+ std::unique_lock g(_pauseMonitor);
+ _pauseCond.wait_for(g, 100ms);
}
return !isPaused();
}
@@ -1347,9 +1347,8 @@ FileStorHandlerImpl::pause()
void
FileStorHandlerImpl::resume()
{
- vespalib::MonitorGuard g(_pauseMonitor);
_paused.store(false, std::memory_order_relaxed);
- g.broadcast();
+ _pauseCond.notify_all();
}
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 8e4e105b4dc..1804a47f033 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -293,8 +293,9 @@ private:
std::map<document::Bucket, MergeStatus::SP> _mergeStates;
vespalib::duration _getNextMessageTimeout;
const uint32_t _max_active_merges_per_stripe; // Read concurrently by stripes.
- vespalib::Monitor _pauseMonitor;
- std::atomic<bool> _paused;
+ mutable std::mutex _pauseMonitor;
+ mutable std::condition_variable _pauseCond;
+ std::atomic<bool> _paused;
void reply(api::StorageMessage&, DiskState state) const;
diff --git a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp
index 60d61a39f44..66a122ec666 100644
--- a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp
@@ -85,10 +85,8 @@ ModifiedBucketChecker::configure(
void
ModifiedBucketChecker::onOpen()
{
- framework::MilliSecTime maxProcessingTime(60 * 1000);
- framework::MilliSecTime waitTime(1000);
if (!_singleThreadMode) {
- _thread = _component->startThread(*this, maxProcessingTime, waitTime);
+ _thread = _component->startThread(*this, 60s, 1s);
}
}
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 6cf0ec194e1..0050ec0012d 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -114,9 +114,7 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequence
threadName << "Disk " << _env._partition << " thread " << _stripeId;
_component = std::make_unique<ServiceLayerComponent>(compReg, threadName.str());
_bucketOwnershipNotifier = std::make_unique<BucketOwnershipNotifier>(*_component, filestorHandler);
- framework::MilliSecTime maxProcessingTime(60 * 1000);
- framework::MilliSecTime waitTime(1000);
- _thread = _component->startThread(*this, maxProcessingTime, waitTime);
+ _thread = _component->startThread(*this, 60s, 1s);
}
PersistenceThread::~PersistenceThread()
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index ad43fddfb2d..6ad2c960896 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -283,8 +283,7 @@ CommunicationManager::onOpen()
_configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext());
_configFetcher->subscribe<vespa::config::content::core::StorCommunicationmanagerConfig>(_configUri.getConfigId(), this);
_configFetcher->start();
- framework::MilliSecTime maxProcessingTime(60 * 1000);
- _thread = _component.startThread(*this, maxProcessingTime);
+ _thread = _component.startThread(*this, 60s);
if (_shared_rpc_resources) {
_shared_rpc_resources->start_server_and_register_slobrok(_component.getIdentity());
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp
index 8c2441b5eed..e6bee3248d4 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.cpp
+++ b/storage/src/vespa/storage/storageserver/distributornode.cpp
@@ -74,12 +74,9 @@ DistributorNode::handleConfigChange(vespa::config::content::core::StorDistributo
{
framework::TickingLockGuard guard(_threadPool->freezeAllTicks());
_context.getComponentRegister().setDistributorConfig(c);
- framework::MilliSecTime ticksWaitTime(c.ticksWaitTimeMs);
- framework::MilliSecTime maxProcessTime(c.maxProcessTimeMs);
- _threadPool->updateParametersAllThreads(
- ticksWaitTime,
- maxProcessTime,
- c.ticksBeforeWait);
+ _threadPool->updateParametersAllThreads(std::chrono::milliseconds(c.ticksWaitTimeMs),
+ std::chrono::milliseconds(c.maxProcessTimeMs),
+ c.ticksBeforeWait);
}
void
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 34d16c6c1d3..5e8e6809422 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -260,9 +260,7 @@ MergeThrottler::~MergeThrottler()
void
MergeThrottler::onOpen()
{
- framework::MilliSecTime maxProcessingTime(30 * 1000);
- framework::MilliSecTime waitTime(1000);
- _thread = _component.startThread(*this, maxProcessingTime, waitTime);
+ _thread = _component.startThread(*this, 30s, 1s);
}
void
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index 7dc6b20a2ef..e04bce902d4 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -71,9 +71,8 @@ StateManager::~StateManager()
void
StateManager::onOpen()
{
- framework::MilliSecTime maxProcessingTime(30 * 1000);
if (!_noThreadTestMode) {
- _thread = _component.startThread(*this, maxProcessingTime);
+ _thread = _component.startThread(*this, 30s);
}
}
@@ -608,12 +607,10 @@ StateManager::getNodeInfo() const
stream << "metrics";
try {
metrics::MetricLockGuard lock(_metricManager.getMetricLock());
- std::vector<uint32_t> periods(
- _metricManager.getSnapshotPeriods(lock));
+ std::vector<uint32_t> periods(_metricManager.getSnapshotPeriods(lock));
if (!periods.empty()) {
uint32_t period = periods[0];
- const metrics::MetricSnapshot& snapshot(
- _metricManager.getMetricSnapshot(lock, period));
+ const metrics::MetricSnapshot& snapshot(_metricManager.getMetricSnapshot(lock, period));
metrics::JsonWriter metricJsonWriter(stream);
_metricManager.visit(lock, snapshot, metricJsonWriter, "fleetcontroller");
} else {
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index c1cfc97bab6..c19f8a7a11e 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -200,10 +200,8 @@ StorageNode::initialize()
_deadLockDetector.reset(new DeadLockDetector(_context.getComponentRegister()));
_deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings);
_deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector);
- _deadLockDetector->setProcessSlack(framework::MilliSecTime(
- static_cast<uint32_t>(_serverConfig->deadLockDetectorTimeoutSlack * 1000)));
- _deadLockDetector->setWaitSlack(framework::MilliSecTime(
- static_cast<uint32_t>(_serverConfig->deadLockDetectorTimeoutSlack * 1000)));
+ _deadLockDetector->setProcessSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack));
+ _deadLockDetector->setWaitSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack));
createChain(*_chain_builder);
_chain = std::move(*_chain_builder).build();
@@ -240,8 +238,8 @@ void
StorageNode::initializeStatusWebServer()
{
if (_singleThreadedDebugMode) return;
- _statusWebServer.reset(new StatusWebServer(_context.getComponentRegister(),
- _context.getComponentRegister(), _configUri));
+ _statusWebServer = std::make_unique<StatusWebServer>(_context.getComponentRegister(),
+ _context.getComponentRegister(), _configUri);
}
#define DIFFER(a) (!(oldC.a == newC.a))
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp
index 7a450508375..e78a0d0193f 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.cpp
+++ b/storage/src/vespa/storage/visiting/visitormanager.cpp
@@ -48,15 +48,13 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri,
_configFetcher.subscribe<vespa::config::content::core::StorVisitorConfig>(configUri.getConfigId(), this);
_configFetcher.start();
_component.registerMetric(*_metrics);
- framework::MilliSecTime maxProcessTime(30 * 1000);
- framework::MilliSecTime waitTime(1000);
- _thread = _component.startThread(*this, maxProcessTime, waitTime);
+ _thread = _component.startThread(*this, 30s, 1s);
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
- _visitorFactories["dumpvisitor"].reset(new DumpVisitorSingleFactory);
- _visitorFactories["dumpvisitorsingle"].reset(new DumpVisitorSingleFactory);
- _visitorFactories["testvisitor"].reset(new TestVisitorFactory);
- _visitorFactories["countvisitor"].reset(new CountVisitorFactory);
- _visitorFactories["recoveryvisitor"].reset(new RecoveryVisitorFactory);
+ _visitorFactories["dumpvisitor"] = std::make_shared<DumpVisitorSingleFactory>();
+ _visitorFactories["dumpvisitorsingle"] = std::make_shared<DumpVisitorSingleFactory>();
+ _visitorFactories["testvisitor"] = std::make_shared<TestVisitorFactory>();
+ _visitorFactories["countvisitor"] = std::make_shared<CountVisitorFactory>();
+ _visitorFactories["recoveryvisitor"] = std::make_shared<RecoveryVisitorFactory>();
_component.registerStatusPage(*this);
}
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index e1daeb729b7..acaa96bc418 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -96,9 +96,7 @@ VisitorThread::VisitorThread(uint32_t threadIndex,
_messageSessionFactory(messageSessionFac),
_visitorFactories(visitorFactories)
{
- framework::MilliSecTime maxProcessingTime(30 * 1000);
- framework::MilliSecTime waitTime(1000);
- _thread = _component.startThread(*this, maxProcessingTime, waitTime);
+ _thread = _component.startThread(*this, 30s, 1s);
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
}
diff --git a/storageframework/src/tests/thread/tickingthreadtest.cpp b/storageframework/src/tests/thread/tickingthreadtest.cpp
index 8e3d7b29cd6..b877f08474d 100644
--- a/storageframework/src/tests/thread/tickingthreadtest.cpp
+++ b/storageframework/src/tests/thread/tickingthreadtest.cpp
@@ -6,7 +6,6 @@
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/exception.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/util/time.h>
#include <thread>
namespace storage::framework::defaultimplementation {
@@ -28,7 +27,7 @@ struct MyApp : public TickingThread {
TickingThreadPool::UP _threadPool;
MyApp(int threadCount, bool doCritOverlapTest = false);
- ~MyApp();
+ ~MyApp() override;
void start(ThreadPool& p) { _threadPool->start(p); }
@@ -95,7 +94,7 @@ MyApp::MyApp(int threadCount, bool doCritOverlapTest)
}
}
-MyApp::~MyApp() { }
+MyApp::~MyApp() = default;
}
@@ -120,17 +119,14 @@ TEST(TickingThreadTest, test_ticks_before_wait_basic)
TEST(TickingThreadTest, test_ticks_before_wait_live_update)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg = std::make_unique<ComponentRegisterImpl>();
int threadCount = 1;
MyApp app(threadCount);
// Configure thread pool to send bulks of 5000 ticks each second.
long unsigned int ticksBeforeWaitMs = 5000;
- MilliSecTime waitTimeMs(1000);
- MilliSecTime maxProcessingTime(234234);
app.start(testReg.getThreadPoolImpl());
app._threadPool->updateParametersAllThreads(
- waitTimeMs, maxProcessingTime, ticksBeforeWaitMs);
+ 1s, 234234ms, ticksBeforeWaitMs);
// Check that 5000 ticks are received instantly (usually <2 ms)
// (if live update is broken it will take more than an hour).
@@ -146,16 +142,14 @@ TEST(TickingThreadTest, test_ticks_before_wait_live_update)
TEST(TickingThreadTest, test_destroy_without_starting)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
int threadCount = 5;
MyApp app(threadCount, true);
}
TEST(TickingThreadTest, test_verbose_stopping)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
int threadCount = 5;
MyApp app(threadCount, true);
app.start(testReg.getThreadPoolImpl());
@@ -167,8 +161,7 @@ TEST(TickingThreadTest, test_verbose_stopping)
TEST(TickingThreadTest, test_stop_on_deletion)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
int threadCount = 5;
MyApp app(threadCount, true);
app.start(testReg.getThreadPoolImpl());
@@ -179,8 +172,7 @@ TEST(TickingThreadTest, test_stop_on_deletion)
TEST(TickingThreadTest, test_lock_all_ticks)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
int threadCount = 5;
MyApp app1(threadCount);
MyApp app2(threadCount);
@@ -207,8 +199,7 @@ TEST(TickingThreadTest, test_lock_all_ticks)
TEST(TickingThreadTest, test_lock_critical_ticks)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
int threadCount = 5;
uint64_t iterationsBeforeOverlap = 0;
{
@@ -290,18 +281,17 @@ struct BroadcastApp : public TickingThread {
};
BroadcastApp::BroadcastApp()
- : _threadPool(TickingThreadPool::createDefault("testApp", MilliSecTime(300000)))
+ : _threadPool(TickingThreadPool::createDefault("testApp", 300s))
{
_threadPool->addThread(*this);
}
-BroadcastApp::~BroadcastApp() {}
+BroadcastApp::~BroadcastApp() = default;
}
TEST(TickingThreadTest, test_broadcast)
{
- TestComponentRegister testReg(
- ComponentRegisterImpl::UP(new ComponentRegisterImpl));
+ TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>());
BroadcastApp app;
app.start(testReg.getThreadPoolImpl());
app.doTask("foo");
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp
index e4560370a01..e0f441089ff 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp
@@ -159,12 +159,6 @@ ComponentRegisterImpl::registerUpdateHook(vespalib::stringref name,
_hooks.emplace_back(std::move(hookPtr));
}
-metrics::MetricLockGuard
-ComponentRegisterImpl::getMetricManagerLock()
-{
- return _metricManager->getMetricLock();
-}
-
void
ComponentRegisterImpl::registerShutdownListener(ShutdownListener& listener)
{
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h
index 3adef5f4838..c7b74ec631a 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h
@@ -22,6 +22,7 @@
#include <vespa/storageframework/generic/metric/metricregistrator.h>
#include <vespa/storageframework/generic/status/statusreportermap.h>
#include <vespa/metrics/metricset.h>
+#include <mutex>
namespace metrics {
@@ -57,7 +58,7 @@ public:
typedef std::unique_ptr<ComponentRegisterImpl> UP;
ComponentRegisterImpl();
- ~ComponentRegisterImpl();
+ ~ComponentRegisterImpl() override;
bool hasMetricManager() const { return (_metricManager != 0); }
metrics::MetricManager& getMetricManager() { return *_metricManager; }
@@ -75,7 +76,6 @@ public:
void registerMetric(metrics::Metric&) override;
void registerUpdateHook(vespalib::stringref name, MetricUpdateHook& hook, SecondTime period) override;
- vespalib::MonitorGuard getMetricManagerLock() override;
void registerShutdownListener(ShutdownListener&);
};
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
index 4b9b2639d3b..ba3d52fb8f5 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp
@@ -12,8 +12,8 @@ namespace storage::framework::defaultimplementation {
ThreadImpl::ThreadImpl(ThreadPoolImpl& pool,
Runnable& runnable,
vespalib::stringref id,
- uint64_t waitTimeMs,
- uint64_t maxProcessTimeMs,
+ vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTimeMs,
int ticksBeforeWait)
: Thread(id),
_pool(pool),
@@ -25,7 +25,7 @@ ThreadImpl::ThreadImpl(ThreadPoolImpl& pool,
_joined(false),
_thread(*this)
{
- _tickData[_tickDataPtr]._lastTickMs = pool.getClock().getTimeInMillis().getTime();
+ _tickData[_tickDataPtr]._lastTickMs = pool.getClock().getMonotonicTime();
_thread.start(_pool.getThreadPool());
}
@@ -69,30 +69,29 @@ ThreadImpl::join()
}
void
-ThreadImpl::registerTick(CycleType cycleType, MilliSecTime time)
+ThreadImpl::registerTick(CycleType cycleType, vespalib::steady_time now)
{
- if (!time.isSet()) time = _pool.getClock().getTimeInMillis();
+ if (now.time_since_epoch() == vespalib::duration::zero()) now = _pool.getClock().getMonotonicTime();
ThreadTickData data(getTickData());
- uint64_t previousTickMs = data._lastTickMs;
- uint64_t nowMs = time.getTime();
- data._lastTickMs = nowMs;
+ vespalib::steady_clock::time_point previousTick = data._lastTick;
+ data._lastTick = now;
data._lastTickType = cycleType;
setTickData(data);
- if (data._lastTickMs == 0) { return; }
+ if (data._lastTick.time_since_epoch() == vespalib::duration::zero()) { return; }
- if (previousTickMs > nowMs) {
+ if (previousTick > now) {
LOGBP(warning, "Thread is registering tick at time %" PRIu64 ", but "
"last time it registered a tick, the time was %" PRIu64
". Assuming clock has been adjusted backwards",
- nowMs, previousTickMs);
+ vespalib::count_ms(now.time_since_epoch()), vespalib::count_ms(previousTick.time_since_epoch()));
return;
}
- uint64_t cycleTimeMs = nowMs - previousTickMs;
+ vespalib::duration cycleTimeMs = now - previousTick;
if (cycleType == WAIT_CYCLE) {
- data._maxWaitTimeSeenMs = std::max(data._maxWaitTimeSeenMs, cycleTimeMs);
+ data._maxWaitTimeSeen = std::max(data._maxWaitTimeSeen, cycleTimeMs);
} else {
- data._maxProcessingTimeSeenMs = std::max(data._maxProcessingTimeSeenMs, cycleTimeMs);
+ data._maxProcessingTimeSeen = std::max(data._maxProcessingTimeSeen, cycleTimeMs);
}
}
@@ -111,9 +110,9 @@ ThreadImpl::setTickData(const ThreadTickData& tickData)
}
void
-ThreadImpl::updateParameters(uint64_t waitTimeMs,
- uint64_t maxProcessTimeMs,
- int ticksBeforeWait) {
+ThreadImpl::updateParameters(vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTimeMs,
+ int ticksBeforeWait) {
_properties.setWaitTime(waitTimeMs);
_properties.setMaxProcessTime(maxProcessTimeMs);
_properties.setTicksBeforeWait(ticksBeforeWait);
@@ -125,9 +124,9 @@ ThreadImpl::AtomicThreadTickData::loadRelaxed() const noexcept
ThreadTickData result;
constexpr auto relaxed = std::memory_order_relaxed;
result._lastTickType = _lastTickType.load(relaxed);
- result._lastTickMs = _lastTickMs.load(relaxed);
- result._maxProcessingTimeSeenMs = _maxProcessingTimeSeenMs.load(relaxed);
- result._maxWaitTimeSeenMs = _maxWaitTimeSeenMs.load(relaxed);
+ result._lastTick = _lastTickMs.load(relaxed);
+ result._maxProcessingTimeSeen = _maxProcessingTimeSeenMs.load(relaxed);
+ result._maxWaitTimeSeen = _maxWaitTimeSeenMs.load(relaxed);
return result;
}
@@ -137,9 +136,9 @@ ThreadImpl::AtomicThreadTickData::storeRelaxed(
{
constexpr auto relaxed = std::memory_order_relaxed;
_lastTickType.store(newState._lastTickType, relaxed);
- _lastTickMs.store(newState._lastTickMs, relaxed);
- _maxProcessingTimeSeenMs.store(newState._maxProcessingTimeSeenMs, relaxed);
- _maxWaitTimeSeenMs.store(newState._maxWaitTimeSeenMs, relaxed);
+ _lastTickMs.store(newState._lastTick, relaxed);
+ _maxProcessingTimeSeenMs.store(newState._maxProcessingTimeSeen, relaxed);
+ _maxWaitTimeSeenMs.store(newState._maxWaitTimeSeen, relaxed);
}
}
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
index 17967c66bdf..077e508a609 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
@@ -26,10 +26,16 @@ class ThreadImpl : public Thread
* on code using it.
*/
struct AtomicThreadTickData {
+ AtomicThreadTickData() noexcept
+ : _lastTickType(),
+ _lastTickMs(vespalib::steady_time(vespalib::duration::zero())),
+ _maxProcessingTimeSeenMs(),
+ _maxWaitTimeSeenMs()
+ {}
std::atomic<CycleType> _lastTickType;
- std::atomic<uint64_t> _lastTickMs;
- std::atomic<uint64_t> _maxProcessingTimeSeenMs;
- std::atomic<uint64_t> _maxWaitTimeSeenMs;
+ std::atomic<vespalib::steady_time> _lastTickMs;
+ std::atomic<vespalib::duration> _maxProcessingTimeSeenMs;
+ std::atomic<vespalib::duration> _maxWaitTimeSeenMs;
// struct stores and loads are both data race free with relaxed
// memory semantics. This means it's possible to observe stale/partial
// state in a case with concurrent readers/writers.
@@ -49,26 +55,23 @@ class ThreadImpl : public Thread
void run();
public:
- ThreadImpl(ThreadPoolImpl&, Runnable&, vespalib::stringref id, uint64_t waitTimeMs,
- uint64_t maxProcessTimeMs, int ticksBeforeWait);
+ ThreadImpl(ThreadPoolImpl&, Runnable&, vespalib::stringref id, vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTimeMs, int ticksBeforeWait);
~ThreadImpl();
bool interrupted() const override;
bool joined() const override;
void interrupt() override;
void join() override;
- void registerTick(CycleType, MilliSecTime) override;
- uint64_t getWaitTime() const override {
+ void registerTick(CycleType, vespalib::steady_time) override;
+ vespalib::duration getWaitTime() const override {
return _properties.getWaitTime();
}
int getTicksBeforeWait() const override {
return _properties.getTicksBeforeWait();
}
- uint64_t getMaxProcessTime() const override {
- return _properties.getMaxProcessTime();
- }
- void updateParameters(uint64_t waitTime, uint64_t maxProcessTime, int ticksBeforeWait) override;
+ void updateParameters(vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait) override;
void setTickData(const ThreadTickData&);
ThreadTickData getTickData() const;
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
index e4a1188030c..e5698f0cbc2 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
@@ -46,17 +46,16 @@ ThreadPoolImpl::~ThreadPoolImpl()
}
Thread::UP
-ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, uint64_t waitTimeMs,
- uint64_t maxProcessTime, int ticksBeforeWait)
+ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTime, int ticksBeforeWait)
{
std::lock_guard lock(_threadVectorLock);
if (_stopping) {
throw IllegalStateException("Threadpool is stopping", VESPA_STRLOC);
}
- ThreadImpl* ti;
- Thread::UP t(ti = new ThreadImpl(*this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait));
- _threads.push_back(ti);
- return t;
+ auto thread = std::make_unique<ThreadImpl>(*this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait);
+ _threads.push_back(thread.get());
+ return thread;
}
void
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
index 5f0ee523eff..5a025ac3ad3 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
@@ -21,8 +21,8 @@ public:
ThreadPoolImpl(Clock&);
~ThreadPoolImpl() override;
- Thread::UP startThread(Runnable&, vespalib::stringref id, uint64_t waitTimeMs,
- uint64_t maxProcessTime, int ticksBeforeWait) override;
+ Thread::UP startThread(Runnable&, vespalib::stringref id, vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTime, int ticksBeforeWait) override;
void visitThreads(ThreadVisitor&) const override;
void registerThread(ThreadImpl&);
diff --git a/storageframework/src/vespa/storageframework/generic/clock/time.h b/storageframework/src/vespa/storageframework/generic/clock/time.h
index 4c578f9b33d..811c6cfceb0 100644
--- a/storageframework/src/vespa/storageframework/generic/clock/time.h
+++ b/storageframework/src/vespa/storageframework/generic/clock/time.h
@@ -3,7 +3,7 @@
#include <boost/operators.hpp>
#include <vespa/vespalib/stllike/string.h>
-#include <chrono>
+#include <vespa/vespalib/util/time.h>
namespace vespalib {
class asciistream;
diff --git a/storageframework/src/vespa/storageframework/generic/component/component.cpp b/storageframework/src/vespa/storageframework/generic/component/component.cpp
index 3818a2865d9..44468bfead3 100644
--- a/storageframework/src/vespa/storageframework/generic/component/component.cpp
+++ b/storageframework/src/vespa/storageframework/generic/component/component.cpp
@@ -23,12 +23,12 @@ Component::close()
Component::Component(ComponentRegister& cr, vespalib::stringref name)
: _componentRegister(&cr),
_name(name),
- _status(0),
- _metric(0),
- _threadPool(0),
- _metricReg(0),
- _clock(0),
- _listener(0)
+ _status(nullptr),
+ _metric(nullptr),
+ _threadPool(nullptr),
+ _metricReg(nullptr),
+ _clock(nullptr),
+ _listener(nullptr)
{
cr.registerComponent(*this);
}
@@ -38,23 +38,23 @@ Component::~Component() = default;
void
Component::registerComponentStateListener(ComponentStateListener& l)
{
- assert(_listener == 0);
+ assert(_listener == nullptr);
_listener = &l;
}
void
Component::registerStatusPage(const StatusReporter& sr)
{
- assert(_status == 0);
+ assert(_status == nullptr);
_status = &sr;
}
void
Component::registerMetric(metrics::Metric& m)
{
- assert(_metric == 0);
+ assert(_metric == nullptr);
_metric = &m;
- if (_metricReg != 0) {
+ if (_metricReg != nullptr) {
_metricReg->registerMetric(m);
}
}
@@ -64,28 +64,18 @@ Component::registerMetricUpdateHook(MetricUpdateHook& hook, SecondTime period)
{
assert(_metricUpdateHook.first == 0);
_metricUpdateHook = std::make_pair(&hook, period);
- if (_metricReg != 0) {
+ if (_metricReg != nullptr) {
_metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second);
}
}
-vespalib::MonitorGuard
-Component::getMetricManagerLock()
-{
- if (_metricReg != 0) {
- return _metricReg->getMetricManagerLock();
- } else {
- return vespalib::MonitorGuard();
- }
-}
-
void
Component::setMetricRegistrator(MetricRegistrator& mr) {
_metricReg = &mr;
if (_metricUpdateHook.first != 0) {
_metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second);
}
- if (_metric != 0) {
+ if (_metric != nullptr) {
_metricReg->registerMetric(*_metric);
}
}
@@ -93,16 +83,16 @@ Component::setMetricRegistrator(MetricRegistrator& mr) {
ThreadPool&
Component::getThreadPool() const
{
- assert(_threadPool != 0);
+ assert(_threadPool != nullptr);
return *_threadPool;
}
// Helper functions for components wanting to start a single thread.
Thread::UP
-Component::startThread(Runnable& runnable, MilliSecTime waitTime, MilliSecTime maxProcessTime, int ticksBeforeWait)
+Component::startThread(Runnable& runnable, vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait)
{
- return getThreadPool().startThread(runnable, getName(), waitTime.getTime(),
- maxProcessTime.getTime(), ticksBeforeWait);
+ return getThreadPool().startThread(runnable, getName(), waitTime,
+ maxProcessTime, ticksBeforeWait);
}
void
diff --git a/storageframework/src/vespa/storageframework/generic/component/component.h b/storageframework/src/vespa/storageframework/generic/component/component.h
index c91d7feb532..bfc80f524b5 100644
--- a/storageframework/src/vespa/storageframework/generic/component/component.h
+++ b/storageframework/src/vespa/storageframework/generic/component/component.h
@@ -71,7 +71,6 @@
#include <vespa/storageframework/generic/thread/runnable.h>
#include <vespa/storageframework/generic/thread/thread.h>
#include <vespa/storageframework/generic/clock/clock.h>
-#include <vespa/vespalib/util/sync.h>
#include <atomic>
namespace storage::framework {
@@ -153,12 +152,6 @@ public:
*/
void registerMetricUpdateHook(MetricUpdateHook&, SecondTime period);
- /**
- * If you need to modify the metric sets that have been registered, you need
- * to hold the metric manager lock while you do it.
- */
- vespalib::MonitorGuard getMetricManagerLock();
-
/** Get the name of the component. Must be a unique name. */
const vespalib::string& getName() const override { return _name; }
@@ -185,8 +178,8 @@ public:
* in this thread. (Thus one is not required to call registerTick())
*/
Thread::UP startThread(Runnable&,
- MilliSecTime maxProcessTime = MilliSecTime(0),
- MilliSecTime waitTime = MilliSecTime(0),
+ vespalib::duration maxProcessTime = vespalib::duration::zero(),
+ vespalib::duration waitTime = vespalib::duration::zero(),
int ticksBeforeWait = 1);
// Check upgrade flag settings. Note that this flag may change at any time.
diff --git a/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h b/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h
index 6cdbf58d555..53988651cbc 100644
--- a/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h
+++ b/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h
@@ -11,7 +11,6 @@
#pragma once
#include <vespa/storageframework/generic/clock/time.h>
-#include <vespa/vespalib/util/sync.h>
namespace metrics {
class Metric;
@@ -26,7 +25,6 @@ struct MetricRegistrator {
virtual void registerMetric(metrics::Metric&) = 0;
virtual void registerUpdateHook(vespalib::stringref name, MetricUpdateHook& hook, SecondTime period) = 0;
- virtual vespalib::MonitorGuard getMetricManagerLock() = 0;
};
}
diff --git a/storageframework/src/vespa/storageframework/generic/thread/runnable.h b/storageframework/src/vespa/storageframework/generic/thread/runnable.h
index 245522a7421..a2615694e7c 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/runnable.h
+++ b/storageframework/src/vespa/storageframework/generic/thread/runnable.h
@@ -43,11 +43,9 @@ struct ThreadHandle {
* clock fetches if client already knows current time)
*/
virtual void registerTick(CycleType = UNKNOWN_CYCLE,
- MilliSecTime currentTime = MilliSecTime(0)) = 0;
+ vespalib::steady_time = vespalib::steady_time()) = 0;
- virtual uint64_t getWaitTime() const = 0;
-
- virtual uint64_t getMaxProcessTime() const = 0;
+ virtual vespalib::duration getWaitTime() const = 0;
/**
* The number of ticks done before wait is called when no more work is
diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.h b/storageframework/src/vespa/storageframework/generic/thread/thread.h
index 814686eb4fa..e37bba4c723 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/thread.h
+++ b/storageframework/src/vespa/storageframework/generic/thread/thread.h
@@ -50,8 +50,8 @@ public:
*/
virtual void join() = 0;
- virtual void updateParameters(uint64_t waitTime,
- uint64_t maxProcessTime,
+ virtual void updateParameters(vespalib::duration waitTime,
+ vespalib::duration maxProcessTime,
int ticksBeforeWait) = 0;
/**
diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp
index d72b0c54544..3c2ebba42f7 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp
+++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp
@@ -2,23 +2,22 @@
#include "threadpool.h"
-namespace storage {
-namespace framework {
+namespace storage::framework {
-ThreadProperties::ThreadProperties(uint64_t waitTimeMs,
- uint64_t maxProcessTimeMs,
+ThreadProperties::ThreadProperties(vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTimeMs,
int ticksBeforeWait)
{
- _waitTimeMs.store(waitTimeMs);
- _maxProcessTimeMs.store(maxProcessTimeMs);
- _ticksBeforeWait.store(ticksBeforeWait);
+ setWaitTime(waitTimeMs);
+ setMaxProcessTime(maxProcessTimeMs);
+ setTicksBeforeWait(ticksBeforeWait);
}
-uint64_t ThreadProperties::getMaxProcessTime() const {
+ vespalib::duration ThreadProperties::getMaxProcessTime() const {
return _maxProcessTimeMs.load(std::memory_order_relaxed);
}
-uint64_t ThreadProperties::getWaitTime() const {
+vespalib::duration ThreadProperties::getWaitTime() const {
return _waitTimeMs.load(std::memory_order_relaxed);
}
@@ -26,11 +25,11 @@ int ThreadProperties::getTicksBeforeWait() const {
return _ticksBeforeWait.load(std::memory_order_relaxed);
}
-void ThreadProperties::setMaxProcessTime(uint64_t maxProcessingTimeMs) {
+void ThreadProperties::setMaxProcessTime(vespalib::duration maxProcessingTimeMs) {
_maxProcessTimeMs.store(maxProcessingTimeMs);
}
-void ThreadProperties::setWaitTime(uint64_t waitTimeMs) {
+void ThreadProperties::setWaitTime(vespalib::duration waitTimeMs) {
_waitTimeMs.store(waitTimeMs);
}
@@ -38,5 +37,4 @@ void ThreadProperties::setTicksBeforeWait(int ticksBeforeWait) {
_ticksBeforeWait.store(ticksBeforeWait);
}
-} // framework
-} // storage
+}
diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.h b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h
index 8ebe382bddf..0ee800c43ff 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/threadpool.h
+++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h
@@ -27,12 +27,12 @@ namespace storage::framework {
* thread.
*/
class ThreadProperties {
- private:
+private:
/**
* Time this thread should maximum use to process before a tick is
* registered. (Including wait time if wait time is not set)
*/
- std::atomic_uint_least64_t _maxProcessTimeMs;
+ std::atomic<vespalib::duration> _maxProcessTimeMs;
/**
* Time this thread will wait in a non-interrupted wait cycle.
* Used in cases where a wait cycle is registered. As long as no other
@@ -40,26 +40,26 @@ class ThreadProperties {
* wait time here. The deadlock detector should add a configurable
* global time period before flagging deadlock anyways.
*/
- std::atomic_uint_least64_t _waitTimeMs;
+ std::atomic<vespalib::duration> _waitTimeMs;
/**
* Number of ticks to be done before a wait.
*/
std::atomic_uint _ticksBeforeWait;
public:
- ThreadProperties(uint64_t waitTimeMs,
- uint64_t maxProcessTimeMs,
+ ThreadProperties(vespalib::duration waitTime,
+ vespalib::duration maxProcessTimeMs,
int ticksBeforeWait);
- void setMaxProcessTime(uint64_t);
- void setWaitTime(uint64_t);
+ void setMaxProcessTime(vespalib::duration);
+ void setWaitTime(vespalib::duration);
void setTicksBeforeWait(int);
- uint64_t getMaxProcessTime() const;
- uint64_t getWaitTime() const;
+ vespalib::duration getMaxProcessTime() const;
+ vespalib::duration getWaitTime() const;
int getTicksBeforeWait() const;
- uint64_t getMaxCycleTime() const {
+ vespalib::duration getMaxCycleTime() const {
return std::max(_maxProcessTimeMs.load(std::memory_order_relaxed),
_waitTimeMs.load(std::memory_order_relaxed));
}
@@ -68,26 +68,26 @@ class ThreadProperties {
/** Data kept on each thread due to the registerTick functinality. */
struct ThreadTickData {
CycleType _lastTickType;
- uint64_t _lastTickMs;
- uint64_t _maxProcessingTimeSeenMs;
- uint64_t _maxWaitTimeSeenMs;
+ vespalib::steady_time _lastTick;
+ vespalib::duration _maxProcessingTimeSeen;
+ vespalib::duration _maxWaitTimeSeen;
};
/** Interface used to access data for the existing threads. */
struct ThreadVisitor {
- virtual ~ThreadVisitor() {}
+ virtual ~ThreadVisitor() = default;
virtual void visitThread(const vespalib::string& id,
const ThreadProperties&,
const ThreadTickData&) = 0;
};
struct ThreadPool {
- virtual ~ThreadPool() {}
+ virtual ~ThreadPool() = default;
virtual Thread::UP startThread(Runnable&,
vespalib::stringref id,
- uint64_t waitTimeMs,
- uint64_t maxProcessTime,
+ vespalib::duration waitTimeMs,
+ vespalib::duration maxProcessTime,
int ticksBeforeWait) = 0;
virtual void visitThreads(ThreadVisitor&) const = 0;
diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp
index 3178220654e..7913cdcfe84 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp
+++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp
@@ -22,20 +22,22 @@ ThreadWaitInfo::merge(const ThreadWaitInfo& other) {
* global synchronization point where no thread is currently running.
*/
class TickingThreadRunner final : public Runnable {
- vespalib::Monitor& _monitor;
- TickingThread& _tickingThread;
- uint32_t _threadIndex;
- bool _wantToFreeze;
- bool _frozen;
- char _state;
+ std::mutex & _monitor;
+ std::condition_variable & _cond;
+ TickingThread & _tickingThread;
+ uint32_t _threadIndex;
+ bool _wantToFreeze;
+ bool _frozen;
+ char _state;
public:
typedef std::shared_ptr<TickingThreadRunner> SP;
- TickingThreadRunner(vespalib::Monitor& m,
+ TickingThreadRunner(std::mutex& m,
+ std::condition_variable & cond,
TickingThread& ticker,
uint32_t threadIndex) noexcept
- : _monitor(m), _tickingThread(ticker),
+ : _monitor(m), _cond(cond), _tickingThread(ticker),
_threadIndex(threadIndex), _wantToFreeze(false), _frozen(false) {}
/**
@@ -43,10 +45,10 @@ public:
* tick and has frozen.
*/
void freeze() {
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_monitor);
_wantToFreeze = true;
while (!_frozen) {
- guard.wait();
+ _cond.wait(guard);
}
}
@@ -54,9 +56,11 @@ public:
* Call to thaw up a frozen thread so it can continue.
*/
void thaw() {
- vespalib::MonitorGuard guard(_monitor);
- _wantToFreeze = false;
- guard.broadcast();
+ {
+ std::lock_guard guard(_monitor);
+ _wantToFreeze = false;
+ }
+ _cond.notify_all();
}
char getState() const { return _state; }
@@ -68,18 +72,18 @@ private:
int ticksExecutedAfterWait = 0;
while (!handle.interrupted()) {
{
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_monitor);
if (info.waitWanted()) {
_state = 'w';
cycle = WAIT_CYCLE;
if (ticksExecutedAfterWait >= handle.getTicksBeforeWait()) {
- guard.wait(handle.getWaitTime());
+ _cond.wait_for(guard, handle.getWaitTime());
ticksExecutedAfterWait = 0;
}
}
if (_wantToFreeze) {
_state = 'f';
- doFreeze(guard);
+ doFreeze(guard, _cond);
ticksExecutedAfterWait = 0;
}
_state = 'c';
@@ -93,22 +97,23 @@ private:
}
_state = 's';
}
- void doFreeze(vespalib::MonitorGuard& guard) {
+ void doFreeze(std::unique_lock<std::mutex> & guard, std::condition_variable & cond) {
_frozen = true;
- guard.broadcast();
+ cond.notify_all();
while (_wantToFreeze) {
- guard.wait();
+ _cond.wait(guard);
}
_frozen = false;
}
};
class TickingThreadPoolImpl final : public TickingThreadPool {
- vespalib::string _name;
- vespalib::Monitor _monitor;
- std::atomic_uint_least64_t _waitTime;
- std::atomic_uint _ticksBeforeWait;
- std::atomic_uint_least64_t _maxProcessTime;
+ vespalib::string _name;
+ std::mutex _lock;
+ std::condition_variable _cond;
+ std::atomic<vespalib::duration> _waitTime;
+ std::atomic_uint _ticksBeforeWait;
+ std::atomic<vespalib::duration> _maxProcessTime;
std::vector<TickingThreadRunner::SP> _tickers;
std::vector<std::shared_ptr<Thread>> _threads;
@@ -120,40 +125,40 @@ class TickingThreadPoolImpl final : public TickingThreadPool {
void broadcast() override {}
};
struct CriticalGuard final : public TickingLockGuard::Impl {
- vespalib::MonitorGuard _guard;
+ std::condition_variable &_cond;
- explicit CriticalGuard(vespalib::Monitor& m) : _guard(m) {}
+ explicit CriticalGuard(std::condition_variable & cond) : _cond(cond) {}
- void broadcast() override { _guard.broadcast(); }
+ void broadcast() override { _cond.notify_all(); }
};
public:
- TickingThreadPoolImpl(vespalib::stringref name, MilliSecTime waitTime,
- int ticksBeforeWait, MilliSecTime maxProcessTime)
+ TickingThreadPoolImpl(vespalib::stringref name, vespalib::duration waitTime,
+ int ticksBeforeWait, vespalib::duration maxProcessTime)
: _name(name),
- _waitTime(waitTime.getTime()),
+ _waitTime(waitTime),
_ticksBeforeWait(ticksBeforeWait),
- _maxProcessTime(maxProcessTime.getTime()) {}
+ _maxProcessTime(maxProcessTime) {}
~TickingThreadPoolImpl() override {
stop();
}
- void updateParametersAllThreads(MilliSecTime waitTime, MilliSecTime maxProcessTime,
+ void updateParametersAllThreads(vespalib::duration waitTime, vespalib::duration maxProcessTime,
int ticksBeforeWait) override {
- _waitTime.store(waitTime.getTime());
- _maxProcessTime.store(maxProcessTime.getTime());
+ _waitTime.store(waitTime);
+ _maxProcessTime.store(maxProcessTime);
_ticksBeforeWait.store(ticksBeforeWait);
// TODO: Add locking so threads not deleted while updating
for (uint32_t i=0; i<_threads.size(); ++i) {
- _threads[i]->updateParameters(waitTime.getTime(), maxProcessTime.getTime(), ticksBeforeWait);
+ _threads[i]->updateParameters(waitTime, maxProcessTime, ticksBeforeWait);
}
}
void addThread(TickingThread& ticker) override {
ThreadIndex index = _tickers.size();
ticker.newThreadCreated(index);
- _tickers.emplace_back(std::make_shared<TickingThreadRunner>(_monitor, ticker, index));
+ _tickers.emplace_back(std::make_shared<TickingThreadRunner>(_lock, _cond, ticker, index));
}
void start(ThreadPool& pool) override {
@@ -175,7 +180,7 @@ public:
}
TickingLockGuard freezeCriticalTicks() override {
- return TickingLockGuard(std::make_unique<CriticalGuard>(_monitor));
+ return TickingLockGuard(std::make_unique<CriticalGuard>(_cond));
}
void stop() override {
@@ -183,8 +188,7 @@ public:
t->interrupt();
}
{
- vespalib::MonitorGuard guard(_monitor);
- guard.broadcast();
+ _cond.notify_all();
}
for (auto& t : _threads) {
t->join();
@@ -216,9 +220,9 @@ private:
TickingThreadPool::UP
TickingThreadPool::createDefault(
vespalib::stringref name,
- MilliSecTime waitTime,
+ vespalib::duration waitTime,
int ticksBeforeWait,
- MilliSecTime maxProcessTime)
+ vespalib::duration maxProcessTime)
{
return std::make_unique<TickingThreadPoolImpl>(name, waitTime, ticksBeforeWait, maxProcessTime);
}
diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h
index 1ed02d0f6fd..0649d914c75 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h
+++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h
@@ -80,13 +80,13 @@ struct TickingThreadPool : public ThreadLock {
static TickingThreadPool::UP createDefault(
vespalib::stringref name,
- MilliSecTime waitTime = MilliSecTime(5),
+ vespalib::duration waitTime = 5ms,
int ticksBeforeWait = 1,
- MilliSecTime maxProcessTime = SecondTime(5).getMillis());
+ vespalib::duration maxProcessTime = 5s);
virtual void updateParametersAllThreads(
- MilliSecTime waitTime,
- MilliSecTime maxProcessTime,
+ vespalib::duration waitTime,
+ vespalib::duration maxProcessTime,
int ticksBeforeWait) = 0;
~TickingThreadPool() override = default;