diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-12-08 22:34:35 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-12-09 13:30:47 +0000 |
commit | d8522831905697c4f5de37d1e1b2df470745930e (patch) | |
tree | 51df3b95e838f735547d12352bbbf8380f681b50 | |
parent | 0f845e25cb7f0f4c91002a03b4c4c679cbb833e0 (diff) |
Return a handle that will deregister the recurring task when it goes out of scope.
This requires the user to consider lifetime, and allows for finegrained control.
12 files changed, 98 insertions, 49 deletions
diff --git a/searchcore/src/tests/proton/common/timer/timer_test.cpp b/searchcore/src/tests/proton/common/timer/timer_test.cpp index a0ad3378b09..8dd0c0abf53 100644 --- a/searchcore/src/tests/proton/common/timer/timer_test.cpp +++ b/searchcore/src/tests/proton/common/timer/timer_test.cpp @@ -69,18 +69,27 @@ TYPED_TEST_SUITE(ScheduledExecutorTest, ScheduledTypes); TYPED_TEST(ScheduledExecutorTest, test_scheduling) { vespalib::CountDownLatch latch1(3); vespalib::CountDownLatch latch2(2); - this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms); - this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms); + auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms); + auto handleB = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms); EXPECT_TRUE(latch1.await(60s)); EXPECT_TRUE(latch2.await(60s)); } TYPED_TEST(ScheduledExecutorTest, test_reset) { vespalib::CountDownLatch latch1(2); - this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); + auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); this->timer->reset(); EXPECT_TRUE(!latch1.await(3s)); - this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); + auto handleB = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); + EXPECT_TRUE(latch1.await(60s)); +} + +TYPED_TEST(ScheduledExecutorTest, test_drop_handle) { + vespalib::CountDownLatch latch1(2); + auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); + handleA.reset(); + EXPECT_TRUE(!latch1.await(3s)); + auto handleB = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); EXPECT_TRUE(latch1.await(60s)); } diff --git a/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h b/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h index b95e65da045..6a5903536f0 100644 --- a/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h +++ b/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h @@ -3,6 +3,7 @@ #include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/idestructorcallback.h> #include <memory> namespace proton { @@ -12,6 +13,9 @@ namespace proton { */ class IScheduledExecutor { public: + using Handle = std::unique_ptr<vespalib::IDestructorCallback>; + using duration = vespalib::duration; + using Executor = vespalib::Executor; virtual ~IScheduledExecutor() = default; /** @@ -20,9 +24,9 @@ public: * @param task The task to schedule. * @param delay The delay to wait before first execution. * @param interval The interval between the task is executed. + * @return A handle that will cancel the recurring task when it goes out of scope */ - virtual void scheduleAtFixedRate(std::unique_ptr<vespalib::Executor::Task> task, - vespalib::duration delay, vespalib::duration interval) = 0; + [[nodiscard]] virtual Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) = 0; }; } diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp index acb94c020f6..3f94247fa7e 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp @@ -3,13 +3,12 @@ #include "scheduled_forward_executor.h" #include <vespa/vespalib/util/lambdatask.h> -using vespalib::Executor; using vespalib::makeLambdaTask; namespace proton { ScheduledForwardExecutor::ScheduledForwardExecutor(FNET_Transport& transport, - vespalib::Executor& executor) + Executor& executor) : _scheduler(transport), _executor(executor) { @@ -21,11 +20,11 @@ ScheduledForwardExecutor::reset() _scheduler.reset(); } -void -ScheduledForwardExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, - vespalib::duration delay, vespalib::duration interval) +IScheduledExecutor::Handle +ScheduledForwardExecutor::scheduleAtFixedRate(Executor::Task::UP task, + duration delay, duration interval) { - _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(task)]() { + return _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(task)]() { _executor.execute(makeLambdaTask([&]() { my_task->run(); })); diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h index eb7120527d7..b85855db287 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h +++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h @@ -13,16 +13,14 @@ namespace proton { */ class ScheduledForwardExecutor : public IScheduledExecutor { private: - ScheduledExecutor _scheduler; - vespalib::Executor& _executor; + ScheduledExecutor _scheduler; + Executor & _executor; public: - ScheduledForwardExecutor(FNET_Transport& transport, vespalib::Executor& executor); + ScheduledForwardExecutor(FNET_Transport& transport, Executor& executor); void reset(); - void scheduleAtFixedRate(std::unique_ptr<vespalib::Executor::Task> task, - vespalib::duration delay, vespalib::duration interval) override; - + [[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override; }; } diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp index 4577477fb77..94c81ee4b6b 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp @@ -3,6 +3,7 @@ #include <vespa/fnet/scheduler.h> #include <vespa/fnet/task.h> #include <vespa/fnet/transport.h> +#include <vespa/vespalib/stllike/hash_map.hpp> using vespalib::duration; @@ -13,13 +14,11 @@ using Task = vespalib::Executor::Task; class TimerTask : public FNET_Task { private: - TimerTask(const TimerTask &); - TimerTask&operator=(const TimerTask &); - - FNET_Scheduler *_scheduler; Task::UP _task; duration _interval; public: + TimerTask(const TimerTask &) = delete; + TimerTask&operator=(const TimerTask &) = delete; TimerTask(FNET_Scheduler *scheduler, Task::UP task, duration interval) : FNET_Task(scheduler), _task(std::move(task)), @@ -36,9 +35,22 @@ public: } }; +class ScheduledExecutor::Registration : public vespalib::IDestructorCallback { +private: + ScheduledExecutor & _executor; + uint64_t _key; +public: + Registration(ScheduledExecutor & executor, uint64_t key) : _executor(executor), _key(key) {} + ~Registration() { + _executor.cancel(_key); + } + +}; + ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport) : _transport(transport), _lock(), + _nextKey(0), _taskList() { } @@ -48,13 +60,27 @@ ScheduledExecutor::~ScheduledExecutor() } -void -ScheduledExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval) +IScheduledExecutor::Handle +ScheduledExecutor::scheduleAtFixedRate(Executor::Task::UP task, duration delay, duration interval) { std::lock_guard guard(_lock); + uint64_t key = _nextKey++; auto tTask = std::make_unique<TimerTask>(_transport.GetScheduler(), std::move(task), interval); - _taskList.push_back(std::move(tTask)); - _taskList.back()->Schedule(vespalib::to_s(delay)); + auto & taskRef = *tTask; + _taskList[key] = std::move(tTask); + taskRef.Schedule(vespalib::to_s(delay)); + return std::make_unique<Registration>(*this, key); +} + +bool +ScheduledExecutor::cancel(uint64_t key) +{ + std::lock_guard guard(_lock); + auto found = _taskList.find(key); + if (found == _taskList.end()) return false; + + found->second->Unschedule(); + return true; } void @@ -62,7 +88,7 @@ ScheduledExecutor::reset() { std::lock_guard guard(_lock); for (auto & task : _taskList) { - task->Unschedule(); + task.second->Unschedule(); } _taskList.clear(); } diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h index dfb16c94d7d..f4673612a6c 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h +++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h @@ -2,6 +2,7 @@ #pragma once #include "i_scheduled_executor.h" +#include <vespa/vespalib/stllike/hash_map.h> #include <mutex> #include <vector> @@ -19,13 +20,14 @@ class TimerTask; class ScheduledExecutor : public IScheduledExecutor { private: - using TaskList = std::vector<std::unique_ptr<TimerTask>>; - using duration = vespalib::duration; - using Executor = vespalib::Executor; + using TaskList = vespalib::hash_map<uint64_t, std::unique_ptr<TimerTask>>; FNET_Transport & _transport; std::mutex _lock; + uint64_t _nextKey; TaskList _taskList; + bool cancel(uint64_t key); + class Registration; public: /** * Create a new timer, capable of scheduling tasks at fixed intervals. @@ -38,7 +40,7 @@ public: */ ~ScheduledExecutor() override; - void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override; + [[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override; /** * Reset timer, clearing the list of task to execute. diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp index 401de2e34a8..40cd6238393 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp @@ -17,6 +17,7 @@ DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std:: _lastSampleTime(vespalib::steady_clock::now()), _periodicTimer(std::make_unique<ScheduledExecutor>(transport)), _lock(), + _periodicHandle(), _transient_usage_providers() { setConfig(config); @@ -24,26 +25,25 @@ DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std:: DiskMemUsageSampler::~DiskMemUsageSampler() { - _periodicTimer.reset(); + _periodicHandle.reset(); } void DiskMemUsageSampler::setConfig(const Config &config) { - _periodicTimer->reset(); + _periodicHandle.reset(); _filter.setConfig(config.filterConfig); _sampleInterval = config.sampleInterval; sampleAndReportUsage(); _lastSampleTime = vespalib::steady_clock::now(); vespalib::duration maxInterval = std::min(vespalib::duration(1s), _sampleInterval); - _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() { - if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) { - return; - } - sampleAndReportUsage(); - _lastSampleTime = vespalib::steady_clock::now(); - }), - maxInterval, maxInterval); + _periodicHandle = _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() { + if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) { + return; + } + sampleAndReportUsage(); + _lastSampleTime = vespalib::steady_clock::now(); + }), maxInterval, maxInterval); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h index b6ff46bc714..2b7d3ab759f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h @@ -7,6 +7,8 @@ class FNET_Transport; +namespace vespalib { class IDestructorCallback; } + namespace proton { class ITransientResourceUsageProvider; @@ -21,7 +23,8 @@ class DiskMemUsageSampler { vespalib::duration _sampleInterval; vespalib::steady_time _lastSampleTime; std::unique_ptr<ScheduledExecutor> _periodicTimer; - std::mutex _lock; + std::mutex _lock; + std::unique_ptr<vespalib::IDestructorCallback> _periodicHandle; std::vector<std::shared_ptr<const ITransientResourceUsageProvider>> _transient_usage_providers; void sampleAndReportUsage(); diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index c95efbca944..adafa4c6217 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -84,7 +84,7 @@ MaintenanceController::killJobs() // Called by master write thread assert(_masterThread.isCurrentThread()); LOG(debug, "killJobs(): threadId=%zu", (size_t)FastOS_Thread::GetCurrentThreadId()); - _periodicTimer->reset(); + _periodicTaskHandles.clear(); // No need to take _jobsLock as modification of _jobs also happens in master write thread. for (auto &job : _jobs) { job->stop(); // Make sure no more tasks are added to the executor @@ -162,7 +162,7 @@ MaintenanceController::restart() if (!getStarted() || getStopping() || !_readySubDB.valid()) { return; } - _periodicTimer->reset(); + _periodicTaskHandles.clear(); addJobsToPeriodicTimer(); } @@ -174,14 +174,16 @@ MaintenanceController::addJobsToPeriodicTimer() for (const auto &jw : _jobs) { const IMaintenanceJob &job = jw->getJob(); LOG(debug, "addJobsToPeriodicTimer(): docType='%s', job.name='%s', job.delay=%f, job.interval=%f", - _docTypeName.getName().c_str(), job.getName().c_str(), vespalib::to_s(job.getDelay()), vespalib::to_s(job.getInterval())); + _docTypeName.getName().c_str(), job.getName().c_str(), vespalib::to_s(job.getDelay()), + vespalib::to_s(job.getInterval())); if (job.getInterval() == vespalib::duration::zero()) { jw->run(); continue; } - _periodicTimer->scheduleAtFixedRate(std::make_unique<JobWrapperTask>(jw.get()), - job.getDelay(), job.getInterval()); + _periodicTaskHandles.push_back(_periodicTimer->scheduleAtFixedRate(std::make_unique<JobWrapperTask>(jw.get()), + job.getDelay(), job.getInterval())); } + } void diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index a2e5105f426..0717a4ddc87 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -79,6 +79,7 @@ public: private: using Mutex = std::mutex; using Guard = std::lock_guard<Mutex>; + using TaskHandle = std::unique_ptr<vespalib::IDestructorCallback>; ISyncableThreadService &_masterThread; vespalib::MonitoredRefCount &_refCount; @@ -86,6 +87,7 @@ private: MaintenanceDocumentSubDB _remSubDB; MaintenanceDocumentSubDB _notReadySubDB; std::unique_ptr<ScheduledExecutor> _periodicTimer; + std::vector<TaskHandle> _periodicTaskHandles; State _state; const DocTypeName &_docTypeName; JobList _jobs; diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 5c82b6e74ae..a8b94c8c489 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -380,7 +380,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _flushEngine->start(); vespalib::duration pruneSessionsInterval = vespalib::from_s(protonConfig.grouping.sessionmanager.pruning.interval); - _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now()); }), pruneSessionsInterval, pruneSessionsInterval); + _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now()); }), pruneSessionsInterval, pruneSessionsInterval); _isInitializing = false; _protonConfigurer.setAllowReconfig(true); _initComplete = true; @@ -469,6 +469,7 @@ Proton::~Proton() if (_memoryFlushConfigUpdater) { _diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get()); } + _sessionPruneHandle.reset(); _scheduler->reset(); _executor.shutdown(); _executor.sync(); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 8e1d1dedc59..b07ebece0c7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -13,12 +13,14 @@ #include "proton_configurer.h" #include "rpc_hooks.h" #include "shared_threading_service.h" -#include <vespa/eval/eval/llvm/compile_cache.h> +#include <vespa/searchcore/proton/common/i_scheduled_executor.h> +#include <vespa/searchcore/proton/matching/querylimiter.h> #include <vespa/searchcore/proton/matching/querylimiter.h> #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> #include <vespa/searchlib/common/fileheadercontext.h> #include <vespa/searchlib/engine/monitorapi.h> +#include <vespa/eval/eval/llvm/compile_cache.h> #include <vespa/vespalib/net/http/component_config_producer.h> #include <vespa/vespalib/net/http/generic_state_handler.h> #include <vespa/vespalib/net/http/json_get_handler.h> @@ -115,6 +117,7 @@ private: ProtonConfigFetcher _protonConfigFetcher; std::unique_ptr<SharedThreadingService> _shared_service; std::unique_ptr<matching::SessionManager> _sessionManager; + IScheduledExecutor::Handle _sessionPruneHandle; std::unique_ptr<ScheduledForwardExecutor> _scheduler; vespalib::eval::CompileCache::ExecutorBinding::UP _compile_cache_executor_binding; matching::QueryLimiter _queryLimiter; |