diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-12-11 14:19:15 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-11 14:19:15 +0100 |
commit | 3de87e95a6638f19ac5d90b4876b6044add30bb5 (patch) | |
tree | 52b2febd7cfb80d1df762d35c59a3d9c1ee822b6 /searchcore | |
parent | a12c98736b14dffde08b1bb78d59efee9d883180 (diff) | |
parent | d8522831905697c4f5de37d1e1b2df470745930e (diff) |
Merge pull request #25190 from vespa-engine/balder/add-cancellation
Return a handle that will deregister the recurring task when it goes …
Diffstat (limited to 'searchcore')
12 files changed, 98 insertions, 49 deletions
diff --git a/searchcore/src/tests/proton/common/timer/timer_test.cpp b/searchcore/src/tests/proton/common/timer/timer_test.cpp index a0ad3378b09..8dd0c0abf53 100644 --- a/searchcore/src/tests/proton/common/timer/timer_test.cpp +++ b/searchcore/src/tests/proton/common/timer/timer_test.cpp @@ -69,18 +69,27 @@ TYPED_TEST_SUITE(ScheduledExecutorTest, ScheduledTypes); TYPED_TEST(ScheduledExecutorTest, test_scheduling) { vespalib::CountDownLatch latch1(3); vespalib::CountDownLatch latch2(2); - this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms); - this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms); + auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms); + auto handleB = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms); EXPECT_TRUE(latch1.await(60s)); EXPECT_TRUE(latch2.await(60s)); } TYPED_TEST(ScheduledExecutorTest, test_reset) { vespalib::CountDownLatch latch1(2); - this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); + auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); this->timer->reset(); EXPECT_TRUE(!latch1.await(3s)); - this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); + auto handleB = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); + EXPECT_TRUE(latch1.await(60s)); +} + +TYPED_TEST(ScheduledExecutorTest, test_drop_handle) { + vespalib::CountDownLatch latch1(2); + auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); + handleA.reset(); + EXPECT_TRUE(!latch1.await(3s)); + auto handleB = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); EXPECT_TRUE(latch1.await(60s)); } diff --git a/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h b/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h index b95e65da045..6a5903536f0 100644 --- a/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h +++ b/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h @@ -3,6 +3,7 @@ #include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/idestructorcallback.h> #include <memory> namespace proton { @@ -12,6 +13,9 @@ namespace proton { */ class IScheduledExecutor { public: + using Handle = std::unique_ptr<vespalib::IDestructorCallback>; + using duration = vespalib::duration; + using Executor = vespalib::Executor; virtual ~IScheduledExecutor() = default; /** @@ -20,9 +24,9 @@ public: * @param task The task to schedule. * @param delay The delay to wait before first execution. * @param interval The interval between the task is executed. + * @return A handle that will cancel the recurring task when it goes out of scope */ - virtual void scheduleAtFixedRate(std::unique_ptr<vespalib::Executor::Task> task, - vespalib::duration delay, vespalib::duration interval) = 0; + [[nodiscard]] virtual Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) = 0; }; } diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp index acb94c020f6..3f94247fa7e 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp @@ -3,13 +3,12 @@ #include "scheduled_forward_executor.h" #include <vespa/vespalib/util/lambdatask.h> -using vespalib::Executor; using vespalib::makeLambdaTask; namespace proton { ScheduledForwardExecutor::ScheduledForwardExecutor(FNET_Transport& transport, - vespalib::Executor& executor) + Executor& executor) : _scheduler(transport), _executor(executor) { @@ -21,11 +20,11 @@ ScheduledForwardExecutor::reset() _scheduler.reset(); } -void -ScheduledForwardExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, - vespalib::duration delay, vespalib::duration interval) +IScheduledExecutor::Handle +ScheduledForwardExecutor::scheduleAtFixedRate(Executor::Task::UP task, + duration delay, duration interval) { - _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(task)]() { + return _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(task)]() { _executor.execute(makeLambdaTask([&]() { my_task->run(); })); diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h index eb7120527d7..b85855db287 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h +++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h @@ -13,16 +13,14 @@ namespace proton { */ class ScheduledForwardExecutor : public IScheduledExecutor { private: - ScheduledExecutor _scheduler; - vespalib::Executor& _executor; + ScheduledExecutor _scheduler; + Executor & _executor; public: - ScheduledForwardExecutor(FNET_Transport& transport, vespalib::Executor& executor); + ScheduledForwardExecutor(FNET_Transport& transport, Executor& executor); void reset(); - void scheduleAtFixedRate(std::unique_ptr<vespalib::Executor::Task> task, - vespalib::duration delay, vespalib::duration interval) override; - + [[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override; }; } diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp index 4577477fb77..94c81ee4b6b 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp @@ -3,6 +3,7 @@ #include <vespa/fnet/scheduler.h> #include <vespa/fnet/task.h> #include <vespa/fnet/transport.h> +#include <vespa/vespalib/stllike/hash_map.hpp> using vespalib::duration; @@ -13,13 +14,11 @@ using Task = vespalib::Executor::Task; class TimerTask : public FNET_Task { private: - TimerTask(const TimerTask &); - TimerTask&operator=(const TimerTask &); - - FNET_Scheduler *_scheduler; Task::UP _task; duration _interval; public: + TimerTask(const TimerTask &) = delete; + TimerTask&operator=(const TimerTask &) = delete; TimerTask(FNET_Scheduler *scheduler, Task::UP task, duration interval) : FNET_Task(scheduler), _task(std::move(task)), @@ -36,9 +35,22 @@ public: } }; +class ScheduledExecutor::Registration : public vespalib::IDestructorCallback { +private: + ScheduledExecutor & _executor; + uint64_t _key; +public: + Registration(ScheduledExecutor & executor, uint64_t key) : _executor(executor), _key(key) {} + ~Registration() { + _executor.cancel(_key); + } + +}; + ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport) : _transport(transport), _lock(), + _nextKey(0), _taskList() { } @@ -48,13 +60,27 @@ ScheduledExecutor::~ScheduledExecutor() } -void -ScheduledExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval) +IScheduledExecutor::Handle +ScheduledExecutor::scheduleAtFixedRate(Executor::Task::UP task, duration delay, duration interval) { std::lock_guard guard(_lock); + uint64_t key = _nextKey++; auto tTask = std::make_unique<TimerTask>(_transport.GetScheduler(), std::move(task), interval); - _taskList.push_back(std::move(tTask)); - _taskList.back()->Schedule(vespalib::to_s(delay)); + auto & taskRef = *tTask; + _taskList[key] = std::move(tTask); + taskRef.Schedule(vespalib::to_s(delay)); + return std::make_unique<Registration>(*this, key); +} + +bool +ScheduledExecutor::cancel(uint64_t key) +{ + std::lock_guard guard(_lock); + auto found = _taskList.find(key); + if (found == _taskList.end()) return false; + + found->second->Unschedule(); + return true; } void @@ -62,7 +88,7 @@ ScheduledExecutor::reset() { std::lock_guard guard(_lock); for (auto & task : _taskList) { - task->Unschedule(); + task.second->Unschedule(); } _taskList.clear(); } diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h index dfb16c94d7d..f4673612a6c 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h +++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h @@ -2,6 +2,7 @@ #pragma once #include "i_scheduled_executor.h" +#include <vespa/vespalib/stllike/hash_map.h> #include <mutex> #include <vector> @@ -19,13 +20,14 @@ class TimerTask; class ScheduledExecutor : public IScheduledExecutor { private: - using TaskList = std::vector<std::unique_ptr<TimerTask>>; - using duration = vespalib::duration; - using Executor = vespalib::Executor; + using TaskList = vespalib::hash_map<uint64_t, std::unique_ptr<TimerTask>>; FNET_Transport & _transport; std::mutex _lock; + uint64_t _nextKey; TaskList _taskList; + bool cancel(uint64_t key); + class Registration; public: /** * Create a new timer, capable of scheduling tasks at fixed intervals. @@ -38,7 +40,7 @@ public: */ ~ScheduledExecutor() override; - void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override; + [[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override; /** * Reset timer, clearing the list of task to execute. diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp index 401de2e34a8..40cd6238393 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp @@ -17,6 +17,7 @@ DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std:: _lastSampleTime(vespalib::steady_clock::now()), _periodicTimer(std::make_unique<ScheduledExecutor>(transport)), _lock(), + _periodicHandle(), _transient_usage_providers() { setConfig(config); @@ -24,26 +25,25 @@ DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std:: DiskMemUsageSampler::~DiskMemUsageSampler() { - _periodicTimer.reset(); + _periodicHandle.reset(); } void DiskMemUsageSampler::setConfig(const Config &config) { - _periodicTimer->reset(); + _periodicHandle.reset(); _filter.setConfig(config.filterConfig); _sampleInterval = config.sampleInterval; sampleAndReportUsage(); _lastSampleTime = vespalib::steady_clock::now(); vespalib::duration maxInterval = std::min(vespalib::duration(1s), _sampleInterval); - _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() { - if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) { - return; - } - sampleAndReportUsage(); - _lastSampleTime = vespalib::steady_clock::now(); - }), - maxInterval, maxInterval); + _periodicHandle = _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() { + if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) { + return; + } + sampleAndReportUsage(); + _lastSampleTime = vespalib::steady_clock::now(); + }), maxInterval, maxInterval); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h index b6ff46bc714..2b7d3ab759f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h @@ -7,6 +7,8 @@ class FNET_Transport; +namespace vespalib { class IDestructorCallback; } + namespace proton { class ITransientResourceUsageProvider; @@ -21,7 +23,8 @@ class DiskMemUsageSampler { vespalib::duration _sampleInterval; vespalib::steady_time _lastSampleTime; std::unique_ptr<ScheduledExecutor> _periodicTimer; - std::mutex _lock; + std::mutex _lock; + std::unique_ptr<vespalib::IDestructorCallback> _periodicHandle; std::vector<std::shared_ptr<const ITransientResourceUsageProvider>> _transient_usage_providers; void sampleAndReportUsage(); diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index c95efbca944..adafa4c6217 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -84,7 +84,7 @@ MaintenanceController::killJobs() // Called by master write thread assert(_masterThread.isCurrentThread()); LOG(debug, "killJobs(): threadId=%zu", (size_t)FastOS_Thread::GetCurrentThreadId()); - _periodicTimer->reset(); + _periodicTaskHandles.clear(); // No need to take _jobsLock as modification of _jobs also happens in master write thread. for (auto &job : _jobs) { job->stop(); // Make sure no more tasks are added to the executor @@ -162,7 +162,7 @@ MaintenanceController::restart() if (!getStarted() || getStopping() || !_readySubDB.valid()) { return; } - _periodicTimer->reset(); + _periodicTaskHandles.clear(); addJobsToPeriodicTimer(); } @@ -174,14 +174,16 @@ MaintenanceController::addJobsToPeriodicTimer() for (const auto &jw : _jobs) { const IMaintenanceJob &job = jw->getJob(); LOG(debug, "addJobsToPeriodicTimer(): docType='%s', job.name='%s', job.delay=%f, job.interval=%f", - _docTypeName.getName().c_str(), job.getName().c_str(), vespalib::to_s(job.getDelay()), vespalib::to_s(job.getInterval())); + _docTypeName.getName().c_str(), job.getName().c_str(), vespalib::to_s(job.getDelay()), + vespalib::to_s(job.getInterval())); if (job.getInterval() == vespalib::duration::zero()) { jw->run(); continue; } - _periodicTimer->scheduleAtFixedRate(std::make_unique<JobWrapperTask>(jw.get()), - job.getDelay(), job.getInterval()); + _periodicTaskHandles.push_back(_periodicTimer->scheduleAtFixedRate(std::make_unique<JobWrapperTask>(jw.get()), + job.getDelay(), job.getInterval())); } + } void diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index a2e5105f426..0717a4ddc87 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -79,6 +79,7 @@ public: private: using Mutex = std::mutex; using Guard = std::lock_guard<Mutex>; + using TaskHandle = std::unique_ptr<vespalib::IDestructorCallback>; ISyncableThreadService &_masterThread; vespalib::MonitoredRefCount &_refCount; @@ -86,6 +87,7 @@ private: MaintenanceDocumentSubDB _remSubDB; MaintenanceDocumentSubDB _notReadySubDB; std::unique_ptr<ScheduledExecutor> _periodicTimer; + std::vector<TaskHandle> _periodicTaskHandles; State _state; const DocTypeName &_docTypeName; JobList _jobs; diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 636d898ed5d..9934a94d7a5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -380,7 +380,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _flushEngine->start(); vespalib::duration pruneSessionsInterval = vespalib::from_s(protonConfig.grouping.sessionmanager.pruning.interval); - _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now()); }), pruneSessionsInterval, pruneSessionsInterval); + _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now()); }), pruneSessionsInterval, pruneSessionsInterval); _isInitializing = false; _protonConfigurer.setAllowReconfig(true); _initComplete = true; @@ -469,6 +469,7 @@ Proton::~Proton() if (_memoryFlushConfigUpdater) { _diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get()); } + _sessionPruneHandle.reset(); _scheduler->reset(); _executor.shutdown(); _executor.sync(); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 8e1d1dedc59..b07ebece0c7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -13,12 +13,14 @@ #include "proton_configurer.h" #include "rpc_hooks.h" #include "shared_threading_service.h" -#include <vespa/eval/eval/llvm/compile_cache.h> +#include <vespa/searchcore/proton/common/i_scheduled_executor.h> +#include <vespa/searchcore/proton/matching/querylimiter.h> #include <vespa/searchcore/proton/matching/querylimiter.h> #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> #include <vespa/searchlib/common/fileheadercontext.h> #include <vespa/searchlib/engine/monitorapi.h> +#include <vespa/eval/eval/llvm/compile_cache.h> #include <vespa/vespalib/net/http/component_config_producer.h> #include <vespa/vespalib/net/http/generic_state_handler.h> #include <vespa/vespalib/net/http/json_get_handler.h> @@ -115,6 +117,7 @@ private: ProtonConfigFetcher _protonConfigFetcher; std::unique_ptr<SharedThreadingService> _shared_service; std::unique_ptr<matching::SessionManager> _sessionManager; + IScheduledExecutor::Handle _sessionPruneHandle; std::unique_ptr<ScheduledForwardExecutor> _scheduler; vespalib::eval::CompileCache::ExecutorBinding::UP _compile_cache_executor_binding; matching::QueryLimiter _queryLimiter; |