diff options
author | Geir Storli <geirst@yahooinc.com> | 2022-12-08 14:29:03 +0000 |
---|---|---|
committer | Geir Storli <geirst@yahooinc.com> | 2022-12-08 14:29:03 +0000 |
commit | 509b16095830b049150d564164432355d59af9f9 (patch) | |
tree | 75e6d4e6a0ebd1abd337c1bf9bf494d4f7b901a3 /searchcore | |
parent | dc85b21d427069b982d8e0888c0a4062e5491fb1 (diff) |
Add class that posts tasks at a regular interval to another executor.
Diffstat (limited to 'searchcore')
7 files changed, 156 insertions, 37 deletions
diff --git a/searchcore/src/tests/proton/common/timer/CMakeLists.txt b/searchcore/src/tests/proton/common/timer/CMakeLists.txt index 89b9ecc688a..54d31e7e888 100644 --- a/searchcore/src/tests/proton/common/timer/CMakeLists.txt +++ b/searchcore/src/tests/proton/common/timer/CMakeLists.txt @@ -4,5 +4,6 @@ vespa_add_executable(searchcore_common_timer_test_app TEST timer_test.cpp DEPENDS searchcore_pcommon + GTest::GTest ) vespa_add_test(NAME searchcore_common_timer_test_app COMMAND searchcore_common_timer_test_app) diff --git a/searchcore/src/tests/proton/common/timer/timer_test.cpp b/searchcore/src/tests/proton/common/timer/timer_test.cpp index 9eea67623b6..a0ad3378b09 100644 --- a/searchcore/src/tests/proton/common/timer/timer_test.cpp +++ b/searchcore/src/tests/proton/common/timer/timer_test.cpp @@ -1,13 +1,17 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/fastos/thread.h> +#include <vespa/fnet/transport.h> +#include <vespa/searchcore/proton/common/scheduled_forward_executor.h> #include <vespa/searchcore/proton/common/scheduledexecutor.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/count_down_latch.h> #include <vespa/vespalib/util/size_literals.h> -#include <vespa/fnet/transport.h> -#include <vespa/fastos/thread.h> +#include <vespa/vespalib/util/threadstackexecutor.h> using vespalib::Executor; -typedef Executor::Task Task; +using namespace proton; +using Task = Executor::Task; namespace { @@ -21,34 +25,63 @@ public: } -TEST("testScheduling") { +template <typename T> +std::unique_ptr<T> make_scheduled_executor(FNET_Transport& transport, vespalib::Executor& executor); + +template <> +std::unique_ptr<ScheduledExecutor> +make_scheduled_executor<ScheduledExecutor>(FNET_Transport& transport, vespalib::Executor&) { + return std::make_unique<ScheduledExecutor>(transport); +} + +template <> +std::unique_ptr<ScheduledForwardExecutor> +make_scheduled_executor<ScheduledForwardExecutor>(FNET_Transport& transport, vespalib::Executor& executor) { + return std::make_unique<ScheduledForwardExecutor>(transport, executor); +} + +template <typename ScheduledT> +class ScheduledExecutorTest : public testing::Test { +public: + FastOS_ThreadPool threadPool; + FNET_Transport transport; + vespalib::ThreadStackExecutor executor; + std::unique_ptr<ScheduledT> timer; + + ScheduledExecutorTest() + : threadPool(64_Ki), + transport(), + executor(1, 64_Ki) + { + transport.Start(&threadPool); + timer = make_scheduled_executor<ScheduledT>(transport, executor); + } + ~ScheduledExecutorTest() { + timer->reset(); + transport.ShutDown(true); + } +}; + +using ScheduledTypes = ::testing::Types<ScheduledExecutor, ScheduledForwardExecutor>; + +TYPED_TEST_SUITE(ScheduledExecutorTest, ScheduledTypes); + +TYPED_TEST(ScheduledExecutorTest, test_scheduling) { vespalib::CountDownLatch latch1(3); vespalib::CountDownLatch latch2(2); - FastOS_ThreadPool threadPool(64_Ki); - FNET_Transport transport; - transport.Start(&threadPool); - proton::ScheduledExecutor timer(transport); - timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms); - timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms); + this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms); + this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms); EXPECT_TRUE(latch1.await(60s)); EXPECT_TRUE(latch2.await(60s)); - timer.reset(); - transport.ShutDown(true); } -TEST("testReset") { +TYPED_TEST(ScheduledExecutorTest, test_reset) { vespalib::CountDownLatch latch1(2); - FastOS_ThreadPool threadPool(64_Ki); - FNET_Transport transport; - transport.Start(&threadPool); - proton::ScheduledExecutor timer(transport); - timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); - timer.reset(); + this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); + this->timer->reset(); EXPECT_TRUE(!latch1.await(3s)); - timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); + this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); EXPECT_TRUE(latch1.await(60s)); - timer.reset(); - transport.ShutDown(true); } -TEST_MAIN() { TEST_RUN_ALL(); } +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt index fffbd12764b..d42f764fa77 100644 --- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt @@ -20,12 +20,13 @@ vespa_add_library(searchcore_pcommon STATIC pendinglidtracker.cpp replay_feed_token_factory.cpp replay_feedtoken_state.cpp + scheduled_forward_executor.cpp + scheduledexecutor.cpp select_utils.cpp selectcontext.cpp selectpruner.cpp state_reporter_utils.cpp statusreport.cpp - scheduledexecutor.cpp DEPENDS searchcore_proton_metrics searchcore_fconfig diff --git a/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h b/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h new file mode 100644 index 00000000000..b95e65da045 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h @@ -0,0 +1,28 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/time.h> +#include <memory> + +namespace proton { + +/** + * Interface used to run Tasks at a regular interval. + */ +class IScheduledExecutor { +public: + virtual ~IScheduledExecutor() = default; + + /** + * Schedule a new task to be executed at specified intervals. + * + * @param task The task to schedule. + * @param delay The delay to wait before first execution. + * @param interval The interval between the task is executed. + */ + virtual void scheduleAtFixedRate(std::unique_ptr<vespalib::Executor::Task> task, + vespalib::duration delay, vespalib::duration interval) = 0; +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp new file mode 100644 index 00000000000..acb94c020f6 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp @@ -0,0 +1,35 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "scheduled_forward_executor.h" +#include <vespa/vespalib/util/lambdatask.h> + +using vespalib::Executor; +using vespalib::makeLambdaTask; + +namespace proton { + +ScheduledForwardExecutor::ScheduledForwardExecutor(FNET_Transport& transport, + vespalib::Executor& executor) + : _scheduler(transport), + _executor(executor) +{ +} + +void +ScheduledForwardExecutor::reset() +{ + _scheduler.reset(); +} + +void +ScheduledForwardExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, + vespalib::duration delay, vespalib::duration interval) +{ + _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(task)]() { + _executor.execute(makeLambdaTask([&]() { + my_task->run(); + })); + }), delay, interval); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h new file mode 100644 index 00000000000..eb7120527d7 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h @@ -0,0 +1,29 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "i_scheduled_executor.h" +#include "scheduledexecutor.h" + +class FNET_Transport; + +namespace proton { + +/** + * This class posts Tasks at a regular interval to another executor which runs them. + */ +class ScheduledForwardExecutor : public IScheduledExecutor { +private: + ScheduledExecutor _scheduler; + vespalib::Executor& _executor; + +public: + ScheduledForwardExecutor(FNET_Transport& transport, vespalib::Executor& executor); + void reset(); + + void scheduleAtFixedRate(std::unique_ptr<vespalib::Executor::Task> task, + vespalib::duration delay, vespalib::duration interval) override; + +}; + +} + diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h index 80c8b7edd15..dfb16c94d7d 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h +++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h @@ -1,8 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/vespalib/util/executor.h> -#include <vespa/vespalib/util/time.h> +#include "i_scheduled_executor.h" #include <mutex> #include <vector> @@ -17,7 +16,7 @@ class TimerTask; * interval. The timer can be reset to clear all tasks currently being * scheduled. */ -class ScheduledExecutor +class ScheduledExecutor : public IScheduledExecutor { private: using TaskList = std::vector<std::unique_ptr<TimerTask>>; @@ -37,16 +36,9 @@ public: * Destroys this timer, finishing the current task executing and then * finishing. */ - ~ScheduledExecutor(); + ~ScheduledExecutor() override; - /** - * Schedule new task to be executed at specified intervals. - * - * @param task The task to schedule. - * @param delay The delay to wait before first execution. - * @param interval The interval in seconds. - */ - void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval); + void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override; /** * Reset timer, clearing the list of task to execute. |