diff options
Diffstat (limited to 'vespalib/src')
9 files changed, 136 insertions, 20 deletions
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp index 63c6856afd2..b55f54f9339 100644 --- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp @@ -33,17 +33,18 @@ std::atomic<uint32_t> MyTask::runCnt(0); std::atomic<uint32_t> MyTask::deleteCnt(0); struct MyState { + static constexpr uint32_t NUM_THREADS = 10; Gate gate; // to block workers CountDownLatch latch; // to wait for workers ThreadStackExecutor executor; bool checked; - MyState() : gate(), latch(10), executor(10, 128000, 20), checked(false) + MyState() : gate(), latch(10), executor(NUM_THREADS, 128000, 20), checked(false) { MyTask::resetStats(); } MyState &execute(uint32_t cnt) { for (uint32_t i = 0; i < cnt; ++i) { - executor.execute(Task::UP(new MyTask(gate, latch))); + executor.execute(std::make_unique<MyTask>(gate, latch)); } return *this; } @@ -70,7 +71,7 @@ struct MyState { { ASSERT_TRUE(!checked); checked = true; - ThreadStackExecutor::Stats stats = executor.getStats(); + ExecutorStats stats = executor.getStats(); EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt); EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt); EXPECT_EQUAL(expect_queue + expect_running + expect_deleted, @@ -187,11 +188,11 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor( } TEST("require that stats can be accumulated") { - ThreadStackExecutor::Stats stats(ThreadExecutor::Stats::QueueSizeT(1) ,2,3); + ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3); EXPECT_EQUAL(1u, stats.queueSize.max()); EXPECT_EQUAL(2u, stats.acceptedTasks); EXPECT_EQUAL(3u, stats.rejectedTasks); - stats += ThreadStackExecutor::Stats(ThreadExecutor::Stats::QueueSizeT(7),8,9); + stats += ExecutorStats(ExecutorStats::QueueSizeT(7),8,9); EXPECT_EQUAL(2u, stats.queueSize.count()); EXPECT_EQUAL(8u, stats.queueSize.total()); EXPECT_EQUAL(8u, stats.queueSize.max()); diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index 9e2917775f0..6115c21623d 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -38,6 +38,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT memoryusage.cpp mmap_file_allocator.cpp mmap_file_allocator_factory.cpp + monitored_refcount.cpp printable.cpp priority_queue.cpp random.cpp diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h index 49d83c96714..f1f58685570 100644 --- a/vespalib/src/vespa/vespalib/util/executor_stats.h +++ b/vespalib/src/vespa/vespalib/util/executor_stats.h @@ -3,6 +3,7 @@ #pragma once #include <limits> +#include <cstdint> namespace vespalib { @@ -15,10 +16,10 @@ public: AggregatedAverage() : AggregatedAverage(0ul, T(0), std::numeric_limits<T>::max(), std::numeric_limits<T>::min()) { } explicit AggregatedAverage(T value) : AggregatedAverage(1, value, value, value) { } AggregatedAverage(size_t count_in, T total_in, T min_in, T max_in) - : _count(count_in), - _total(total_in), - _min(min_in), - _max(max_in) + : _count(count_in), + _total(total_in), + _min(min_in), + _max(max_in) { } AggregatedAverage & operator += (const AggregatedAverage & rhs) { add(rhs); diff --git a/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp b/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp new file mode 100644 index 00000000000..4376e26bb66 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp @@ -0,0 +1,44 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "monitored_refcount.h" +#include <cassert> + +namespace vespalib { + +MonitoredRefCount::MonitoredRefCount() + : _lock(), + _cv(), + _refCount(0u) +{ +} + +MonitoredRefCount::~MonitoredRefCount() +{ + assert(_refCount == 0u); +} + +void +MonitoredRefCount::retain() noexcept +{ + std::lock_guard<std::mutex> guard(_lock); + ++_refCount; +} + +void +MonitoredRefCount::release() noexcept +{ + std::lock_guard<std::mutex> guard(_lock); + --_refCount; + if (_refCount == 0u) { + _cv.notify_all(); + } +} + +void +MonitoredRefCount::waitForZeroRefCount() +{ + std::unique_lock<std::mutex> guard(_lock); + _cv.wait(guard, [this] { return (_refCount == 0u); }); +} + +} diff --git a/vespalib/src/vespa/vespalib/util/monitored_refcount.h b/vespalib/src/vespa/vespalib/util/monitored_refcount.h new file mode 100644 index 00000000000..465284b6fd3 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/monitored_refcount.h @@ -0,0 +1,29 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <mutex> +#include <condition_variable> + +namespace vespalib { + +class RetainGuard; +/* + * Class containing a reference count that can be waited on to become zero. + * Typically ancestor or member of a class that has to be careful of when + * portions object can be properly torn down before destruction itself. + */ +class MonitoredRefCount +{ + std::mutex _lock; + std::condition_variable _cv; + uint32_t _refCount; + void retain() noexcept; + void release() noexcept; + friend RetainGuard; +public: + MonitoredRefCount(); + virtual ~MonitoredRefCount(); + void waitForZeroRefCount(); +}; + +} diff --git a/vespalib/src/vespa/vespalib/util/retain_guard.h b/vespalib/src/vespa/vespalib/util/retain_guard.h new file mode 100644 index 00000000000..090f3ce75cf --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/retain_guard.h @@ -0,0 +1,45 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "monitored_refcount.h" + +namespace vespalib { + +/* + * Class containing a reference to a monitored reference count, + * intended to block teardown of the class owning the monitored + * reference count. + */ +class RetainGuard { +public: + RetainGuard(MonitoredRefCount & refCount) noexcept + : _refCount(&refCount) + { + _refCount->retain(); + } + RetainGuard(const RetainGuard & rhs) = delete; + RetainGuard & operator=(const RetainGuard & rhs) = delete; + RetainGuard(RetainGuard && rhs) noexcept + : _refCount(rhs._refCount) + { + rhs._refCount = nullptr; + } + RetainGuard & operator=(RetainGuard && rhs) noexcept { + release(); + _refCount = rhs._refCount; + rhs._refCount = nullptr; + return *this; + } + ~RetainGuard() { release(); } +private: + void release() noexcept{ + if (_refCount != nullptr) { + _refCount->release(); + _refCount = nullptr; + } + } + MonitoredRefCount * _refCount; +}; + +} diff --git a/vespalib/src/vespa/vespalib/util/threadexecutor.h b/vespalib/src/vespa/vespalib/util/threadexecutor.h index 9ab7aacfc4b..36c72fa4bb0 100644 --- a/vespalib/src/vespa/vespalib/util/threadexecutor.h +++ b/vespalib/src/vespa/vespalib/util/threadexecutor.h @@ -12,11 +12,6 @@ class ThreadExecutor : public Executor { public: /** - * Internal stats that we want to observe externally. Note that - * all stats are reset each time they are observed. - **/ - using Stats = ExecutorStats; - /** * Get number of threads in the executor pool. * @return number of threads in the pool */ @@ -26,7 +21,7 @@ public: * Observe and reset stats for this object. * @return stats **/ - virtual Stats getStats() = 0; + virtual ExecutorStats getStats() = 0; /** * Sets a new upper limit for accepted number of tasks. diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index 32e47f366cc..f80a5b4ce32 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -203,12 +203,12 @@ ThreadStackExecutorBase::num_idle_workers() const return _workers.size(); } -ThreadStackExecutorBase::Stats +ExecutorStats ThreadStackExecutorBase::getStats() { std::unique_lock guard(_lock); - Stats stats = _stats; - _stats = Stats(); + ExecutorStats stats = _stats; + _stats = ExecutorStats(); _stats.queueSize.add(_taskCount); return stats; } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 59ed385b4f4..66a34bfde95 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -80,7 +80,7 @@ private: std::unique_ptr<FastOS_ThreadPool> _pool; mutable std::mutex _lock; std::condition_variable _cond; - Stats _stats; + ExecutorStats _stats; Gate _executorCompletion; ArrayQueue<TaggedTask> _tasks; ArrayQueue<Worker*> _workers; @@ -188,7 +188,7 @@ public: **/ size_t num_idle_workers() const; - Stats getStats() override; + ExecutorStats getStats() override; Task::UP execute(Task::UP task) override; |