diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-05-18 07:20:18 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-05-18 07:20:18 +0000 |
commit | 714346ca5e18851c784101537bcb9b0877d413a9 (patch) | |
tree | 121764ca0af5de204005d6b16d00a867f07fc5d8 /searchcore | |
parent | f05f60b1ec40be6f3e60349d65da60bc41555db3 (diff) |
Move ScheduledExecutor to searchcore and drop fnet dependency in staging_vespalib in order to prepare collapsing stagingg vespalib into vespalib
Diffstat (limited to 'searchcore')
11 files changed, 202 insertions, 9 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt index 8df693e8ea2..c0b86b509b3 100644 --- a/searchcore/CMakeLists.txt +++ b/searchcore/CMakeLists.txt @@ -80,6 +80,7 @@ vespa_define_module( src/tests/proton/common/hw_info_sampler src/tests/proton/common/operation_rate_tracker src/tests/proton/common/state_reporter_utils + src/tests/proton/common/timer src/tests/proton/docsummary src/tests/proton/document_iterator src/tests/proton/documentdb diff --git a/searchcore/src/tests/proton/common/timer/.gitignore b/searchcore/src/tests/proton/common/timer/.gitignore new file mode 100644 index 00000000000..9031e40152a --- /dev/null +++ b/searchcore/src/tests/proton/common/timer/.gitignore @@ -0,0 +1 @@ +staging_vespalib_timer_test_app diff --git a/searchcore/src/tests/proton/common/timer/CMakeLists.txt b/searchcore/src/tests/proton/common/timer/CMakeLists.txt new file mode 100644 index 00000000000..7afd366ed89 --- /dev/null +++ b/searchcore/src/tests/proton/common/timer/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(staging_vespalib_timer_test_app TEST + SOURCES + timer_test.cpp + DEPENDS + searchcore_pcommon +) +vespa_add_test(NAME staging_vespalib_timer_test_app COMMAND staging_vespalib_timer_test_app) diff --git a/searchcore/src/tests/proton/common/timer/timer_test.cpp b/searchcore/src/tests/proton/common/timer/timer_test.cpp new file mode 100644 index 00000000000..9eea67623b6 --- /dev/null +++ b/searchcore/src/tests/proton/common/timer/timer_test.cpp @@ -0,0 +1,54 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/searchcore/proton/common/scheduledexecutor.h> +#include <vespa/vespalib/util/size_literals.h> +#include <vespa/fnet/transport.h> +#include <vespa/fastos/thread.h> + +using vespalib::Executor; +typedef Executor::Task Task; + +namespace { + +class TestTask : public Task { +private: + vespalib::CountDownLatch &_latch; +public: + TestTask(vespalib::CountDownLatch & latch) : _latch(latch) { } + void run() override { _latch.countDown(); } +}; + +} + +TEST("testScheduling") { + vespalib::CountDownLatch latch1(3); + vespalib::CountDownLatch latch2(2); + FastOS_ThreadPool threadPool(64_Ki); + FNET_Transport transport; + transport.Start(&threadPool); + proton::ScheduledExecutor timer(transport); + timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms); + timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms); + EXPECT_TRUE(latch1.await(60s)); + EXPECT_TRUE(latch2.await(60s)); + timer.reset(); + transport.ShutDown(true); +} + +TEST("testReset") { + vespalib::CountDownLatch latch1(2); + FastOS_ThreadPool threadPool(64_Ki); + FNET_Transport transport; + transport.Start(&threadPool); + proton::ScheduledExecutor timer(transport); + timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); + timer.reset(); + EXPECT_TRUE(!latch1.await(3s)); + timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); + EXPECT_TRUE(latch1.await(60s)); + timer.reset(); + transport.ShutDown(true); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt index 3f33871a0e2..25e9a469f93 100644 --- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt @@ -25,6 +25,7 @@ vespa_add_library(searchcore_pcommon STATIC selectpruner.cpp state_reporter_utils.cpp statusreport.cpp + scheduledexecutor.cpp DEPENDS searchcore_proton_metrics searchcore_fconfig diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp new file mode 100644 index 00000000000..4577477fb77 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp @@ -0,0 +1,70 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "scheduledexecutor.h" +#include <vespa/fnet/scheduler.h> +#include <vespa/fnet/task.h> +#include <vespa/fnet/transport.h> + +using vespalib::duration; + +namespace proton { + +using Task = vespalib::Executor::Task; + +class TimerTask : public FNET_Task +{ +private: + TimerTask(const TimerTask &); + TimerTask&operator=(const TimerTask &); + + FNET_Scheduler *_scheduler; + Task::UP _task; + duration _interval; +public: + TimerTask(FNET_Scheduler *scheduler, Task::UP task, duration interval) + : FNET_Task(scheduler), + _task(std::move(task)), + _interval(interval) + { } + + ~TimerTask() { + Kill(); + } + + void PerformTask() override { + _task->run(); + Schedule(vespalib::to_s(_interval)); + } +}; + +ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport) + : _transport(transport), + _lock(), + _taskList() +{ } + +ScheduledExecutor::~ScheduledExecutor() +{ + reset(); +} + + +void +ScheduledExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval) +{ + std::lock_guard guard(_lock); + auto tTask = std::make_unique<TimerTask>(_transport.GetScheduler(), std::move(task), interval); + _taskList.push_back(std::move(tTask)); + _taskList.back()->Schedule(vespalib::to_s(delay)); +} + +void +ScheduledExecutor::reset() +{ + std::lock_guard guard(_lock); + for (auto & task : _taskList) { + task->Unschedule(); + } + _taskList.clear(); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h new file mode 100644 index 00000000000..80c8b7edd15 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h @@ -0,0 +1,58 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/time.h> +#include <mutex> +#include <vector> + +class FNET_Transport; + +namespace proton { + +class TimerTask; + +/** + * ScheduledExecutor is a class capable of running Tasks at a regular + * interval. The timer can be reset to clear all tasks currently being + * scheduled. + */ +class ScheduledExecutor +{ +private: + using TaskList = std::vector<std::unique_ptr<TimerTask>>; + using duration = vespalib::duration; + using Executor = vespalib::Executor; + FNET_Transport & _transport; + std::mutex _lock; + TaskList _taskList; + +public: + /** + * Create a new timer, capable of scheduling tasks at fixed intervals. + */ + ScheduledExecutor(FNET_Transport & transport); + + /** + * Destroys this timer, finishing the current task executing and then + * finishing. + */ + ~ScheduledExecutor(); + + /** + * Schedule new task to be executed at specified intervals. + * + * @param task The task to schedule. + * @param delay The delay to wait before first execution. + * @param interval The interval in seconds. + */ + void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval); + + /** + * Reset timer, clearing the list of task to execute. + */ + void reset(); +}; + +} + diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp index 6030fb3cceb..401de2e34a8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp @@ -1,7 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "disk_mem_usage_sampler.h" -#include <vespa/vespalib/util/scheduledexecutor.h> +#include <vespa/searchcore/proton/common/scheduledexecutor.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/searchcore/proton/common/i_transient_resource_usage_provider.h> #include <filesystem> @@ -15,7 +15,7 @@ DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std:: _path(path_in), _sampleInterval(60s), _lastSampleTime(vespalib::steady_clock::now()), - _periodicTimer(std::make_unique<vespalib::ScheduledExecutor>(transport)), + _periodicTimer(std::make_unique<ScheduledExecutor>(transport)), _lock(), _transient_usage_providers() { diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h index fa8ac48fa1f..b6ff46bc714 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h @@ -7,11 +7,10 @@ class FNET_Transport; -namespace vespalib { class ScheduledExecutor; } - namespace proton { class ITransientResourceUsageProvider; +class ScheduledExecutor; /* * Class to sample disk and memory usage used for filtering write operations. @@ -21,7 +20,7 @@ class DiskMemUsageSampler { std::filesystem::path _path; vespalib::duration _sampleInterval; vespalib::steady_time _lastSampleTime; - std::unique_ptr<vespalib::ScheduledExecutor> _periodicTimer; + std::unique_ptr<ScheduledExecutor> _periodicTimer; std::mutex _lock; std::vector<std::shared_ptr<const ITransientResourceUsageProvider>> _transient_usage_providers; diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index 0df211b5a0b..5ca47d4d800 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -5,8 +5,8 @@ #include "document_db_maintenance_config.h" #include "i_blockable_maintenance_job.h" #include <vespa/searchcorespi/index/i_thread_service.h> +#include <vespa/searchcore/proton/common/scheduledexecutor.h> #include <vespa/vespalib/util/lambdatask.h> -#include <vespa/vespalib/util/scheduledexecutor.h> #include <vespa/fastos/thread.h> #include <thread> @@ -51,7 +51,7 @@ MaintenanceController::MaintenanceController(FNET_Transport & transport, _readySubDB(), _remSubDB(), _notReadySubDB(), - _periodicTimer(std::make_unique<vespalib::ScheduledExecutor>(transport)), + _periodicTimer(std::make_unique<ScheduledExecutor>(transport)), _config(), _state(State::INITIALIZING), _docTypeName(docTypeName), diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index 8e5bb8d860c..763225045b2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -6,9 +6,9 @@ #include "i_maintenance_job.h" #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/vespalib/util/retain_guard.h> -#include <vespa/vespalib/util/scheduledexecutor.h> #include <mutex> +class FNET_Transport; namespace vespalib { @@ -26,6 +26,7 @@ namespace proton { class MaintenanceJobRunner; class DocumentDBMaintenanceConfig; +class ScheduledExecutor; /** * Class that controls the bucket moving between ready and notready sub databases @@ -88,7 +89,7 @@ private: MaintenanceDocumentSubDB _readySubDB; MaintenanceDocumentSubDB _remSubDB; MaintenanceDocumentSubDB _notReadySubDB; - std::unique_ptr<vespalib::ScheduledExecutor> _periodicTimer; + std::unique_ptr<ScheduledExecutor> _periodicTimer; DocumentDBMaintenanceConfigSP _config; State _state; const DocTypeName &_docTypeName; |