From 9d83cff2851b2126700b1b3cf7b4080ac7c8fb81 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 21 Feb 2022 18:59:31 +0100 Subject: Revert "Use a common FNET_Transport owned by Proton in both SceduledExecutor …" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- staging_vespalib/src/tests/timer/timer_test.cpp | 40 ++++++++++++---------- .../src/vespa/vespalib/util/scheduledexecutor.cpp | 29 ++++++++++------ .../src/vespa/vespalib/util/scheduledexecutor.h | 11 +++--- 3 files changed, 46 insertions(+), 34 deletions(-) (limited to 'staging_vespalib/src') diff --git a/staging_vespalib/src/tests/timer/timer_test.cpp b/staging_vespalib/src/tests/timer/timer_test.cpp index 1f0ee81e4e6..9d04500b8cd 100644 --- a/staging_vespalib/src/tests/timer/timer_test.cpp +++ b/staging_vespalib/src/tests/timer/timer_test.cpp @@ -2,15 +2,18 @@ #include #include -#include -#include -#include using namespace vespalib; using vespalib::Executor; typedef Executor::Task Task; -namespace { +class Test : public TestApp +{ +public: + int Main() override; + void testScheduling(); + void testReset(); +}; class TestTask : public Task { private: @@ -20,36 +23,35 @@ public: void run() override { _latch.countDown(); } }; +int +Test::Main() +{ + TEST_INIT("timer_test"); + testScheduling(); + testReset(); + TEST_DONE(); } -TEST("testScheduling") { +void Test::testScheduling() +{ vespalib::CountDownLatch latch1(3); vespalib::CountDownLatch latch2(2); - FastOS_ThreadPool threadPool(64_Ki); - FNET_Transport transport; - transport.Start(&threadPool); - ScheduledExecutor timer(transport); + ScheduledExecutor timer; timer.scheduleAtFixedRate(std::make_unique(latch1), 100ms, 200ms); timer.scheduleAtFixedRate(std::make_unique(latch2), 500ms, 500ms); EXPECT_TRUE(latch1.await(60s)); EXPECT_TRUE(latch2.await(60s)); - timer.reset(); - transport.ShutDown(true); } -TEST("testReset") { +void Test::testReset() +{ vespalib::CountDownLatch latch1(2); - FastOS_ThreadPool threadPool(64_Ki); - FNET_Transport transport; - transport.Start(&threadPool); - ScheduledExecutor timer(transport); + ScheduledExecutor timer; timer.scheduleAtFixedRate(std::make_unique(latch1), 2s, 3s); timer.reset(); EXPECT_TRUE(!latch1.await(3s)); timer.scheduleAtFixedRate(std::make_unique(latch1), 200ms, 300ms); EXPECT_TRUE(latch1.await(60s)); - timer.reset(); - transport.ShutDown(true); } -TEST_MAIN() { TEST_RUN_ALL(); } +TEST_APPHOOK(Test) diff --git a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp index 99254240f3c..f5384999cb8 100644 --- a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp @@ -3,10 +3,11 @@ #include #include #include +#include namespace vespalib { -using Task = vespalib::Executor::Task; +typedef vespalib::Executor::Task Task; class TimerTask : public FNET_Task { @@ -15,8 +16,8 @@ private: TimerTask&operator=(const TimerTask &); FNET_Scheduler *_scheduler; - Task::UP _task; - duration _interval; + Task::UP _task; + duration _interval; public: TimerTask(FNET_Scheduler *scheduler, Task::UP task, duration interval) : FNET_Task(scheduler), @@ -34,15 +35,21 @@ public: } }; -ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport) - : _transport(transport), +ScheduledExecutor::ScheduledExecutor() + : _threadPool(128_Ki), + _transport(new FNET_Transport()), _lock(), _taskList() -{ } +{ + _transport->Start(&_threadPool); +} ScheduledExecutor::~ScheduledExecutor() { - reset(); + std::lock_guard guard(_lock); + _transport->ShutDown(true); + _threadPool.Close(); + _taskList.clear(); } @@ -50,7 +57,7 @@ void ScheduledExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval) { std::lock_guard guard(_lock); - auto tTask = std::make_unique(_transport.GetScheduler(), std::move(task), interval); + TimerTaskPtr tTask(new TimerTask(_transport->GetScheduler(), std::move(task), interval)); _taskList.push_back(std::move(tTask)); _taskList.back()->Schedule(to_s(delay)); } @@ -59,10 +66,10 @@ void ScheduledExecutor::reset() { std::lock_guard guard(_lock); - for (auto & task : _taskList) { - task->Unschedule(); - } + _transport->ShutDown(true); _taskList.clear(); + _transport = std::make_unique(); + _transport->Start(&_threadPool); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h index 1be49f6ff45..dcc16a8c379 100644 --- a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h @@ -3,6 +3,7 @@ #include #include +#include #include class FNET_Transport; @@ -19,8 +20,10 @@ class TimerTask; class ScheduledExecutor { private: - using TaskList = std::vector>; - FNET_Transport & _transport; + typedef std::unique_ptr TimerTaskPtr; + typedef std::vector TaskList; + FastOS_ThreadPool _threadPool; + std::unique_ptr _transport; std::mutex _lock; TaskList _taskList; @@ -28,7 +31,7 @@ public: /** * Create a new timer, capable of scheduling tasks at fixed intervals. */ - ScheduledExecutor(FNET_Transport & transport); + ScheduledExecutor(); /** * Destroys this timer, finishing the current task executing and then @@ -43,7 +46,7 @@ public: * @param delay The delay to wait before first execution. * @param interval The interval in seconds. */ - void scheduleAtFixedRate(std::unique_ptr task, duration delay, duration interval); + void scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval); /** * Reset timer, clearing the list of task to execute. -- cgit v1.2.3