summaryrefslogtreecommitdiffstats
path: root/vespalib/src
diff options
context:
space:
mode:
Diffstat (limited to 'vespalib/src')
-rw-r--r--vespalib/src/tests/executor/threadstackexecutor_test.cpp11
-rw-r--r--vespalib/src/vespa/vespalib/util/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/util/executor_stats.h9
-rw-r--r--vespalib/src/vespa/vespalib/util/monitored_refcount.cpp44
-rw-r--r--vespalib/src/vespa/vespalib/util/monitored_refcount.h29
-rw-r--r--vespalib/src/vespa/vespalib/util/retain_guard.h45
-rw-r--r--vespalib/src/vespa/vespalib/util/threadexecutor.h7
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp6
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h4
9 files changed, 136 insertions, 20 deletions
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
index 63c6856afd2..b55f54f9339 100644
--- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -33,17 +33,18 @@ std::atomic<uint32_t> MyTask::runCnt(0);
std::atomic<uint32_t> MyTask::deleteCnt(0);
struct MyState {
+ static constexpr uint32_t NUM_THREADS = 10;
Gate gate; // to block workers
CountDownLatch latch; // to wait for workers
ThreadStackExecutor executor;
bool checked;
- MyState() : gate(), latch(10), executor(10, 128000, 20), checked(false)
+ MyState() : gate(), latch(10), executor(NUM_THREADS, 128000, 20), checked(false)
{
MyTask::resetStats();
}
MyState &execute(uint32_t cnt) {
for (uint32_t i = 0; i < cnt; ++i) {
- executor.execute(Task::UP(new MyTask(gate, latch)));
+ executor.execute(std::make_unique<MyTask>(gate, latch));
}
return *this;
}
@@ -70,7 +71,7 @@ struct MyState {
{
ASSERT_TRUE(!checked);
checked = true;
- ThreadStackExecutor::Stats stats = executor.getStats();
+ ExecutorStats stats = executor.getStats();
EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt);
EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt);
EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,
@@ -187,11 +188,11 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor(
}
TEST("require that stats can be accumulated") {
- ThreadStackExecutor::Stats stats(ThreadExecutor::Stats::QueueSizeT(1) ,2,3);
+ ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3);
EXPECT_EQUAL(1u, stats.queueSize.max());
EXPECT_EQUAL(2u, stats.acceptedTasks);
EXPECT_EQUAL(3u, stats.rejectedTasks);
- stats += ThreadStackExecutor::Stats(ThreadExecutor::Stats::QueueSizeT(7),8,9);
+ stats += ExecutorStats(ExecutorStats::QueueSizeT(7),8,9);
EXPECT_EQUAL(2u, stats.queueSize.count());
EXPECT_EQUAL(8u, stats.queueSize.total());
EXPECT_EQUAL(8u, stats.queueSize.max());
diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
index 9e2917775f0..6115c21623d 100644
--- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -38,6 +38,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT
memoryusage.cpp
mmap_file_allocator.cpp
mmap_file_allocator_factory.cpp
+ monitored_refcount.cpp
printable.cpp
priority_queue.cpp
random.cpp
diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h
index 49d83c96714..f1f58685570 100644
--- a/vespalib/src/vespa/vespalib/util/executor_stats.h
+++ b/vespalib/src/vespa/vespalib/util/executor_stats.h
@@ -3,6 +3,7 @@
#pragma once
#include <limits>
+#include <cstdint>
namespace vespalib {
@@ -15,10 +16,10 @@ public:
AggregatedAverage() : AggregatedAverage(0ul, T(0), std::numeric_limits<T>::max(), std::numeric_limits<T>::min()) { }
explicit AggregatedAverage(T value) : AggregatedAverage(1, value, value, value) { }
AggregatedAverage(size_t count_in, T total_in, T min_in, T max_in)
- : _count(count_in),
- _total(total_in),
- _min(min_in),
- _max(max_in)
+ : _count(count_in),
+ _total(total_in),
+ _min(min_in),
+ _max(max_in)
{ }
AggregatedAverage & operator += (const AggregatedAverage & rhs) {
add(rhs);
diff --git a/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp b/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp
new file mode 100644
index 00000000000..4376e26bb66
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp
@@ -0,0 +1,44 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "monitored_refcount.h"
+#include <cassert>
+
+namespace vespalib {
+
+MonitoredRefCount::MonitoredRefCount()
+ : _lock(),
+ _cv(),
+ _refCount(0u)
+{
+}
+
+MonitoredRefCount::~MonitoredRefCount()
+{
+ assert(_refCount == 0u);
+}
+
+void
+MonitoredRefCount::retain() noexcept
+{
+ std::lock_guard<std::mutex> guard(_lock);
+ ++_refCount;
+}
+
+void
+MonitoredRefCount::release() noexcept
+{
+ std::lock_guard<std::mutex> guard(_lock);
+ --_refCount;
+ if (_refCount == 0u) {
+ _cv.notify_all();
+ }
+}
+
+void
+MonitoredRefCount::waitForZeroRefCount()
+{
+ std::unique_lock<std::mutex> guard(_lock);
+ _cv.wait(guard, [this] { return (_refCount == 0u); });
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/monitored_refcount.h b/vespalib/src/vespa/vespalib/util/monitored_refcount.h
new file mode 100644
index 00000000000..465284b6fd3
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/monitored_refcount.h
@@ -0,0 +1,29 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+
+namespace vespalib {
+
+class RetainGuard;
+/*
+ * Class containing a reference count that can be waited on to become zero.
+ * Typically ancestor or member of a class that has to be careful of when
+ * portions object can be properly torn down before destruction itself.
+ */
+class MonitoredRefCount
+{
+ std::mutex _lock;
+ std::condition_variable _cv;
+ uint32_t _refCount;
+ void retain() noexcept;
+ void release() noexcept;
+ friend RetainGuard;
+public:
+ MonitoredRefCount();
+ virtual ~MonitoredRefCount();
+ void waitForZeroRefCount();
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/retain_guard.h b/vespalib/src/vespa/vespalib/util/retain_guard.h
new file mode 100644
index 00000000000..090f3ce75cf
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/retain_guard.h
@@ -0,0 +1,45 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "monitored_refcount.h"
+
+namespace vespalib {
+
+/*
+ * Class containing a reference to a monitored reference count,
+ * intended to block teardown of the class owning the monitored
+ * reference count.
+ */
+class RetainGuard {
+public:
+ RetainGuard(MonitoredRefCount & refCount) noexcept
+ : _refCount(&refCount)
+ {
+ _refCount->retain();
+ }
+ RetainGuard(const RetainGuard & rhs) = delete;
+ RetainGuard & operator=(const RetainGuard & rhs) = delete;
+ RetainGuard(RetainGuard && rhs) noexcept
+ : _refCount(rhs._refCount)
+ {
+ rhs._refCount = nullptr;
+ }
+ RetainGuard & operator=(RetainGuard && rhs) noexcept {
+ release();
+ _refCount = rhs._refCount;
+ rhs._refCount = nullptr;
+ return *this;
+ }
+ ~RetainGuard() { release(); }
+private:
+ void release() noexcept{
+ if (_refCount != nullptr) {
+ _refCount->release();
+ _refCount = nullptr;
+ }
+ }
+ MonitoredRefCount * _refCount;
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/threadexecutor.h b/vespalib/src/vespa/vespalib/util/threadexecutor.h
index 9ab7aacfc4b..36c72fa4bb0 100644
--- a/vespalib/src/vespa/vespalib/util/threadexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/threadexecutor.h
@@ -12,11 +12,6 @@ class ThreadExecutor : public Executor
{
public:
/**
- * Internal stats that we want to observe externally. Note that
- * all stats are reset each time they are observed.
- **/
- using Stats = ExecutorStats;
- /**
* Get number of threads in the executor pool.
* @return number of threads in the pool
*/
@@ -26,7 +21,7 @@ public:
* Observe and reset stats for this object.
* @return stats
**/
- virtual Stats getStats() = 0;
+ virtual ExecutorStats getStats() = 0;
/**
* Sets a new upper limit for accepted number of tasks.
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index 32e47f366cc..f80a5b4ce32 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -203,12 +203,12 @@ ThreadStackExecutorBase::num_idle_workers() const
return _workers.size();
}
-ThreadStackExecutorBase::Stats
+ExecutorStats
ThreadStackExecutorBase::getStats()
{
std::unique_lock guard(_lock);
- Stats stats = _stats;
- _stats = Stats();
+ ExecutorStats stats = _stats;
+ _stats = ExecutorStats();
_stats.queueSize.add(_taskCount);
return stats;
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 59ed385b4f4..66a34bfde95 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -80,7 +80,7 @@ private:
std::unique_ptr<FastOS_ThreadPool> _pool;
mutable std::mutex _lock;
std::condition_variable _cond;
- Stats _stats;
+ ExecutorStats _stats;
Gate _executorCompletion;
ArrayQueue<TaggedTask> _tasks;
ArrayQueue<Worker*> _workers;
@@ -188,7 +188,7 @@ public:
**/
size_t num_idle_workers() const;
- Stats getStats() override;
+ ExecutorStats getStats() override;
Task::UP execute(Task::UP task) override;