diff options
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h | 4 | ||||
-rw-r--r-- | vespalib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/tests/wakeupservice/CMakeLists.txt | 8 | ||||
-rw-r--r-- | vespalib/src/tests/wakeupservice/wakeupservice_test.cpp | 48 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/executor.h | 15 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/iwakeupservice.h | 20 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/wakeupservice.cpp | 90 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/wakeupservice.h | 35 |
9 files changed, 215 insertions, 7 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h index 0e931838279..3fe6fb5d678 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -14,7 +14,7 @@ namespace vespalib { * Interface class to run multiple tasks in parallel, but tasks with same * id has to be run in sequence. */ -class ISequencedTaskExecutor +class ISequencedTaskExecutor : public vespalib::IWakeup { public: class ExecutorId { @@ -62,7 +62,7 @@ public: /** * Call this one to ensure you get the attention of the workers. */ - virtual void wakeup() { } + void wakeup() override { } /** * Wrap lambda function into a task and schedule it to be run. diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index 2f657b1bed0..ff4e62cc67c 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -151,6 +151,7 @@ vespa_define_module( src/tests/valgrind src/tests/visit_ranges src/tests/wakeup + src/tests/wakeupservice src/tests/websocket src/tests/zcurve diff --git a/vespalib/src/tests/wakeupservice/CMakeLists.txt b/vespalib/src/tests/wakeupservice/CMakeLists.txt new file mode 100644 index 00000000000..b00fe475c82 --- /dev/null +++ b/vespalib/src/tests/wakeupservice/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_wakeupservice_test_app TEST + SOURCES + wakeupservice_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_wakeupservice_test_app COMMAND vespalib_wakeupservice_test_app) diff --git a/vespalib/src/tests/wakeupservice/wakeupservice_test.cpp b/vespalib/src/tests/wakeupservice/wakeupservice_test.cpp new file mode 100644 index 00000000000..7b24d47d678 --- /dev/null +++ b/vespalib/src/tests/wakeupservice/wakeupservice_test.cpp @@ -0,0 +1,48 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/util/wakeupservice.h> + +using namespace vespalib; + +struct WakeupCounter : public IWakeup { + WakeupCounter() : _count(0) {} + void wakeup() override { _count++; } + std::atomic<uint64_t> _count; +}; + +TEST("require that wakeup is called") { + WakeupCounter a; + WakeupService service(1ms); + EXPECT_EQUAL(0u, a._count); + auto ra = service.registerForWakeup(&a); + EXPECT_TRUE(ra); + while (a._count == 0) { + std::this_thread::sleep_for(1ms); + } + ra.reset(); + uint64_t countAtStop = a._count; + std::this_thread::sleep_for(1s); + EXPECT_EQUAL(countAtStop, a._count); +} + +TEST("require that same wakeup can only be registered once, but reregisterd after unregistered.") { + WakeupCounter a; + WakeupService service(1ms); + EXPECT_EQUAL(0u, a._count); + auto ra1 = service.registerForWakeup(&a); + EXPECT_TRUE(ra1); + auto ra2 = service.registerForWakeup(&a); + EXPECT_FALSE(ra2); + while (a._count == 0) { + std::this_thread::sleep_for(1ms); + } + ra1.reset(); + uint64_t countAtStop = a._count; + ra2 = service.registerForWakeup(&a); + EXPECT_TRUE(ra2); + std::this_thread::sleep_for(1s); + EXPECT_LESS(countAtStop, a._count); +} + + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index 75ea02d448e..92371418c8a 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -69,6 +69,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT time.cpp unwind_message.cpp valgrind.cpp + wakeupservice.cpp zstdcompressor.cpp DEPENDS ) diff --git a/vespalib/src/vespa/vespalib/util/executor.h b/vespalib/src/vespa/vespalib/util/executor.h index 6ef8f182ec4..52d4252ce78 100644 --- a/vespalib/src/vespa/vespalib/util/executor.h +++ b/vespalib/src/vespa/vespalib/util/executor.h @@ -6,12 +6,21 @@ namespace vespalib { +class IWakeup { +public: + virtual ~IWakeup() = default; + /** + * In case you have a lazy executor that naps inbetween. + **/ + virtual void wakeup() = 0; +}; + /** * An executor decouples the execution of a task from the request of * executing that task. Also, tasks are typically executed * concurrently in multiple threads. **/ -class Executor +class Executor : public IWakeup { public: /** @@ -37,10 +46,6 @@ public: **/ virtual Task::UP execute(Task::UP task) = 0; - /** - * In case you have a lazy executor that naps inbetween. - **/ - virtual void wakeup() = 0; virtual ~Executor() = default; }; diff --git a/vespalib/src/vespa/vespalib/util/iwakeupservice.h b/vespalib/src/vespa/vespalib/util/iwakeupservice.h new file mode 100644 index 00000000000..fbd8a299fe7 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/iwakeupservice.h @@ -0,0 +1,20 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "executor.h" +#include "idestructorcallback.h" + +namespace vespalib { + +/** + * Interface to register for receiving wakeup calls. + * The registration will last as long as the returned object is kept alive. + **/ +class IWakeupService { +public: + virtual ~IWakeupService() = default; + virtual std::shared_ptr<IDestructorCallback> registerForWakeup(IWakeup * toWakeup) = 0; +}; + +} diff --git a/vespalib/src/vespa/vespalib/util/wakeupservice.cpp b/vespalib/src/vespa/vespalib/util/wakeupservice.cpp new file mode 100644 index 00000000000..fbb4b1c12e6 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/wakeupservice.cpp @@ -0,0 +1,90 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "wakeupservice.h" +#include <cassert> + +namespace vespalib { + +WakeupService::WakeupService(duration napTime) + : _naptime(napTime), + _lock(), + _closed(false), + _toWakeup(), + _thread() +{ +} + +WakeupService::~WakeupService() +{ + { + std::lock_guard guard(_lock); + assert(_toWakeup.empty()); + _closed = true; + } + if (_thread) { + _thread->join(); + } +} + +class WakeupService::Registration : public IDestructorCallback { +public: + Registration(WakeupService * service, IWakeup * toWakeup) noexcept + : _service(service), + _toWakeup(toWakeup) + { } + Registration(const Registration &) = delete; + Registration & operator=(const Registration &) = delete; + ~Registration() override{ + _service->unregister(_toWakeup); + } +private: + WakeupService * _service; + IWakeup * _toWakeup; +}; + +std::shared_ptr<IDestructorCallback> +WakeupService::registerForWakeup(IWakeup * toWakeup) { + std::lock_guard guard(_lock); + auto found = std::find(_toWakeup.begin(), _toWakeup.end(), toWakeup); + if (found != _toWakeup.end()) return std::shared_ptr<IDestructorCallback>(); + + _toWakeup.push_back(toWakeup); + if ( ! _thread) { + _thread = std::make_unique<std::thread>(WakeupService::run, this); + } + return std::make_shared<Registration>(this, toWakeup); +} + +void +WakeupService::unregister(IWakeup * toWakeup) { + std::lock_guard guard(_lock); + auto found = std::find(_toWakeup.begin(), _toWakeup.end(), toWakeup); + assert (found != _toWakeup.end()); + _toWakeup.erase(found); +} + +void +WakeupService::runLoop() { + bool done = false; + while ( ! done ) { + { + std::lock_guard guard(_lock); + for (IWakeup *toWakeup: _toWakeup) { + toWakeup->wakeup(); + } + done = _closed; + } + if ( ! done) { + std::this_thread::sleep_for(_naptime); + } + } + +} + +void +WakeupService::run(WakeupService * service) { + service->runLoop(); +} + +} + diff --git a/vespalib/src/vespa/vespalib/util/wakeupservice.h b/vespalib/src/vespa/vespalib/util/wakeupservice.h new file mode 100644 index 00000000000..2eb29edb718 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/wakeupservice.h @@ -0,0 +1,35 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "iwakeupservice.h" +#include "time.h" +#include <mutex> +#include <vector> +#include <thread> + +namespace vespalib { + +class WakeupService : public IWakeupService { +public: + WakeupService(duration napTime); + WakeupService(const WakeupService &) = delete; + WakeupService & operator=(const WakeupService &) = delete; + ~WakeupService() override; + /** + * Register the one to be woken up + */ + std::shared_ptr<IDestructorCallback> registerForWakeup(IWakeup * toWakeup) override; +private: + class Registration; + void unregister(IWakeup * toWakeup); + void runLoop(); + static void run(WakeupService *); + duration _naptime; + std::mutex _lock; + bool _closed; + std::vector<IWakeup *> _toWakeup; + std::unique_ptr<std::thread> _thread; +}; + +} |