diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-29 13:24:55 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-29 13:25:45 +0000 |
commit | 97b888e9a890b3af4cade6f33de13f1f04ad96fd (patch) | |
tree | aad36780514f9a4282c4c4db94fd3c132cc216d5 /vespalib | |
parent | b9fe86b6f81c4da88a0d5db9abc000849ab3e296 (diff) |
Add a wakeup service
Diffstat (limited to 'vespalib')
-rw-r--r-- | vespalib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/tests/wakeupservice/CMakeLists.txt | 8 | ||||
-rw-r--r-- | vespalib/src/tests/wakeupservice/wakeupservice_test.cpp | 48 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/executor.h | 15 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/iwakeupservice.h | 20 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/wakeupservice.cpp | 90 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/wakeupservice.h | 35 |
8 files changed, 213 insertions, 5 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index 2f657b1bed0..ff4e62cc67c 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -151,6 +151,7 @@ vespa_define_module( src/tests/valgrind src/tests/visit_ranges src/tests/wakeup + src/tests/wakeupservice src/tests/websocket src/tests/zcurve diff --git a/vespalib/src/tests/wakeupservice/CMakeLists.txt b/vespalib/src/tests/wakeupservice/CMakeLists.txt new file mode 100644 index 00000000000..b00fe475c82 --- /dev/null +++ b/vespalib/src/tests/wakeupservice/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_wakeupservice_test_app TEST + SOURCES + wakeupservice_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_wakeupservice_test_app COMMAND vespalib_wakeupservice_test_app) diff --git a/vespalib/src/tests/wakeupservice/wakeupservice_test.cpp b/vespalib/src/tests/wakeupservice/wakeupservice_test.cpp new file mode 100644 index 00000000000..7b24d47d678 --- /dev/null +++ b/vespalib/src/tests/wakeupservice/wakeupservice_test.cpp @@ -0,0 +1,48 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/util/wakeupservice.h> + +using namespace vespalib; + +struct WakeupCounter : public IWakeup { + WakeupCounter() : _count(0) {} + void wakeup() override { _count++; } + std::atomic<uint64_t> _count; +}; + +TEST("require that wakeup is called") { + WakeupCounter a; + WakeupService service(1ms); + EXPECT_EQUAL(0u, a._count); + auto ra = service.registerForWakeup(&a); + EXPECT_TRUE(ra); + while (a._count == 0) { + std::this_thread::sleep_for(1ms); + } + ra.reset(); + uint64_t countAtStop = a._count; + std::this_thread::sleep_for(1s); + EXPECT_EQUAL(countAtStop, a._count); +} + +TEST("require that same wakeup can only be registered once, but reregisterd after unregistered.") { + WakeupCounter a; + WakeupService service(1ms); + EXPECT_EQUAL(0u, a._count); + auto ra1 = service.registerForWakeup(&a); + EXPECT_TRUE(ra1); + auto ra2 = service.registerForWakeup(&a); + EXPECT_FALSE(ra2); + while (a._count == 0) { + std::this_thread::sleep_for(1ms); + } + ra1.reset(); + uint64_t countAtStop = a._count; + ra2 = service.registerForWakeup(&a); + EXPECT_TRUE(ra2); + std::this_thread::sleep_for(1s); + EXPECT_LESS(countAtStop, a._count); +} + + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index 75ea02d448e..92371418c8a 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -69,6 +69,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT time.cpp unwind_message.cpp valgrind.cpp + wakeupservice.cpp zstdcompressor.cpp DEPENDS ) diff --git a/vespalib/src/vespa/vespalib/util/executor.h b/vespalib/src/vespa/vespalib/util/executor.h index 6ef8f182ec4..52d4252ce78 100644 --- a/vespalib/src/vespa/vespalib/util/executor.h +++ b/vespalib/src/vespa/vespalib/util/executor.h @@ -6,12 +6,21 @@ namespace vespalib { +class IWakeup { +public: + virtual ~IWakeup() = default; + /** + * In case you have a lazy executor that naps inbetween. + **/ + virtual void wakeup() = 0; +}; + /** * An executor decouples the execution of a task from the request of * executing that task. Also, tasks are typically executed * concurrently in multiple threads. **/ -class Executor +class Executor : public IWakeup { public: /** @@ -37,10 +46,6 @@ public: **/ virtual Task::UP execute(Task::UP task) = 0; - /** - * In case you have a lazy executor that naps inbetween. - **/ - virtual void wakeup() = 0; virtual ~Executor() = default; }; diff --git a/vespalib/src/vespa/vespalib/util/iwakeupservice.h b/vespalib/src/vespa/vespalib/util/iwakeupservice.h new file mode 100644 index 00000000000..fbd8a299fe7 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/iwakeupservice.h @@ -0,0 +1,20 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "executor.h" +#include "idestructorcallback.h" + +namespace vespalib { + +/** + * Interface to register for receiving wakeup calls. + * The registration will last as long as the returned object is kept alive. + **/ +class IWakeupService { +public: + virtual ~IWakeupService() = default; + virtual std::shared_ptr<IDestructorCallback> registerForWakeup(IWakeup * toWakeup) = 0; +}; + +} diff --git a/vespalib/src/vespa/vespalib/util/wakeupservice.cpp b/vespalib/src/vespa/vespalib/util/wakeupservice.cpp new file mode 100644 index 00000000000..fbb4b1c12e6 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/wakeupservice.cpp @@ -0,0 +1,90 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "wakeupservice.h" +#include <cassert> + +namespace vespalib { + +WakeupService::WakeupService(duration napTime) + : _naptime(napTime), + _lock(), + _closed(false), + _toWakeup(), + _thread() +{ +} + +WakeupService::~WakeupService() +{ + { + std::lock_guard guard(_lock); + assert(_toWakeup.empty()); + _closed = true; + } + if (_thread) { + _thread->join(); + } +} + +class WakeupService::Registration : public IDestructorCallback { +public: + Registration(WakeupService * service, IWakeup * toWakeup) noexcept + : _service(service), + _toWakeup(toWakeup) + { } + Registration(const Registration &) = delete; + Registration & operator=(const Registration &) = delete; + ~Registration() override{ + _service->unregister(_toWakeup); + } +private: + WakeupService * _service; + IWakeup * _toWakeup; +}; + +std::shared_ptr<IDestructorCallback> +WakeupService::registerForWakeup(IWakeup * toWakeup) { + std::lock_guard guard(_lock); + auto found = std::find(_toWakeup.begin(), _toWakeup.end(), toWakeup); + if (found != _toWakeup.end()) return std::shared_ptr<IDestructorCallback>(); + + _toWakeup.push_back(toWakeup); + if ( ! _thread) { + _thread = std::make_unique<std::thread>(WakeupService::run, this); + } + return std::make_shared<Registration>(this, toWakeup); +} + +void +WakeupService::unregister(IWakeup * toWakeup) { + std::lock_guard guard(_lock); + auto found = std::find(_toWakeup.begin(), _toWakeup.end(), toWakeup); + assert (found != _toWakeup.end()); + _toWakeup.erase(found); +} + +void +WakeupService::runLoop() { + bool done = false; + while ( ! done ) { + { + std::lock_guard guard(_lock); + for (IWakeup *toWakeup: _toWakeup) { + toWakeup->wakeup(); + } + done = _closed; + } + if ( ! done) { + std::this_thread::sleep_for(_naptime); + } + } + +} + +void +WakeupService::run(WakeupService * service) { + service->runLoop(); +} + +} + diff --git a/vespalib/src/vespa/vespalib/util/wakeupservice.h b/vespalib/src/vespa/vespalib/util/wakeupservice.h new file mode 100644 index 00000000000..2eb29edb718 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/wakeupservice.h @@ -0,0 +1,35 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "iwakeupservice.h" +#include "time.h" +#include <mutex> +#include <vector> +#include <thread> + +namespace vespalib { + +class WakeupService : public IWakeupService { +public: + WakeupService(duration napTime); + WakeupService(const WakeupService &) = delete; + WakeupService & operator=(const WakeupService &) = delete; + ~WakeupService() override; + /** + * Register the one to be woken up + */ + std::shared_ptr<IDestructorCallback> registerForWakeup(IWakeup * toWakeup) override; +private: + class Registration; + void unregister(IWakeup * toWakeup); + void runLoop(); + static void run(WakeupService *); + duration _naptime; + std::mutex _lock; + bool _closed; + std::vector<IWakeup *> _toWakeup; + std::unique_ptr<std::thread> _thread; +}; + +} |