aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h4
-rw-r--r--vespalib/CMakeLists.txt1
-rw-r--r--vespalib/src/tests/wakeupservice/CMakeLists.txt8
-rw-r--r--vespalib/src/tests/wakeupservice/wakeupservice_test.cpp48
-rw-r--r--vespalib/src/vespa/vespalib/util/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/util/executor.h15
-rw-r--r--vespalib/src/vespa/vespalib/util/iwakeupservice.h20
-rw-r--r--vespalib/src/vespa/vespalib/util/wakeupservice.cpp90
-rw-r--r--vespalib/src/vespa/vespalib/util/wakeupservice.h35
9 files changed, 215 insertions, 7 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
index 0e931838279..3fe6fb5d678 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -14,7 +14,7 @@ namespace vespalib {
* Interface class to run multiple tasks in parallel, but tasks with same
* id has to be run in sequence.
*/
-class ISequencedTaskExecutor
+class ISequencedTaskExecutor : public vespalib::IWakeup
{
public:
class ExecutorId {
@@ -62,7 +62,7 @@ public:
/**
* Call this one to ensure you get the attention of the workers.
*/
- virtual void wakeup() { }
+ void wakeup() override { }
/**
* Wrap lambda function into a task and schedule it to be run.
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt
index 2f657b1bed0..ff4e62cc67c 100644
--- a/vespalib/CMakeLists.txt
+++ b/vespalib/CMakeLists.txt
@@ -151,6 +151,7 @@ vespa_define_module(
src/tests/valgrind
src/tests/visit_ranges
src/tests/wakeup
+ src/tests/wakeupservice
src/tests/websocket
src/tests/zcurve
diff --git a/vespalib/src/tests/wakeupservice/CMakeLists.txt b/vespalib/src/tests/wakeupservice/CMakeLists.txt
new file mode 100644
index 00000000000..b00fe475c82
--- /dev/null
+++ b/vespalib/src/tests/wakeupservice/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_wakeupservice_test_app TEST
+ SOURCES
+ wakeupservice_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_wakeupservice_test_app COMMAND vespalib_wakeupservice_test_app)
diff --git a/vespalib/src/tests/wakeupservice/wakeupservice_test.cpp b/vespalib/src/tests/wakeupservice/wakeupservice_test.cpp
new file mode 100644
index 00000000000..7b24d47d678
--- /dev/null
+++ b/vespalib/src/tests/wakeupservice/wakeupservice_test.cpp
@@ -0,0 +1,48 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/util/wakeupservice.h>
+
+using namespace vespalib;
+
+struct WakeupCounter : public IWakeup {
+ WakeupCounter() : _count(0) {}
+ void wakeup() override { _count++; }
+ std::atomic<uint64_t> _count;
+};
+
+TEST("require that wakeup is called") {
+ WakeupCounter a;
+ WakeupService service(1ms);
+ EXPECT_EQUAL(0u, a._count);
+ auto ra = service.registerForWakeup(&a);
+ EXPECT_TRUE(ra);
+ while (a._count == 0) {
+ std::this_thread::sleep_for(1ms);
+ }
+ ra.reset();
+ uint64_t countAtStop = a._count;
+ std::this_thread::sleep_for(1s);
+ EXPECT_EQUAL(countAtStop, a._count);
+}
+
+TEST("require that same wakeup can only be registered once, but reregisterd after unregistered.") {
+ WakeupCounter a;
+ WakeupService service(1ms);
+ EXPECT_EQUAL(0u, a._count);
+ auto ra1 = service.registerForWakeup(&a);
+ EXPECT_TRUE(ra1);
+ auto ra2 = service.registerForWakeup(&a);
+ EXPECT_FALSE(ra2);
+ while (a._count == 0) {
+ std::this_thread::sleep_for(1ms);
+ }
+ ra1.reset();
+ uint64_t countAtStop = a._count;
+ ra2 = service.registerForWakeup(&a);
+ EXPECT_TRUE(ra2);
+ std::this_thread::sleep_for(1s);
+ EXPECT_LESS(countAtStop, a._count);
+}
+
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
index 75ea02d448e..92371418c8a 100644
--- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -69,6 +69,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT
time.cpp
unwind_message.cpp
valgrind.cpp
+ wakeupservice.cpp
zstdcompressor.cpp
DEPENDS
)
diff --git a/vespalib/src/vespa/vespalib/util/executor.h b/vespalib/src/vespa/vespalib/util/executor.h
index 6ef8f182ec4..52d4252ce78 100644
--- a/vespalib/src/vespa/vespalib/util/executor.h
+++ b/vespalib/src/vespa/vespalib/util/executor.h
@@ -6,12 +6,21 @@
namespace vespalib {
+class IWakeup {
+public:
+ virtual ~IWakeup() = default;
+ /**
+ * In case you have a lazy executor that naps inbetween.
+ **/
+ virtual void wakeup() = 0;
+};
+
/**
* An executor decouples the execution of a task from the request of
* executing that task. Also, tasks are typically executed
* concurrently in multiple threads.
**/
-class Executor
+class Executor : public IWakeup
{
public:
/**
@@ -37,10 +46,6 @@ public:
**/
virtual Task::UP execute(Task::UP task) = 0;
- /**
- * In case you have a lazy executor that naps inbetween.
- **/
- virtual void wakeup() = 0;
virtual ~Executor() = default;
};
diff --git a/vespalib/src/vespa/vespalib/util/iwakeupservice.h b/vespalib/src/vespa/vespalib/util/iwakeupservice.h
new file mode 100644
index 00000000000..fbd8a299fe7
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/iwakeupservice.h
@@ -0,0 +1,20 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "executor.h"
+#include "idestructorcallback.h"
+
+namespace vespalib {
+
+/**
+ * Interface to register for receiving wakeup calls.
+ * The registration will last as long as the returned object is kept alive.
+ **/
+class IWakeupService {
+public:
+ virtual ~IWakeupService() = default;
+ virtual std::shared_ptr<IDestructorCallback> registerForWakeup(IWakeup * toWakeup) = 0;
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/wakeupservice.cpp b/vespalib/src/vespa/vespalib/util/wakeupservice.cpp
new file mode 100644
index 00000000000..fbb4b1c12e6
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/wakeupservice.cpp
@@ -0,0 +1,90 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "wakeupservice.h"
+#include <cassert>
+
+namespace vespalib {
+
+WakeupService::WakeupService(duration napTime)
+ : _naptime(napTime),
+ _lock(),
+ _closed(false),
+ _toWakeup(),
+ _thread()
+{
+}
+
+WakeupService::~WakeupService()
+{
+ {
+ std::lock_guard guard(_lock);
+ assert(_toWakeup.empty());
+ _closed = true;
+ }
+ if (_thread) {
+ _thread->join();
+ }
+}
+
+class WakeupService::Registration : public IDestructorCallback {
+public:
+ Registration(WakeupService * service, IWakeup * toWakeup) noexcept
+ : _service(service),
+ _toWakeup(toWakeup)
+ { }
+ Registration(const Registration &) = delete;
+ Registration & operator=(const Registration &) = delete;
+ ~Registration() override{
+ _service->unregister(_toWakeup);
+ }
+private:
+ WakeupService * _service;
+ IWakeup * _toWakeup;
+};
+
+std::shared_ptr<IDestructorCallback>
+WakeupService::registerForWakeup(IWakeup * toWakeup) {
+ std::lock_guard guard(_lock);
+ auto found = std::find(_toWakeup.begin(), _toWakeup.end(), toWakeup);
+ if (found != _toWakeup.end()) return std::shared_ptr<IDestructorCallback>();
+
+ _toWakeup.push_back(toWakeup);
+ if ( ! _thread) {
+ _thread = std::make_unique<std::thread>(WakeupService::run, this);
+ }
+ return std::make_shared<Registration>(this, toWakeup);
+}
+
+void
+WakeupService::unregister(IWakeup * toWakeup) {
+ std::lock_guard guard(_lock);
+ auto found = std::find(_toWakeup.begin(), _toWakeup.end(), toWakeup);
+ assert (found != _toWakeup.end());
+ _toWakeup.erase(found);
+}
+
+void
+WakeupService::runLoop() {
+ bool done = false;
+ while ( ! done ) {
+ {
+ std::lock_guard guard(_lock);
+ for (IWakeup *toWakeup: _toWakeup) {
+ toWakeup->wakeup();
+ }
+ done = _closed;
+ }
+ if ( ! done) {
+ std::this_thread::sleep_for(_naptime);
+ }
+ }
+
+}
+
+void
+WakeupService::run(WakeupService * service) {
+ service->runLoop();
+}
+
+}
+
diff --git a/vespalib/src/vespa/vespalib/util/wakeupservice.h b/vespalib/src/vespa/vespalib/util/wakeupservice.h
new file mode 100644
index 00000000000..2eb29edb718
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/wakeupservice.h
@@ -0,0 +1,35 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "iwakeupservice.h"
+#include "time.h"
+#include <mutex>
+#include <vector>
+#include <thread>
+
+namespace vespalib {
+
+class WakeupService : public IWakeupService {
+public:
+ WakeupService(duration napTime);
+ WakeupService(const WakeupService &) = delete;
+ WakeupService & operator=(const WakeupService &) = delete;
+ ~WakeupService() override;
+ /**
+ * Register the one to be woken up
+ */
+ std::shared_ptr<IDestructorCallback> registerForWakeup(IWakeup * toWakeup) override;
+private:
+ class Registration;
+ void unregister(IWakeup * toWakeup);
+ void runLoop();
+ static void run(WakeupService *);
+ duration _naptime;
+ std::mutex _lock;
+ bool _closed;
+ std::vector<IWakeup *> _toWakeup;
+ std::unique_ptr<std::thread> _thread;
+};
+
+}