diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-20 21:24:22 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-20 23:46:18 +0000 |
commit | 1bb2800fb1bf312689115a7230b1d3ead74d4ac6 (patch) | |
tree | 29c84e9a2b9451e0fc4cd8d5e2908c26eb1758fd /staging_vespalib | |
parent | 1cfea65b9bc71b472e9dc3370b120cf428b6ece0 (diff) |
Use a common FNET_Transport owned by Proton in both SceduledExecutor and TransactionLogServer.
This reduces the number of Transport object by 1 per document type and netto 1 in Proton.
Each of them contains 2 threads.
In addition it uses a common Transport for the RpcFileAcquirer objects used during config fetching.
This prevents creating 3 temporary Transport objects on every reconfig.
Diffstat (limited to 'staging_vespalib')
3 files changed, 34 insertions, 46 deletions
diff --git a/staging_vespalib/src/tests/timer/timer_test.cpp b/staging_vespalib/src/tests/timer/timer_test.cpp index 9d04500b8cd..1f0ee81e4e6 100644 --- a/staging_vespalib/src/tests/timer/timer_test.cpp +++ b/staging_vespalib/src/tests/timer/timer_test.cpp @@ -2,18 +2,15 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/scheduledexecutor.h> +#include <vespa/vespalib/util/size_literals.h> +#include <vespa/fnet/transport.h> +#include <vespa/fastos/thread.h> using namespace vespalib; using vespalib::Executor; typedef Executor::Task Task; -class Test : public TestApp -{ -public: - int Main() override; - void testScheduling(); - void testReset(); -}; +namespace { class TestTask : public Task { private: @@ -23,35 +20,36 @@ public: void run() override { _latch.countDown(); } }; -int -Test::Main() -{ - TEST_INIT("timer_test"); - testScheduling(); - testReset(); - TEST_DONE(); } -void Test::testScheduling() -{ +TEST("testScheduling") { vespalib::CountDownLatch latch1(3); vespalib::CountDownLatch latch2(2); - ScheduledExecutor timer; + FastOS_ThreadPool threadPool(64_Ki); + FNET_Transport transport; + transport.Start(&threadPool); + ScheduledExecutor timer(transport); timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms); timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms); EXPECT_TRUE(latch1.await(60s)); EXPECT_TRUE(latch2.await(60s)); + timer.reset(); + transport.ShutDown(true); } -void Test::testReset() -{ +TEST("testReset") { vespalib::CountDownLatch latch1(2); - ScheduledExecutor timer; + FastOS_ThreadPool threadPool(64_Ki); + FNET_Transport transport; + transport.Start(&threadPool); + ScheduledExecutor timer(transport); timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); timer.reset(); EXPECT_TRUE(!latch1.await(3s)); timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); EXPECT_TRUE(latch1.await(60s)); + timer.reset(); + transport.ShutDown(true); } -TEST_APPHOOK(Test) +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp index f5384999cb8..99254240f3c 100644 --- a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp @@ -3,11 +3,10 @@ #include <vespa/fnet/scheduler.h> #include <vespa/fnet/task.h> #include <vespa/fnet/transport.h> -#include <vespa/vespalib/util/size_literals.h> namespace vespalib { -typedef vespalib::Executor::Task Task; +using Task = vespalib::Executor::Task; class TimerTask : public FNET_Task { @@ -16,8 +15,8 @@ private: TimerTask&operator=(const TimerTask &); FNET_Scheduler *_scheduler; - Task::UP _task; - duration _interval; + Task::UP _task; + duration _interval; public: TimerTask(FNET_Scheduler *scheduler, Task::UP task, duration interval) : FNET_Task(scheduler), @@ -35,21 +34,15 @@ public: } }; -ScheduledExecutor::ScheduledExecutor() - : _threadPool(128_Ki), - _transport(new FNET_Transport()), +ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport) + : _transport(transport), _lock(), _taskList() -{ - _transport->Start(&_threadPool); -} +{ } ScheduledExecutor::~ScheduledExecutor() { - std::lock_guard guard(_lock); - _transport->ShutDown(true); - _threadPool.Close(); - _taskList.clear(); + reset(); } @@ -57,7 +50,7 @@ void ScheduledExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval) { std::lock_guard guard(_lock); - TimerTaskPtr tTask(new TimerTask(_transport->GetScheduler(), std::move(task), interval)); + auto tTask = std::make_unique<TimerTask>(_transport.GetScheduler(), std::move(task), interval); _taskList.push_back(std::move(tTask)); _taskList.back()->Schedule(to_s(delay)); } @@ -66,10 +59,10 @@ void ScheduledExecutor::reset() { std::lock_guard guard(_lock); - _transport->ShutDown(true); + for (auto & task : _taskList) { + task->Unschedule(); + } _taskList.clear(); - _transport = std::make_unique<FNET_Transport>(); - _transport->Start(&_threadPool); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h index dcc16a8c379..1be49f6ff45 100644 --- a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h @@ -3,7 +3,6 @@ #include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/time.h> -#include <vespa/fastos/thread.h> #include <vector> class FNET_Transport; @@ -20,10 +19,8 @@ class TimerTask; class ScheduledExecutor { private: - typedef std::unique_ptr<TimerTask> TimerTaskPtr; - typedef std::vector<TimerTaskPtr> TaskList; - FastOS_ThreadPool _threadPool; - std::unique_ptr<FNET_Transport> _transport; + using TaskList = std::vector<std::unique_ptr<TimerTask>>; + FNET_Transport & _transport; std::mutex _lock; TaskList _taskList; @@ -31,7 +28,7 @@ public: /** * Create a new timer, capable of scheduling tasks at fixed intervals. */ - ScheduledExecutor(); + ScheduledExecutor(FNET_Transport & transport); /** * Destroys this timer, finishing the current task executing and then @@ -46,7 +43,7 @@ public: * @param delay The delay to wait before first execution. * @param interval The interval in seconds. */ - void scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval); + void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval); /** * Reset timer, clearing the list of task to execute. |