summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2022-12-08 14:29:03 +0000
committerGeir Storli <geirst@yahooinc.com>2022-12-08 14:29:03 +0000
commit509b16095830b049150d564164432355d59af9f9 (patch)
tree75e6d4e6a0ebd1abd337c1bf9bf494d4f7b901a3 /searchcore
parentdc85b21d427069b982d8e0888c0a4062e5491fb1 (diff)
Add class that posts tasks at a regular interval to another executor.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/common/timer/CMakeLists.txt1
-rw-r--r--searchcore/src/tests/proton/common/timer/timer_test.cpp81
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt3
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h28
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp35
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h29
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h16
7 files changed, 156 insertions, 37 deletions
diff --git a/searchcore/src/tests/proton/common/timer/CMakeLists.txt b/searchcore/src/tests/proton/common/timer/CMakeLists.txt
index 89b9ecc688a..54d31e7e888 100644
--- a/searchcore/src/tests/proton/common/timer/CMakeLists.txt
+++ b/searchcore/src/tests/proton/common/timer/CMakeLists.txt
@@ -4,5 +4,6 @@ vespa_add_executable(searchcore_common_timer_test_app TEST
timer_test.cpp
DEPENDS
searchcore_pcommon
+ GTest::GTest
)
vespa_add_test(NAME searchcore_common_timer_test_app COMMAND searchcore_common_timer_test_app)
diff --git a/searchcore/src/tests/proton/common/timer/timer_test.cpp b/searchcore/src/tests/proton/common/timer/timer_test.cpp
index 9eea67623b6..a0ad3378b09 100644
--- a/searchcore/src/tests/proton/common/timer/timer_test.cpp
+++ b/searchcore/src/tests/proton/common/timer/timer_test.cpp
@@ -1,13 +1,17 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/fastos/thread.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/searchcore/proton/common/scheduled_forward_executor.h>
#include <vespa/searchcore/proton/common/scheduledexecutor.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/count_down_latch.h>
#include <vespa/vespalib/util/size_literals.h>
-#include <vespa/fnet/transport.h>
-#include <vespa/fastos/thread.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
using vespalib::Executor;
-typedef Executor::Task Task;
+using namespace proton;
+using Task = Executor::Task;
namespace {
@@ -21,34 +25,63 @@ public:
}
-TEST("testScheduling") {
+template <typename T>
+std::unique_ptr<T> make_scheduled_executor(FNET_Transport& transport, vespalib::Executor& executor);
+
+template <>
+std::unique_ptr<ScheduledExecutor>
+make_scheduled_executor<ScheduledExecutor>(FNET_Transport& transport, vespalib::Executor&) {
+ return std::make_unique<ScheduledExecutor>(transport);
+}
+
+template <>
+std::unique_ptr<ScheduledForwardExecutor>
+make_scheduled_executor<ScheduledForwardExecutor>(FNET_Transport& transport, vespalib::Executor& executor) {
+ return std::make_unique<ScheduledForwardExecutor>(transport, executor);
+}
+
+template <typename ScheduledT>
+class ScheduledExecutorTest : public testing::Test {
+public:
+ FastOS_ThreadPool threadPool;
+ FNET_Transport transport;
+ vespalib::ThreadStackExecutor executor;
+ std::unique_ptr<ScheduledT> timer;
+
+ ScheduledExecutorTest()
+ : threadPool(64_Ki),
+ transport(),
+ executor(1, 64_Ki)
+ {
+ transport.Start(&threadPool);
+ timer = make_scheduled_executor<ScheduledT>(transport, executor);
+ }
+ ~ScheduledExecutorTest() {
+ timer->reset();
+ transport.ShutDown(true);
+ }
+};
+
+using ScheduledTypes = ::testing::Types<ScheduledExecutor, ScheduledForwardExecutor>;
+
+TYPED_TEST_SUITE(ScheduledExecutorTest, ScheduledTypes);
+
+TYPED_TEST(ScheduledExecutorTest, test_scheduling) {
vespalib::CountDownLatch latch1(3);
vespalib::CountDownLatch latch2(2);
- FastOS_ThreadPool threadPool(64_Ki);
- FNET_Transport transport;
- transport.Start(&threadPool);
- proton::ScheduledExecutor timer(transport);
- timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms);
- timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms);
+ this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms);
+ this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms);
EXPECT_TRUE(latch1.await(60s));
EXPECT_TRUE(latch2.await(60s));
- timer.reset();
- transport.ShutDown(true);
}
-TEST("testReset") {
+TYPED_TEST(ScheduledExecutorTest, test_reset) {
vespalib::CountDownLatch latch1(2);
- FastOS_ThreadPool threadPool(64_Ki);
- FNET_Transport transport;
- transport.Start(&threadPool);
- proton::ScheduledExecutor timer(transport);
- timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s);
- timer.reset();
+ this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s);
+ this->timer->reset();
EXPECT_TRUE(!latch1.await(3s));
- timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms);
+ this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms);
EXPECT_TRUE(latch1.await(60s));
- timer.reset();
- transport.ShutDown(true);
}
-TEST_MAIN() { TEST_RUN_ALL(); }
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
index fffbd12764b..d42f764fa77 100644
--- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
@@ -20,12 +20,13 @@ vespa_add_library(searchcore_pcommon STATIC
pendinglidtracker.cpp
replay_feed_token_factory.cpp
replay_feedtoken_state.cpp
+ scheduled_forward_executor.cpp
+ scheduledexecutor.cpp
select_utils.cpp
selectcontext.cpp
selectpruner.cpp
state_reporter_utils.cpp
statusreport.cpp
- scheduledexecutor.cpp
DEPENDS
searchcore_proton_metrics
searchcore_fconfig
diff --git a/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h b/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h
new file mode 100644
index 00000000000..b95e65da045
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h
@@ -0,0 +1,28 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/time.h>
+#include <memory>
+
+namespace proton {
+
+/**
+ * Interface used to run Tasks at a regular interval.
+ */
+class IScheduledExecutor {
+public:
+ virtual ~IScheduledExecutor() = default;
+
+ /**
+ * Schedule a new task to be executed at specified intervals.
+ *
+ * @param task The task to schedule.
+ * @param delay The delay to wait before first execution.
+ * @param interval The interval between the task is executed.
+ */
+ virtual void scheduleAtFixedRate(std::unique_ptr<vespalib::Executor::Task> task,
+ vespalib::duration delay, vespalib::duration interval) = 0;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
new file mode 100644
index 00000000000..acb94c020f6
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
@@ -0,0 +1,35 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "scheduled_forward_executor.h"
+#include <vespa/vespalib/util/lambdatask.h>
+
+using vespalib::Executor;
+using vespalib::makeLambdaTask;
+
+namespace proton {
+
+ScheduledForwardExecutor::ScheduledForwardExecutor(FNET_Transport& transport,
+ vespalib::Executor& executor)
+ : _scheduler(transport),
+ _executor(executor)
+{
+}
+
+void
+ScheduledForwardExecutor::reset()
+{
+ _scheduler.reset();
+}
+
+void
+ScheduledForwardExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task,
+ vespalib::duration delay, vespalib::duration interval)
+{
+ _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(task)]() {
+ _executor.execute(makeLambdaTask([&]() {
+ my_task->run();
+ }));
+ }), delay, interval);
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h
new file mode 100644
index 00000000000..eb7120527d7
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h
@@ -0,0 +1,29 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "i_scheduled_executor.h"
+#include "scheduledexecutor.h"
+
+class FNET_Transport;
+
+namespace proton {
+
+/**
+ * This class posts Tasks at a regular interval to another executor which runs them.
+ */
+class ScheduledForwardExecutor : public IScheduledExecutor {
+private:
+ ScheduledExecutor _scheduler;
+ vespalib::Executor& _executor;
+
+public:
+ ScheduledForwardExecutor(FNET_Transport& transport, vespalib::Executor& executor);
+ void reset();
+
+ void scheduleAtFixedRate(std::unique_ptr<vespalib::Executor::Task> task,
+ vespalib::duration delay, vespalib::duration interval) override;
+
+};
+
+}
+
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h
index 80c8b7edd15..dfb16c94d7d 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h
@@ -1,8 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/vespalib/util/executor.h>
-#include <vespa/vespalib/util/time.h>
+#include "i_scheduled_executor.h"
#include <mutex>
#include <vector>
@@ -17,7 +16,7 @@ class TimerTask;
* interval. The timer can be reset to clear all tasks currently being
* scheduled.
*/
-class ScheduledExecutor
+class ScheduledExecutor : public IScheduledExecutor
{
private:
using TaskList = std::vector<std::unique_ptr<TimerTask>>;
@@ -37,16 +36,9 @@ public:
* Destroys this timer, finishing the current task executing and then
* finishing.
*/
- ~ScheduledExecutor();
+ ~ScheduledExecutor() override;
- /**
- * Schedule new task to be executed at specified intervals.
- *
- * @param task The task to schedule.
- * @param delay The delay to wait before first execution.
- * @param interval The interval in seconds.
- */
- void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval);
+ void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override;
/**
* Reset timer, clearing the list of task to execute.