diff options
35 files changed, 256 insertions, 335 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 454db5d2404..8f308ec7984 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -396,17 +396,12 @@ TEST_F(DistributorTest, tick_processes_status_requests) { StatusRequestThread thread(distributor_status_delegate()); FakeClock clock; ThreadPoolImpl pool(clock); - - uint64_t tickWaitMs = 5; - uint64_t tickMaxProcessTime = 5000; int ticksBeforeWait = 1; - framework::Thread::UP tp(pool.startThread( - thread, "statustest", tickWaitMs, tickMaxProcessTime, ticksBeforeWait)); + framework::Thread::UP tp(pool.startThread(thread, "statustest", 5ms, 5s, ticksBeforeWait)); while (true) { std::this_thread::sleep_for(1ms); - framework::TickingLockGuard guard( - distributor_thread_pool().freezeCriticalTicks()); + framework::TickingLockGuard guard(distributor_thread_pool().freezeCriticalTicks()); if (!distributor_status_todos().empty()) { break; } diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 39dae51721a..7b9d5b77c98 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -414,9 +414,7 @@ void BucketManager::onOpen() void BucketManager::startWorkerThread() { - framework::MilliSecTime maxProcessingTime(30 * 1000); - framework::MilliSecTime waitTime(1000); - _thread = _component.startThread(*this, maxProcessingTime, waitTime); + _thread = _component.startThread(*this, 30s, 1s); } // --------- Commands --------- // diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp index c5d70fda1ad..2c1a149ab4b 100644 --- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp +++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp @@ -178,10 +178,7 @@ StorageBucketDBInitializer::onOpen() sendDown(msg); } } - framework::MilliSecTime maxProcessingTime(10); - framework::MilliSecTime sleepTime(1000); - _system._thread = _system._component.startThread( - *this, maxProcessingTime, sleepTime); + _system._thread = _system._component.startThread(*this, 10ms, 1s); } void @@ -203,19 +200,14 @@ StorageBucketDBInitializer::run(framework::ThreadHandle& thread) std::lock_guard<std::mutex> replyGuard(_state._replyLock); _state._replies.swap(replies); } - for (std::list<api::StorageMessage::SP>::iterator it = replies.begin(); - it != replies.end(); ++it) - { - api::InternalReply& reply(static_cast<api::InternalReply&>(**it)); + for (api::StorageMessage::SP & msg : replies) { + api::InternalReply& reply(static_cast<api::InternalReply&>(*msg)); if (reply.getType() == ReadBucketListReply::ID) { - handleReadBucketListReply( - static_cast<ReadBucketListReply&>(reply)); + handleReadBucketListReply(static_cast<ReadBucketListReply&>(reply)); } else if (reply.getType() == ReadBucketInfoReply::ID) { - handleReadBucketInfoReply( - static_cast<ReadBucketInfoReply&>(reply)); + handleReadBucketInfoReply(static_cast<ReadBucketInfoReply&>(reply)); } else if (reply.getType() == InternalBucketJoinReply::ID) { - handleInternalBucketJoinReply( - static_cast<InternalBucketJoinReply&>(reply)); + handleInternalBucketJoinReply(static_cast<InternalBucketJoinReply&>(reply)); } } if (_state._gottenInitProgress) { diff --git a/storage/src/vespa/storage/common/storagelinkqueued.hpp b/storage/src/vespa/storage/common/storagelinkqueued.hpp index ea6430f9e5c..d194adabb92 100644 --- a/storage/src/vespa/storage/common/storagelinkqueued.hpp +++ b/storage/src/vespa/storage/common/storagelinkqueued.hpp @@ -15,14 +15,14 @@ namespace storage { template<typename Message> void StorageLinkQueued::Dispatcher<Message>::terminate() { - if (_thread.get()) { + if (_thread) { _thread->interrupt(); { std::lock_guard<std::mutex> guard(_sync); _syncCond.notify_one(); } _thread->join(); - _thread.reset(0); + _thread.reset(); } } @@ -38,9 +38,7 @@ StorageLinkQueued::Dispatcher<Message>::Dispatcher(StorageLinkQueued& parent, un std::ostringstream name; name << "Queued storage " << (_replyDispatcher ? "up" : "down") << "link - " << _parent.getName(); - _component.reset(new framework::Component( - parent.getComponentRegister(), - name.str())); + _component = std::make_unique<framework::Component>(parent.getComponentRegister(), name.str()); } template<typename Message> @@ -51,20 +49,16 @@ StorageLinkQueued::Dispatcher<Message>::~Dispatcher() { template<typename Message> void StorageLinkQueued::Dispatcher<Message>::start() { - assert(_thread.get() == 0); - framework::MilliSecTime maxProcessTime(5 * 1000); - framework::MilliSecTime waitTime(100); - _thread = _component->startThread(*this, maxProcessTime, waitTime); + assert( ! _thread); + _thread = _component->startThread(*this, 5s, 100ms); } template<typename Message> -void StorageLinkQueued::Dispatcher<Message>::add( - const std::shared_ptr<Message>& m) +void StorageLinkQueued::Dispatcher<Message>::add(const std::shared_ptr<Message>& m) { - using namespace std::chrono_literals; std::unique_lock<std::mutex> guard(_sync); - if (_thread.get() == 0) start(); + if ( ! _thread) start(); while ((_messages.size() > _maxQueueSize) && !_thread->interrupted()) { _syncCond.wait_for(guard, 100ms); } @@ -75,7 +69,6 @@ void StorageLinkQueued::Dispatcher<Message>::add( template<typename Message> void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h) { - using namespace std::chrono_literals; while (!h.interrupted()) { h.registerTick(framework::PROCESS_CYCLE); std::shared_ptr<Message> message; diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 352da70d7e4..cb624fb6a7f 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -14,6 +14,7 @@ #include <vespa/storageframework/generic/status/xmlstatusreporter.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/vespalib/util/memoryusage.h> +#include <vespa/vespalib/util/sync.h> #include <vespa/log/log.h> LOG_SETUP(".distributor-main"); diff --git a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp index 505828b853e..efd72227db1 100644 --- a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp +++ b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp @@ -21,8 +21,8 @@ DeadLockDetector::DeadLockDetector(StorageComponentRegister& compReg, AppKiller: _cond(), _enableWarning(true), _enableShutdown(false), - _processSlackMs(30 * 1000), - _waitSlackMs(5 * 1000), + _processSlackMs(30s), + _waitSlackMs(5s), _reportedBucketDBLocksAtState(OK) { auto* dComp(dynamic_cast<DistributorComponentRegister*>(&compReg)); @@ -31,7 +31,7 @@ DeadLockDetector::DeadLockDetector(StorageComponentRegister& compReg, AppKiller: _component = _dComponent.get(); } else { auto* slComp(dynamic_cast<ServiceLayerComponentRegister*>(&compReg)); - assert(slComp != 0); + assert(slComp != nullptr); _slComponent = std::make_unique<ServiceLayerComponent>(*slComp, "deadlockdetector"); _component = _slComponent.get(); } @@ -96,29 +96,27 @@ DeadLockDetector::visitThreads(ThreadVisitor& visitor) const } bool -DeadLockDetector::isAboveFailThreshold( - const framework::MilliSecTime& time, - const framework::ThreadProperties& tp, - const framework::ThreadTickData& tick) const +DeadLockDetector::isAboveFailThreshold(vespalib::steady_time time, + const framework::ThreadProperties& tp, + const framework::ThreadTickData& tick) const { - if (tp.getMaxCycleTime() == 0) { + if (tp.getMaxCycleTime() == vespalib::duration::zero()) { return false; } - uint64_t slack(tick._lastTickType == framework::WAIT_CYCLE - ? getWaitSlack().getTime() : getProcessSlack().getTime()); - return (tick._lastTickMs + tp.getMaxCycleTime() + slack < time.getTime()); + vespalib::duration slack(tick._lastTickType == framework::WAIT_CYCLE + ? getWaitSlack() : getProcessSlack()); + return (tick._lastTick + tp.getMaxCycleTime() + slack < time); } bool -DeadLockDetector::isAboveWarnThreshold( - const framework::MilliSecTime& time, - const framework::ThreadProperties& tp, - const framework::ThreadTickData& tick) const +DeadLockDetector::isAboveWarnThreshold(vespalib::steady_time time, + const framework::ThreadProperties& tp, + const framework::ThreadTickData& tick) const { - if (tp.getMaxCycleTime() == 0) return false; - uint64_t slack(tick._lastTickType == framework::WAIT_CYCLE - ? getWaitSlack().getTime() : getProcessSlack().getTime()); - return (tick._lastTickMs + tp.getMaxCycleTime() + slack / 4 < time.getTime()); + if (tp.getMaxCycleTime() == vespalib::duration::zero()) return false; + vespalib::duration slack = tick._lastTickType == framework::WAIT_CYCLE + ? getWaitSlack() : getProcessSlack(); + return (tick._lastTick + tp.getMaxCycleTime() + slack / 4 < time); } vespalib::string @@ -142,9 +140,9 @@ namespace { struct ThreadChecker : public DeadLockDetector::ThreadVisitor { DeadLockDetector& _detector; - framework::MilliSecTime _currentTime; + vespalib::steady_time _currentTime; - ThreadChecker(DeadLockDetector& d, const framework::MilliSecTime& time) + ThreadChecker(DeadLockDetector& d, vespalib::steady_time time) : _detector(d), _currentTime(time) {} void visitThread(const vespalib::string& id, @@ -153,7 +151,7 @@ namespace { DeadLockDetector::State& state) override { // In case we just got a new tick, ignore the thread - if (tick._lastTickMs > _currentTime.getTime()) return; + if (tick._lastTick > _currentTime) return; // If thread is already in halted state, ignore it. if (state == DeadLockDetector::HALTED) return; @@ -174,7 +172,7 @@ namespace { } void -DeadLockDetector::handleDeadlock(const framework::MilliSecTime& currentTime, +DeadLockDetector::handleDeadlock(vespalib::steady_time currentTime, const vespalib::string& id, const framework::ThreadProperties&, const framework::ThreadTickData& tick, @@ -182,7 +180,7 @@ DeadLockDetector::handleDeadlock(const framework::MilliSecTime& currentTime, { vespalib::asciistream error; error << "Thread " << id << " has gone " - << (currentTime.getTime() - tick._lastTickMs) + << vespalib::count_ms(currentTime - tick._lastTick) << " milliseconds without registering a tick."; if (!warnOnly) { if (_enableShutdown && !warnOnly) { @@ -200,8 +198,7 @@ DeadLockDetector::handleDeadlock(const framework::MilliSecTime& currentTime, error.str().data()); if (_reportedBucketDBLocksAtState != WARNED) { _reportedBucketDBLocksAtState = WARNED; - LOG(info, "Locks in bucket database at deadlock time:" - "\n%s", + LOG(info, "Locks in bucket database at deadlock time:\n%s", getBucketLockInfo().c_str()); } } @@ -228,8 +225,7 @@ DeadLockDetector::run(framework::ThreadHandle& thread) { std::unique_lock sync(_lock); while (!thread.interrupted()) { - framework::MilliSecTime time(_component->getClock().getTimeInMillis()); - ThreadChecker checker(*this, time); + ThreadChecker checker(*this, _component->getClock().getMonotonicTime()); visitThreads(checker); _cond.wait_for(sync, 1s); thread.registerTick(framework::WAIT_CYCLE); @@ -264,14 +260,14 @@ namespace { struct ThreadStatusWriter : public DeadLockDetector::ThreadVisitor { ThreadTable& _table; - framework::MilliSecTime _time; - framework::MilliSecTime _processSlack; - framework::MilliSecTime _waitSlack; + vespalib::steady_time _time; + vespalib::duration _processSlack; + vespalib::duration _waitSlack; ThreadStatusWriter(ThreadTable& table, - const framework::MilliSecTime& time, - framework::MilliSecTime processSlack, - framework::MilliSecTime waitSlack) + vespalib::steady_time time, + vespalib::duration processSlack, + vespalib::duration waitSlack) : _table(table), _time(time), _processSlack(processSlack), _waitSlack(waitSlack) {} @@ -289,11 +285,11 @@ namespace { { _table._table.addRow(id); uint32_t i = _table._table.getRowCount() - 1; - _table._msSinceLastTick[i] = _time.getTime() - tick._lastTickMs; - _table._maxProcTickTime[i] = tp.getMaxProcessTime(); - _table._maxWaitTickTime[i] = tp.getWaitTime(); - _table._maxProcTickTimeSeen[i] = tick._maxProcessingTimeSeenMs; - _table._maxWaitTickTimeSeen[i] = tick._maxWaitTimeSeenMs; + _table._msSinceLastTick[i] = vespalib::count_ms(_time - tick._lastTick); + _table._maxProcTickTime[i] = vespalib::count_ms(tp.getMaxProcessTime()); + _table._maxWaitTickTime[i] = vespalib::count_ms(tp.getWaitTime()); + _table._maxProcTickTimeSeen[i] = vespalib::count_ms(tick._maxProcessingTimeSeen); + _table._maxWaitTickTimeSeen[i] = vespalib::count_ms(tick._maxWaitTimeSeen); } }; } @@ -307,14 +303,15 @@ DeadLockDetector::reportHtmlStatus(std::ostream& os, ThreadTable threads; std::lock_guard guard(_lock); framework::MilliSecTime time(_component->getClock().getTimeInMillis()); - ThreadStatusWriter writer(threads, time, getProcessSlack(), getWaitSlack()); + ThreadStatusWriter writer(threads, _component->getClock().getMonotonicTime(), + getProcessSlack(), getWaitSlack()); visitThreads(writer); std::ostringstream ost; threads._table.print(ost); out << ost.str(); out << "<p>\n" - << "Note that there is a global slack period of " << getProcessSlack() - << " ms for processing ticks and " << getWaitSlack() + << "Note that there is a global slack period of " << vespalib::count_ms(getProcessSlack()) + << " ms for processing ticks and " << vespalib::count_ms(getWaitSlack()) << " ms for wait ticks. Actual shutdown or warning logs will not" << " appear before this slack time is expendede on top of the per" << " thread value.\n" diff --git a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h index d438b4ab476..b0bd129f5f3 100644 --- a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h +++ b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h @@ -29,32 +29,30 @@ struct DeadLockDetector : private framework::Runnable, enum State { OK, WARNED, HALTED }; DeadLockDetector(StorageComponentRegister&, - AppKiller::UP killer = AppKiller::UP(new RealAppKiller)); - ~DeadLockDetector(); + AppKiller::UP killer = std::make_unique<RealAppKiller>()); + ~DeadLockDetector() override; void enableWarning(bool enable); void enableShutdown(bool enable); // There are no data read/write dependencies on neither _processSlackMs // nor _waitSlackMs so relaxed ops suffice. - void setProcessSlack(framework::MilliSecTime slack) { - _processSlackMs.store(slack.getTime(), std::memory_order_relaxed); + void setProcessSlack(vespalib::duration slack) { + _processSlackMs.store(slack, std::memory_order_relaxed); } - framework::MilliSecTime getProcessSlack() const { - return framework::MilliSecTime( - _processSlackMs.load(std::memory_order_relaxed)); + vespalib::duration getProcessSlack() const { + return _processSlackMs.load(std::memory_order_relaxed); } - void setWaitSlack(framework::MilliSecTime slack) { - _waitSlackMs.store(slack.getTime(), std::memory_order_relaxed); + void setWaitSlack(vespalib::duration slack) { + _waitSlackMs.store(slack, std::memory_order_relaxed); } - framework::MilliSecTime getWaitSlack() const { - return framework::MilliSecTime( - _waitSlackMs.load(std::memory_order_relaxed)); + vespalib::duration getWaitSlack() const { + return _waitSlackMs.load(std::memory_order_relaxed); } // These utility functions are public as internal anonymous classes are // using them. Can also be useful for whitebox testing. struct ThreadVisitor { - virtual ~ThreadVisitor() {} + virtual ~ThreadVisitor() = default; virtual void visitThread(const vespalib::string& id, const framework::ThreadProperties&, const framework::ThreadTickData&, @@ -62,13 +60,13 @@ struct DeadLockDetector : private framework::Runnable, }; void visitThreads(ThreadVisitor&) const; - bool isAboveFailThreshold(const framework::MilliSecTime& time, + bool isAboveFailThreshold(vespalib::steady_time time, const framework::ThreadProperties& tp, const framework::ThreadTickData& tick) const; - bool isAboveWarnThreshold(const framework::MilliSecTime& time, + bool isAboveWarnThreshold(vespalib::steady_time , const framework::ThreadProperties& tp, const framework::ThreadTickData& tick) const; - void handleDeadlock(const framework::MilliSecTime& currentTime, + void handleDeadlock(vespalib::steady_time currentTime, const vespalib::string& id, const framework::ThreadProperties& tp, const framework::ThreadTickData& tick, @@ -81,8 +79,8 @@ private: std::condition_variable _cond; bool _enableWarning; bool _enableShutdown; - std::atomic<uint64_t> _processSlackMs; - std::atomic<uint64_t> _waitSlackMs; + std::atomic<vespalib::duration> _processSlackMs; + std::atomic<vespalib::duration> _waitSlackMs; State _reportedBucketDBLocksAtState; DistributorComponent::UP _dComponent; ServiceLayerComponent::UP _slComponent; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index e687a80e08f..09d142d42c0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -326,8 +326,8 @@ FileStorHandlerImpl::tryHandlePause(uint16_t disk) const if (isPaused()) { // Wait a single time to see if filestor gets unpaused. if (!_diskInfo[disk].isClosed()) { - vespalib::MonitorGuard g(_pauseMonitor); - g.wait(100); + std::unique_lock g(_pauseMonitor); + _pauseCond.wait_for(g, 100ms); } return !isPaused(); } @@ -1347,9 +1347,8 @@ FileStorHandlerImpl::pause() void FileStorHandlerImpl::resume() { - vespalib::MonitorGuard g(_pauseMonitor); _paused.store(false, std::memory_order_relaxed); - g.broadcast(); + _pauseCond.notify_all(); } } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 8e4e105b4dc..1804a47f033 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -293,8 +293,9 @@ private: std::map<document::Bucket, MergeStatus::SP> _mergeStates; vespalib::duration _getNextMessageTimeout; const uint32_t _max_active_merges_per_stripe; // Read concurrently by stripes. - vespalib::Monitor _pauseMonitor; - std::atomic<bool> _paused; + mutable std::mutex _pauseMonitor; + mutable std::condition_variable _pauseCond; + std::atomic<bool> _paused; void reply(api::StorageMessage&, DiskState state) const; diff --git a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp index 60d61a39f44..66a122ec666 100644 --- a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp @@ -85,10 +85,8 @@ ModifiedBucketChecker::configure( void ModifiedBucketChecker::onOpen() { - framework::MilliSecTime maxProcessingTime(60 * 1000); - framework::MilliSecTime waitTime(1000); if (!_singleThreadMode) { - _thread = _component->startThread(*this, maxProcessingTime, waitTime); + _thread = _component->startThread(*this, 60s, 1s); } } diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 6cf0ec194e1..0050ec0012d 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -114,9 +114,7 @@ PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequence threadName << "Disk " << _env._partition << " thread " << _stripeId; _component = std::make_unique<ServiceLayerComponent>(compReg, threadName.str()); _bucketOwnershipNotifier = std::make_unique<BucketOwnershipNotifier>(*_component, filestorHandler); - framework::MilliSecTime maxProcessingTime(60 * 1000); - framework::MilliSecTime waitTime(1000); - _thread = _component->startThread(*this, maxProcessingTime, waitTime); + _thread = _component->startThread(*this, 60s, 1s); } PersistenceThread::~PersistenceThread() diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index ad43fddfb2d..6ad2c960896 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -283,8 +283,7 @@ CommunicationManager::onOpen() _configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext()); _configFetcher->subscribe<vespa::config::content::core::StorCommunicationmanagerConfig>(_configUri.getConfigId(), this); _configFetcher->start(); - framework::MilliSecTime maxProcessingTime(60 * 1000); - _thread = _component.startThread(*this, maxProcessingTime); + _thread = _component.startThread(*this, 60s); if (_shared_rpc_resources) { _shared_rpc_resources->start_server_and_register_slobrok(_component.getIdentity()); diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index 8c2441b5eed..e6bee3248d4 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -74,12 +74,9 @@ DistributorNode::handleConfigChange(vespa::config::content::core::StorDistributo { framework::TickingLockGuard guard(_threadPool->freezeAllTicks()); _context.getComponentRegister().setDistributorConfig(c); - framework::MilliSecTime ticksWaitTime(c.ticksWaitTimeMs); - framework::MilliSecTime maxProcessTime(c.maxProcessTimeMs); - _threadPool->updateParametersAllThreads( - ticksWaitTime, - maxProcessTime, - c.ticksBeforeWait); + _threadPool->updateParametersAllThreads(std::chrono::milliseconds(c.ticksWaitTimeMs), + std::chrono::milliseconds(c.maxProcessTimeMs), + c.ticksBeforeWait); } void diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 34d16c6c1d3..5e8e6809422 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -260,9 +260,7 @@ MergeThrottler::~MergeThrottler() void MergeThrottler::onOpen() { - framework::MilliSecTime maxProcessingTime(30 * 1000); - framework::MilliSecTime waitTime(1000); - _thread = _component.startThread(*this, maxProcessingTime, waitTime); + _thread = _component.startThread(*this, 30s, 1s); } void diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index 7dc6b20a2ef..e04bce902d4 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -71,9 +71,8 @@ StateManager::~StateManager() void StateManager::onOpen() { - framework::MilliSecTime maxProcessingTime(30 * 1000); if (!_noThreadTestMode) { - _thread = _component.startThread(*this, maxProcessingTime); + _thread = _component.startThread(*this, 30s); } } @@ -608,12 +607,10 @@ StateManager::getNodeInfo() const stream << "metrics"; try { metrics::MetricLockGuard lock(_metricManager.getMetricLock()); - std::vector<uint32_t> periods( - _metricManager.getSnapshotPeriods(lock)); + std::vector<uint32_t> periods(_metricManager.getSnapshotPeriods(lock)); if (!periods.empty()) { uint32_t period = periods[0]; - const metrics::MetricSnapshot& snapshot( - _metricManager.getMetricSnapshot(lock, period)); + const metrics::MetricSnapshot& snapshot(_metricManager.getMetricSnapshot(lock, period)); metrics::JsonWriter metricJsonWriter(stream); _metricManager.visit(lock, snapshot, metricJsonWriter, "fleetcontroller"); } else { diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index c1cfc97bab6..c19f8a7a11e 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -200,10 +200,8 @@ StorageNode::initialize() _deadLockDetector.reset(new DeadLockDetector(_context.getComponentRegister())); _deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings); _deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector); - _deadLockDetector->setProcessSlack(framework::MilliSecTime( - static_cast<uint32_t>(_serverConfig->deadLockDetectorTimeoutSlack * 1000))); - _deadLockDetector->setWaitSlack(framework::MilliSecTime( - static_cast<uint32_t>(_serverConfig->deadLockDetectorTimeoutSlack * 1000))); + _deadLockDetector->setProcessSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack)); + _deadLockDetector->setWaitSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack)); createChain(*_chain_builder); _chain = std::move(*_chain_builder).build(); @@ -240,8 +238,8 @@ void StorageNode::initializeStatusWebServer() { if (_singleThreadedDebugMode) return; - _statusWebServer.reset(new StatusWebServer(_context.getComponentRegister(), - _context.getComponentRegister(), _configUri)); + _statusWebServer = std::make_unique<StatusWebServer>(_context.getComponentRegister(), + _context.getComponentRegister(), _configUri); } #define DIFFER(a) (!(oldC.a == newC.a)) diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index 7a450508375..e78a0d0193f 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -48,15 +48,13 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, _configFetcher.subscribe<vespa::config::content::core::StorVisitorConfig>(configUri.getConfigId(), this); _configFetcher.start(); _component.registerMetric(*_metrics); - framework::MilliSecTime maxProcessTime(30 * 1000); - framework::MilliSecTime waitTime(1000); - _thread = _component.startThread(*this, maxProcessTime, waitTime); + _thread = _component.startThread(*this, 30s, 1s); _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); - _visitorFactories["dumpvisitor"].reset(new DumpVisitorSingleFactory); - _visitorFactories["dumpvisitorsingle"].reset(new DumpVisitorSingleFactory); - _visitorFactories["testvisitor"].reset(new TestVisitorFactory); - _visitorFactories["countvisitor"].reset(new CountVisitorFactory); - _visitorFactories["recoveryvisitor"].reset(new RecoveryVisitorFactory); + _visitorFactories["dumpvisitor"] = std::make_shared<DumpVisitorSingleFactory>(); + _visitorFactories["dumpvisitorsingle"] = std::make_shared<DumpVisitorSingleFactory>(); + _visitorFactories["testvisitor"] = std::make_shared<TestVisitorFactory>(); + _visitorFactories["countvisitor"] = std::make_shared<CountVisitorFactory>(); + _visitorFactories["recoveryvisitor"] = std::make_shared<RecoveryVisitorFactory>(); _component.registerStatusPage(*this); } diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index e1daeb729b7..acaa96bc418 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -96,9 +96,7 @@ VisitorThread::VisitorThread(uint32_t threadIndex, _messageSessionFactory(messageSessionFac), _visitorFactories(visitorFactories) { - framework::MilliSecTime maxProcessingTime(30 * 1000); - framework::MilliSecTime waitTime(1000); - _thread = _component.startThread(*this, maxProcessingTime, waitTime); + _thread = _component.startThread(*this, 30s, 1s); _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); } diff --git a/storageframework/src/tests/thread/tickingthreadtest.cpp b/storageframework/src/tests/thread/tickingthreadtest.cpp index 8e3d7b29cd6..b877f08474d 100644 --- a/storageframework/src/tests/thread/tickingthreadtest.cpp +++ b/storageframework/src/tests/thread/tickingthreadtest.cpp @@ -6,7 +6,6 @@ #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/util/exception.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/vespalib/util/time.h> #include <thread> namespace storage::framework::defaultimplementation { @@ -28,7 +27,7 @@ struct MyApp : public TickingThread { TickingThreadPool::UP _threadPool; MyApp(int threadCount, bool doCritOverlapTest = false); - ~MyApp(); + ~MyApp() override; void start(ThreadPool& p) { _threadPool->start(p); } @@ -95,7 +94,7 @@ MyApp::MyApp(int threadCount, bool doCritOverlapTest) } } -MyApp::~MyApp() { } +MyApp::~MyApp() = default; } @@ -120,17 +119,14 @@ TEST(TickingThreadTest, test_ticks_before_wait_basic) TEST(TickingThreadTest, test_ticks_before_wait_live_update) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg = std::make_unique<ComponentRegisterImpl>(); int threadCount = 1; MyApp app(threadCount); // Configure thread pool to send bulks of 5000 ticks each second. long unsigned int ticksBeforeWaitMs = 5000; - MilliSecTime waitTimeMs(1000); - MilliSecTime maxProcessingTime(234234); app.start(testReg.getThreadPoolImpl()); app._threadPool->updateParametersAllThreads( - waitTimeMs, maxProcessingTime, ticksBeforeWaitMs); + 1s, 234234ms, ticksBeforeWaitMs); // Check that 5000 ticks are received instantly (usually <2 ms) // (if live update is broken it will take more than an hour). @@ -146,16 +142,14 @@ TEST(TickingThreadTest, test_ticks_before_wait_live_update) TEST(TickingThreadTest, test_destroy_without_starting) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); int threadCount = 5; MyApp app(threadCount, true); } TEST(TickingThreadTest, test_verbose_stopping) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); int threadCount = 5; MyApp app(threadCount, true); app.start(testReg.getThreadPoolImpl()); @@ -167,8 +161,7 @@ TEST(TickingThreadTest, test_verbose_stopping) TEST(TickingThreadTest, test_stop_on_deletion) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); int threadCount = 5; MyApp app(threadCount, true); app.start(testReg.getThreadPoolImpl()); @@ -179,8 +172,7 @@ TEST(TickingThreadTest, test_stop_on_deletion) TEST(TickingThreadTest, test_lock_all_ticks) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); int threadCount = 5; MyApp app1(threadCount); MyApp app2(threadCount); @@ -207,8 +199,7 @@ TEST(TickingThreadTest, test_lock_all_ticks) TEST(TickingThreadTest, test_lock_critical_ticks) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); int threadCount = 5; uint64_t iterationsBeforeOverlap = 0; { @@ -290,18 +281,17 @@ struct BroadcastApp : public TickingThread { }; BroadcastApp::BroadcastApp() - : _threadPool(TickingThreadPool::createDefault("testApp", MilliSecTime(300000))) + : _threadPool(TickingThreadPool::createDefault("testApp", 300s)) { _threadPool->addThread(*this); } -BroadcastApp::~BroadcastApp() {} +BroadcastApp::~BroadcastApp() = default; } TEST(TickingThreadTest, test_broadcast) { - TestComponentRegister testReg( - ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); BroadcastApp app; app.start(testReg.getThreadPoolImpl()); app.doTask("foo"); diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp index e4560370a01..e0f441089ff 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp +++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp @@ -159,12 +159,6 @@ ComponentRegisterImpl::registerUpdateHook(vespalib::stringref name, _hooks.emplace_back(std::move(hookPtr)); } -metrics::MetricLockGuard -ComponentRegisterImpl::getMetricManagerLock() -{ - return _metricManager->getMetricLock(); -} - void ComponentRegisterImpl::registerShutdownListener(ShutdownListener& listener) { diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h index 3adef5f4838..c7b74ec631a 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h +++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h @@ -22,6 +22,7 @@ #include <vespa/storageframework/generic/metric/metricregistrator.h> #include <vespa/storageframework/generic/status/statusreportermap.h> #include <vespa/metrics/metricset.h> +#include <mutex> namespace metrics { @@ -57,7 +58,7 @@ public: typedef std::unique_ptr<ComponentRegisterImpl> UP; ComponentRegisterImpl(); - ~ComponentRegisterImpl(); + ~ComponentRegisterImpl() override; bool hasMetricManager() const { return (_metricManager != 0); } metrics::MetricManager& getMetricManager() { return *_metricManager; } @@ -75,7 +76,6 @@ public: void registerMetric(metrics::Metric&) override; void registerUpdateHook(vespalib::stringref name, MetricUpdateHook& hook, SecondTime period) override; - vespalib::MonitorGuard getMetricManagerLock() override; void registerShutdownListener(ShutdownListener&); }; diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp index 4b9b2639d3b..ba3d52fb8f5 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp @@ -12,8 +12,8 @@ namespace storage::framework::defaultimplementation { ThreadImpl::ThreadImpl(ThreadPoolImpl& pool, Runnable& runnable, vespalib::stringref id, - uint64_t waitTimeMs, - uint64_t maxProcessTimeMs, + vespalib::duration waitTimeMs, + vespalib::duration maxProcessTimeMs, int ticksBeforeWait) : Thread(id), _pool(pool), @@ -25,7 +25,7 @@ ThreadImpl::ThreadImpl(ThreadPoolImpl& pool, _joined(false), _thread(*this) { - _tickData[_tickDataPtr]._lastTickMs = pool.getClock().getTimeInMillis().getTime(); + _tickData[_tickDataPtr]._lastTickMs = pool.getClock().getMonotonicTime(); _thread.start(_pool.getThreadPool()); } @@ -69,30 +69,29 @@ ThreadImpl::join() } void -ThreadImpl::registerTick(CycleType cycleType, MilliSecTime time) +ThreadImpl::registerTick(CycleType cycleType, vespalib::steady_time now) { - if (!time.isSet()) time = _pool.getClock().getTimeInMillis(); + if (now.time_since_epoch() == vespalib::duration::zero()) now = _pool.getClock().getMonotonicTime(); ThreadTickData data(getTickData()); - uint64_t previousTickMs = data._lastTickMs; - uint64_t nowMs = time.getTime(); - data._lastTickMs = nowMs; + vespalib::steady_clock::time_point previousTick = data._lastTick; + data._lastTick = now; data._lastTickType = cycleType; setTickData(data); - if (data._lastTickMs == 0) { return; } + if (data._lastTick.time_since_epoch() == vespalib::duration::zero()) { return; } - if (previousTickMs > nowMs) { + if (previousTick > now) { LOGBP(warning, "Thread is registering tick at time %" PRIu64 ", but " "last time it registered a tick, the time was %" PRIu64 ". Assuming clock has been adjusted backwards", - nowMs, previousTickMs); + vespalib::count_ms(now.time_since_epoch()), vespalib::count_ms(previousTick.time_since_epoch())); return; } - uint64_t cycleTimeMs = nowMs - previousTickMs; + vespalib::duration cycleTimeMs = now - previousTick; if (cycleType == WAIT_CYCLE) { - data._maxWaitTimeSeenMs = std::max(data._maxWaitTimeSeenMs, cycleTimeMs); + data._maxWaitTimeSeen = std::max(data._maxWaitTimeSeen, cycleTimeMs); } else { - data._maxProcessingTimeSeenMs = std::max(data._maxProcessingTimeSeenMs, cycleTimeMs); + data._maxProcessingTimeSeen = std::max(data._maxProcessingTimeSeen, cycleTimeMs); } } @@ -111,9 +110,9 @@ ThreadImpl::setTickData(const ThreadTickData& tickData) } void -ThreadImpl::updateParameters(uint64_t waitTimeMs, - uint64_t maxProcessTimeMs, - int ticksBeforeWait) { +ThreadImpl::updateParameters(vespalib::duration waitTimeMs, + vespalib::duration maxProcessTimeMs, + int ticksBeforeWait) { _properties.setWaitTime(waitTimeMs); _properties.setMaxProcessTime(maxProcessTimeMs); _properties.setTicksBeforeWait(ticksBeforeWait); @@ -125,9 +124,9 @@ ThreadImpl::AtomicThreadTickData::loadRelaxed() const noexcept ThreadTickData result; constexpr auto relaxed = std::memory_order_relaxed; result._lastTickType = _lastTickType.load(relaxed); - result._lastTickMs = _lastTickMs.load(relaxed); - result._maxProcessingTimeSeenMs = _maxProcessingTimeSeenMs.load(relaxed); - result._maxWaitTimeSeenMs = _maxWaitTimeSeenMs.load(relaxed); + result._lastTick = _lastTickMs.load(relaxed); + result._maxProcessingTimeSeen = _maxProcessingTimeSeenMs.load(relaxed); + result._maxWaitTimeSeen = _maxWaitTimeSeenMs.load(relaxed); return result; } @@ -137,9 +136,9 @@ ThreadImpl::AtomicThreadTickData::storeRelaxed( { constexpr auto relaxed = std::memory_order_relaxed; _lastTickType.store(newState._lastTickType, relaxed); - _lastTickMs.store(newState._lastTickMs, relaxed); - _maxProcessingTimeSeenMs.store(newState._maxProcessingTimeSeenMs, relaxed); - _maxWaitTimeSeenMs.store(newState._maxWaitTimeSeenMs, relaxed); + _lastTickMs.store(newState._lastTick, relaxed); + _maxProcessingTimeSeenMs.store(newState._maxProcessingTimeSeen, relaxed); + _maxWaitTimeSeenMs.store(newState._maxWaitTimeSeen, relaxed); } } diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h index 17967c66bdf..077e508a609 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h @@ -26,10 +26,16 @@ class ThreadImpl : public Thread * on code using it. */ struct AtomicThreadTickData { + AtomicThreadTickData() noexcept + : _lastTickType(), + _lastTickMs(vespalib::steady_time(vespalib::duration::zero())), + _maxProcessingTimeSeenMs(), + _maxWaitTimeSeenMs() + {} std::atomic<CycleType> _lastTickType; - std::atomic<uint64_t> _lastTickMs; - std::atomic<uint64_t> _maxProcessingTimeSeenMs; - std::atomic<uint64_t> _maxWaitTimeSeenMs; + std::atomic<vespalib::steady_time> _lastTickMs; + std::atomic<vespalib::duration> _maxProcessingTimeSeenMs; + std::atomic<vespalib::duration> _maxWaitTimeSeenMs; // struct stores and loads are both data race free with relaxed // memory semantics. This means it's possible to observe stale/partial // state in a case with concurrent readers/writers. @@ -49,26 +55,23 @@ class ThreadImpl : public Thread void run(); public: - ThreadImpl(ThreadPoolImpl&, Runnable&, vespalib::stringref id, uint64_t waitTimeMs, - uint64_t maxProcessTimeMs, int ticksBeforeWait); + ThreadImpl(ThreadPoolImpl&, Runnable&, vespalib::stringref id, vespalib::duration waitTimeMs, + vespalib::duration maxProcessTimeMs, int ticksBeforeWait); ~ThreadImpl(); bool interrupted() const override; bool joined() const override; void interrupt() override; void join() override; - void registerTick(CycleType, MilliSecTime) override; - uint64_t getWaitTime() const override { + void registerTick(CycleType, vespalib::steady_time) override; + vespalib::duration getWaitTime() const override { return _properties.getWaitTime(); } int getTicksBeforeWait() const override { return _properties.getTicksBeforeWait(); } - uint64_t getMaxProcessTime() const override { - return _properties.getMaxProcessTime(); - } - void updateParameters(uint64_t waitTime, uint64_t maxProcessTime, int ticksBeforeWait) override; + void updateParameters(vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait) override; void setTickData(const ThreadTickData&); ThreadTickData getTickData() const; diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp index e4a1188030c..e5698f0cbc2 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp @@ -46,17 +46,16 @@ ThreadPoolImpl::~ThreadPoolImpl() } Thread::UP -ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, uint64_t waitTimeMs, - uint64_t maxProcessTime, int ticksBeforeWait) +ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, vespalib::duration waitTimeMs, + vespalib::duration maxProcessTime, int ticksBeforeWait) { std::lock_guard lock(_threadVectorLock); if (_stopping) { throw IllegalStateException("Threadpool is stopping", VESPA_STRLOC); } - ThreadImpl* ti; - Thread::UP t(ti = new ThreadImpl(*this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait)); - _threads.push_back(ti); - return t; + auto thread = std::make_unique<ThreadImpl>(*this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait); + _threads.push_back(thread.get()); + return thread; } void diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h index 5f0ee523eff..5a025ac3ad3 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h @@ -21,8 +21,8 @@ public: ThreadPoolImpl(Clock&); ~ThreadPoolImpl() override; - Thread::UP startThread(Runnable&, vespalib::stringref id, uint64_t waitTimeMs, - uint64_t maxProcessTime, int ticksBeforeWait) override; + Thread::UP startThread(Runnable&, vespalib::stringref id, vespalib::duration waitTimeMs, + vespalib::duration maxProcessTime, int ticksBeforeWait) override; void visitThreads(ThreadVisitor&) const override; void registerThread(ThreadImpl&); diff --git a/storageframework/src/vespa/storageframework/generic/clock/time.h b/storageframework/src/vespa/storageframework/generic/clock/time.h index 4c578f9b33d..811c6cfceb0 100644 --- a/storageframework/src/vespa/storageframework/generic/clock/time.h +++ b/storageframework/src/vespa/storageframework/generic/clock/time.h @@ -3,7 +3,7 @@ #include <boost/operators.hpp> #include <vespa/vespalib/stllike/string.h> -#include <chrono> +#include <vespa/vespalib/util/time.h> namespace vespalib { class asciistream; diff --git a/storageframework/src/vespa/storageframework/generic/component/component.cpp b/storageframework/src/vespa/storageframework/generic/component/component.cpp index 3818a2865d9..44468bfead3 100644 --- a/storageframework/src/vespa/storageframework/generic/component/component.cpp +++ b/storageframework/src/vespa/storageframework/generic/component/component.cpp @@ -23,12 +23,12 @@ Component::close() Component::Component(ComponentRegister& cr, vespalib::stringref name) : _componentRegister(&cr), _name(name), - _status(0), - _metric(0), - _threadPool(0), - _metricReg(0), - _clock(0), - _listener(0) + _status(nullptr), + _metric(nullptr), + _threadPool(nullptr), + _metricReg(nullptr), + _clock(nullptr), + _listener(nullptr) { cr.registerComponent(*this); } @@ -38,23 +38,23 @@ Component::~Component() = default; void Component::registerComponentStateListener(ComponentStateListener& l) { - assert(_listener == 0); + assert(_listener == nullptr); _listener = &l; } void Component::registerStatusPage(const StatusReporter& sr) { - assert(_status == 0); + assert(_status == nullptr); _status = &sr; } void Component::registerMetric(metrics::Metric& m) { - assert(_metric == 0); + assert(_metric == nullptr); _metric = &m; - if (_metricReg != 0) { + if (_metricReg != nullptr) { _metricReg->registerMetric(m); } } @@ -64,28 +64,18 @@ Component::registerMetricUpdateHook(MetricUpdateHook& hook, SecondTime period) { assert(_metricUpdateHook.first == 0); _metricUpdateHook = std::make_pair(&hook, period); - if (_metricReg != 0) { + if (_metricReg != nullptr) { _metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second); } } -vespalib::MonitorGuard -Component::getMetricManagerLock() -{ - if (_metricReg != 0) { - return _metricReg->getMetricManagerLock(); - } else { - return vespalib::MonitorGuard(); - } -} - void Component::setMetricRegistrator(MetricRegistrator& mr) { _metricReg = &mr; if (_metricUpdateHook.first != 0) { _metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second); } - if (_metric != 0) { + if (_metric != nullptr) { _metricReg->registerMetric(*_metric); } } @@ -93,16 +83,16 @@ Component::setMetricRegistrator(MetricRegistrator& mr) { ThreadPool& Component::getThreadPool() const { - assert(_threadPool != 0); + assert(_threadPool != nullptr); return *_threadPool; } // Helper functions for components wanting to start a single thread. Thread::UP -Component::startThread(Runnable& runnable, MilliSecTime waitTime, MilliSecTime maxProcessTime, int ticksBeforeWait) +Component::startThread(Runnable& runnable, vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait) { - return getThreadPool().startThread(runnable, getName(), waitTime.getTime(), - maxProcessTime.getTime(), ticksBeforeWait); + return getThreadPool().startThread(runnable, getName(), waitTime, + maxProcessTime, ticksBeforeWait); } void diff --git a/storageframework/src/vespa/storageframework/generic/component/component.h b/storageframework/src/vespa/storageframework/generic/component/component.h index c91d7feb532..bfc80f524b5 100644 --- a/storageframework/src/vespa/storageframework/generic/component/component.h +++ b/storageframework/src/vespa/storageframework/generic/component/component.h @@ -71,7 +71,6 @@ #include <vespa/storageframework/generic/thread/runnable.h> #include <vespa/storageframework/generic/thread/thread.h> #include <vespa/storageframework/generic/clock/clock.h> -#include <vespa/vespalib/util/sync.h> #include <atomic> namespace storage::framework { @@ -153,12 +152,6 @@ public: */ void registerMetricUpdateHook(MetricUpdateHook&, SecondTime period); - /** - * If you need to modify the metric sets that have been registered, you need - * to hold the metric manager lock while you do it. - */ - vespalib::MonitorGuard getMetricManagerLock(); - /** Get the name of the component. Must be a unique name. */ const vespalib::string& getName() const override { return _name; } @@ -185,8 +178,8 @@ public: * in this thread. (Thus one is not required to call registerTick()) */ Thread::UP startThread(Runnable&, - MilliSecTime maxProcessTime = MilliSecTime(0), - MilliSecTime waitTime = MilliSecTime(0), + vespalib::duration maxProcessTime = vespalib::duration::zero(), + vespalib::duration waitTime = vespalib::duration::zero(), int ticksBeforeWait = 1); // Check upgrade flag settings. Note that this flag may change at any time. diff --git a/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h b/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h index 6cdbf58d555..53988651cbc 100644 --- a/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h +++ b/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h @@ -11,7 +11,6 @@ #pragma once #include <vespa/storageframework/generic/clock/time.h> -#include <vespa/vespalib/util/sync.h> namespace metrics { class Metric; @@ -26,7 +25,6 @@ struct MetricRegistrator { virtual void registerMetric(metrics::Metric&) = 0; virtual void registerUpdateHook(vespalib::stringref name, MetricUpdateHook& hook, SecondTime period) = 0; - virtual vespalib::MonitorGuard getMetricManagerLock() = 0; }; } diff --git a/storageframework/src/vespa/storageframework/generic/thread/runnable.h b/storageframework/src/vespa/storageframework/generic/thread/runnable.h index 245522a7421..a2615694e7c 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/runnable.h +++ b/storageframework/src/vespa/storageframework/generic/thread/runnable.h @@ -43,11 +43,9 @@ struct ThreadHandle { * clock fetches if client already knows current time) */ virtual void registerTick(CycleType = UNKNOWN_CYCLE, - MilliSecTime currentTime = MilliSecTime(0)) = 0; + vespalib::steady_time = vespalib::steady_time()) = 0; - virtual uint64_t getWaitTime() const = 0; - - virtual uint64_t getMaxProcessTime() const = 0; + virtual vespalib::duration getWaitTime() const = 0; /** * The number of ticks done before wait is called when no more work is diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.h b/storageframework/src/vespa/storageframework/generic/thread/thread.h index 814686eb4fa..e37bba4c723 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/thread.h +++ b/storageframework/src/vespa/storageframework/generic/thread/thread.h @@ -50,8 +50,8 @@ public: */ virtual void join() = 0; - virtual void updateParameters(uint64_t waitTime, - uint64_t maxProcessTime, + virtual void updateParameters(vespalib::duration waitTime, + vespalib::duration maxProcessTime, int ticksBeforeWait) = 0; /** diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp index d72b0c54544..3c2ebba42f7 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp +++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp @@ -2,23 +2,22 @@ #include "threadpool.h" -namespace storage { -namespace framework { +namespace storage::framework { -ThreadProperties::ThreadProperties(uint64_t waitTimeMs, - uint64_t maxProcessTimeMs, +ThreadProperties::ThreadProperties(vespalib::duration waitTimeMs, + vespalib::duration maxProcessTimeMs, int ticksBeforeWait) { - _waitTimeMs.store(waitTimeMs); - _maxProcessTimeMs.store(maxProcessTimeMs); - _ticksBeforeWait.store(ticksBeforeWait); + setWaitTime(waitTimeMs); + setMaxProcessTime(maxProcessTimeMs); + setTicksBeforeWait(ticksBeforeWait); } -uint64_t ThreadProperties::getMaxProcessTime() const { + vespalib::duration ThreadProperties::getMaxProcessTime() const { return _maxProcessTimeMs.load(std::memory_order_relaxed); } -uint64_t ThreadProperties::getWaitTime() const { +vespalib::duration ThreadProperties::getWaitTime() const { return _waitTimeMs.load(std::memory_order_relaxed); } @@ -26,11 +25,11 @@ int ThreadProperties::getTicksBeforeWait() const { return _ticksBeforeWait.load(std::memory_order_relaxed); } -void ThreadProperties::setMaxProcessTime(uint64_t maxProcessingTimeMs) { +void ThreadProperties::setMaxProcessTime(vespalib::duration maxProcessingTimeMs) { _maxProcessTimeMs.store(maxProcessingTimeMs); } -void ThreadProperties::setWaitTime(uint64_t waitTimeMs) { +void ThreadProperties::setWaitTime(vespalib::duration waitTimeMs) { _waitTimeMs.store(waitTimeMs); } @@ -38,5 +37,4 @@ void ThreadProperties::setTicksBeforeWait(int ticksBeforeWait) { _ticksBeforeWait.store(ticksBeforeWait); } -} // framework -} // storage +} diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.h b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h index 8ebe382bddf..0ee800c43ff 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/threadpool.h +++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h @@ -27,12 +27,12 @@ namespace storage::framework { * thread. */ class ThreadProperties { - private: +private: /** * Time this thread should maximum use to process before a tick is * registered. (Including wait time if wait time is not set) */ - std::atomic_uint_least64_t _maxProcessTimeMs; + std::atomic<vespalib::duration> _maxProcessTimeMs; /** * Time this thread will wait in a non-interrupted wait cycle. * Used in cases where a wait cycle is registered. As long as no other @@ -40,26 +40,26 @@ class ThreadProperties { * wait time here. The deadlock detector should add a configurable * global time period before flagging deadlock anyways. */ - std::atomic_uint_least64_t _waitTimeMs; + std::atomic<vespalib::duration> _waitTimeMs; /** * Number of ticks to be done before a wait. */ std::atomic_uint _ticksBeforeWait; public: - ThreadProperties(uint64_t waitTimeMs, - uint64_t maxProcessTimeMs, + ThreadProperties(vespalib::duration waitTime, + vespalib::duration maxProcessTimeMs, int ticksBeforeWait); - void setMaxProcessTime(uint64_t); - void setWaitTime(uint64_t); + void setMaxProcessTime(vespalib::duration); + void setWaitTime(vespalib::duration); void setTicksBeforeWait(int); - uint64_t getMaxProcessTime() const; - uint64_t getWaitTime() const; + vespalib::duration getMaxProcessTime() const; + vespalib::duration getWaitTime() const; int getTicksBeforeWait() const; - uint64_t getMaxCycleTime() const { + vespalib::duration getMaxCycleTime() const { return std::max(_maxProcessTimeMs.load(std::memory_order_relaxed), _waitTimeMs.load(std::memory_order_relaxed)); } @@ -68,26 +68,26 @@ class ThreadProperties { /** Data kept on each thread due to the registerTick functinality. */ struct ThreadTickData { CycleType _lastTickType; - uint64_t _lastTickMs; - uint64_t _maxProcessingTimeSeenMs; - uint64_t _maxWaitTimeSeenMs; + vespalib::steady_time _lastTick; + vespalib::duration _maxProcessingTimeSeen; + vespalib::duration _maxWaitTimeSeen; }; /** Interface used to access data for the existing threads. */ struct ThreadVisitor { - virtual ~ThreadVisitor() {} + virtual ~ThreadVisitor() = default; virtual void visitThread(const vespalib::string& id, const ThreadProperties&, const ThreadTickData&) = 0; }; struct ThreadPool { - virtual ~ThreadPool() {} + virtual ~ThreadPool() = default; virtual Thread::UP startThread(Runnable&, vespalib::stringref id, - uint64_t waitTimeMs, - uint64_t maxProcessTime, + vespalib::duration waitTimeMs, + vespalib::duration maxProcessTime, int ticksBeforeWait) = 0; virtual void visitThreads(ThreadVisitor&) const = 0; diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp index 3178220654e..7913cdcfe84 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp +++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp @@ -22,20 +22,22 @@ ThreadWaitInfo::merge(const ThreadWaitInfo& other) { * global synchronization point where no thread is currently running. */ class TickingThreadRunner final : public Runnable { - vespalib::Monitor& _monitor; - TickingThread& _tickingThread; - uint32_t _threadIndex; - bool _wantToFreeze; - bool _frozen; - char _state; + std::mutex & _monitor; + std::condition_variable & _cond; + TickingThread & _tickingThread; + uint32_t _threadIndex; + bool _wantToFreeze; + bool _frozen; + char _state; public: typedef std::shared_ptr<TickingThreadRunner> SP; - TickingThreadRunner(vespalib::Monitor& m, + TickingThreadRunner(std::mutex& m, + std::condition_variable & cond, TickingThread& ticker, uint32_t threadIndex) noexcept - : _monitor(m), _tickingThread(ticker), + : _monitor(m), _cond(cond), _tickingThread(ticker), _threadIndex(threadIndex), _wantToFreeze(false), _frozen(false) {} /** @@ -43,10 +45,10 @@ public: * tick and has frozen. */ void freeze() { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_monitor); _wantToFreeze = true; while (!_frozen) { - guard.wait(); + _cond.wait(guard); } } @@ -54,9 +56,11 @@ public: * Call to thaw up a frozen thread so it can continue. */ void thaw() { - vespalib::MonitorGuard guard(_monitor); - _wantToFreeze = false; - guard.broadcast(); + { + std::lock_guard guard(_monitor); + _wantToFreeze = false; + } + _cond.notify_all(); } char getState() const { return _state; } @@ -68,18 +72,18 @@ private: int ticksExecutedAfterWait = 0; while (!handle.interrupted()) { { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_monitor); if (info.waitWanted()) { _state = 'w'; cycle = WAIT_CYCLE; if (ticksExecutedAfterWait >= handle.getTicksBeforeWait()) { - guard.wait(handle.getWaitTime()); + _cond.wait_for(guard, handle.getWaitTime()); ticksExecutedAfterWait = 0; } } if (_wantToFreeze) { _state = 'f'; - doFreeze(guard); + doFreeze(guard, _cond); ticksExecutedAfterWait = 0; } _state = 'c'; @@ -93,22 +97,23 @@ private: } _state = 's'; } - void doFreeze(vespalib::MonitorGuard& guard) { + void doFreeze(std::unique_lock<std::mutex> & guard, std::condition_variable & cond) { _frozen = true; - guard.broadcast(); + cond.notify_all(); while (_wantToFreeze) { - guard.wait(); + _cond.wait(guard); } _frozen = false; } }; class TickingThreadPoolImpl final : public TickingThreadPool { - vespalib::string _name; - vespalib::Monitor _monitor; - std::atomic_uint_least64_t _waitTime; - std::atomic_uint _ticksBeforeWait; - std::atomic_uint_least64_t _maxProcessTime; + vespalib::string _name; + std::mutex _lock; + std::condition_variable _cond; + std::atomic<vespalib::duration> _waitTime; + std::atomic_uint _ticksBeforeWait; + std::atomic<vespalib::duration> _maxProcessTime; std::vector<TickingThreadRunner::SP> _tickers; std::vector<std::shared_ptr<Thread>> _threads; @@ -120,40 +125,40 @@ class TickingThreadPoolImpl final : public TickingThreadPool { void broadcast() override {} }; struct CriticalGuard final : public TickingLockGuard::Impl { - vespalib::MonitorGuard _guard; + std::condition_variable &_cond; - explicit CriticalGuard(vespalib::Monitor& m) : _guard(m) {} + explicit CriticalGuard(std::condition_variable & cond) : _cond(cond) {} - void broadcast() override { _guard.broadcast(); } + void broadcast() override { _cond.notify_all(); } }; public: - TickingThreadPoolImpl(vespalib::stringref name, MilliSecTime waitTime, - int ticksBeforeWait, MilliSecTime maxProcessTime) + TickingThreadPoolImpl(vespalib::stringref name, vespalib::duration waitTime, + int ticksBeforeWait, vespalib::duration maxProcessTime) : _name(name), - _waitTime(waitTime.getTime()), + _waitTime(waitTime), _ticksBeforeWait(ticksBeforeWait), - _maxProcessTime(maxProcessTime.getTime()) {} + _maxProcessTime(maxProcessTime) {} ~TickingThreadPoolImpl() override { stop(); } - void updateParametersAllThreads(MilliSecTime waitTime, MilliSecTime maxProcessTime, + void updateParametersAllThreads(vespalib::duration waitTime, vespalib::duration maxProcessTime, int ticksBeforeWait) override { - _waitTime.store(waitTime.getTime()); - _maxProcessTime.store(maxProcessTime.getTime()); + _waitTime.store(waitTime); + _maxProcessTime.store(maxProcessTime); _ticksBeforeWait.store(ticksBeforeWait); // TODO: Add locking so threads not deleted while updating for (uint32_t i=0; i<_threads.size(); ++i) { - _threads[i]->updateParameters(waitTime.getTime(), maxProcessTime.getTime(), ticksBeforeWait); + _threads[i]->updateParameters(waitTime, maxProcessTime, ticksBeforeWait); } } void addThread(TickingThread& ticker) override { ThreadIndex index = _tickers.size(); ticker.newThreadCreated(index); - _tickers.emplace_back(std::make_shared<TickingThreadRunner>(_monitor, ticker, index)); + _tickers.emplace_back(std::make_shared<TickingThreadRunner>(_lock, _cond, ticker, index)); } void start(ThreadPool& pool) override { @@ -175,7 +180,7 @@ public: } TickingLockGuard freezeCriticalTicks() override { - return TickingLockGuard(std::make_unique<CriticalGuard>(_monitor)); + return TickingLockGuard(std::make_unique<CriticalGuard>(_cond)); } void stop() override { @@ -183,8 +188,7 @@ public: t->interrupt(); } { - vespalib::MonitorGuard guard(_monitor); - guard.broadcast(); + _cond.notify_all(); } for (auto& t : _threads) { t->join(); @@ -216,9 +220,9 @@ private: TickingThreadPool::UP TickingThreadPool::createDefault( vespalib::stringref name, - MilliSecTime waitTime, + vespalib::duration waitTime, int ticksBeforeWait, - MilliSecTime maxProcessTime) + vespalib::duration maxProcessTime) { return std::make_unique<TickingThreadPoolImpl>(name, waitTime, ticksBeforeWait, maxProcessTime); } diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h index 1ed02d0f6fd..0649d914c75 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h +++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h @@ -80,13 +80,13 @@ struct TickingThreadPool : public ThreadLock { static TickingThreadPool::UP createDefault( vespalib::stringref name, - MilliSecTime waitTime = MilliSecTime(5), + vespalib::duration waitTime = 5ms, int ticksBeforeWait = 1, - MilliSecTime maxProcessTime = SecondTime(5).getMillis()); + vespalib::duration maxProcessTime = 5s); virtual void updateParametersAllThreads( - MilliSecTime waitTime, - MilliSecTime maxProcessTime, + vespalib::duration waitTime, + vespalib::duration maxProcessTime, int ticksBeforeWait) = 0; ~TickingThreadPool() override = default; |