From c50cff33d0810ba4d99c8215b09a83738bb30dfb Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 12 Dec 2022 15:43:35 +0000 Subject: Track which tasks are ongoing, and cancel and clean up in correct order. --- .../proton/common/scheduled_forward_executor.cpp | 94 ++++++++++++++++++++-- .../proton/common/scheduled_forward_executor.h | 8 ++ .../searchcore/proton/common/scheduledexecutor.cpp | 1 - 3 files changed, 97 insertions(+), 6 deletions(-) (limited to 'searchcore') diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp index 40f8cd19a17..b69dcbd253e 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp @@ -2,34 +2,118 @@ #include "scheduled_forward_executor.h" #include +#include +#include +#include +#include using vespalib::makeLambdaTask; namespace proton { +class ScheduledForwardExecutor::State { +public: + State() : + _handle(), + _start_success(0), + _start_failed(0), + _running(false) + {} + ~State() { + assert( !_handle ); + assert(!isRunning()); + } + /// Returns false if it was already running + bool start() { + bool already_running = _running.exchange(true); + if (already_running) { + _start_failed++; + } else { + _start_success++; + } + return ! already_running; + } + void complete() { + bool was_running = _running.exchange(false); + assert(was_running); + } + void setHandle(Handle handle) { + _handle = std::move(handle); + } + void cancel() { + _handle.reset(); + while(isRunning()) { + std::this_thread::sleep_for(1ms); + } + } +private: + bool isRunning() const { return _running.load(std::memory_order_relaxed); } + Handle _handle; + std::atomic _start_success; + std::atomic _start_failed; + std::atomic _running; +}; + +class ScheduledForwardExecutor::Registration : public vespalib::IDestructorCallback { +private: + ScheduledForwardExecutor & _executor; + uint64_t _key; +public: + Registration(ScheduledForwardExecutor & executor, uint64_t key) : _executor(executor), _key(key) {} + ~Registration() { + _executor.cancel(_key); + } +}; + ScheduledForwardExecutor::ScheduledForwardExecutor(FNET_Transport& transport, Executor& executor) : _scheduler(transport), - _executor(executor) + _executor(executor), + _lock(), + _nextKey(0), + _taskList() { } +ScheduledForwardExecutor::~ScheduledForwardExecutor() = default; + void ScheduledForwardExecutor::reset() { _scheduler.reset(); } +bool +ScheduledForwardExecutor::cancel(uint64_t key) +{ + std::lock_guard guard(_lock); + auto found = _taskList.find(key); + if (found == _taskList.end()) return false; + found->second->cancel(); + _taskList.erase(found); + return true; +} + IScheduledExecutor::Handle ScheduledForwardExecutor::scheduleAtFixedRate(Executor::Task::UP task, duration delay, duration interval) { + std::lock_guard guard(_lock); + uint64_t key = _nextKey++; + auto state = std::make_unique(); std::shared_ptr my_task = std::move(task); - return _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(my_task)]() { - _executor.execute(makeLambdaTask([&, my_task]() { - my_task->run(); - })); + auto handle = _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(my_task), my_state=state.get()]() { + bool start_allowed = my_state->start(); + if (start_allowed) { + _executor.execute(makeLambdaTask([&, my_task]() { + my_task->run(); + my_state->complete(); + })); + } }), delay, interval); + state->setHandle(std::move(handle)); + _taskList[key] = std::move(state); + return std::make_unique(*this, key); } } diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h index b85855db287..efab1a5e2fd 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h +++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h @@ -13,11 +13,19 @@ namespace proton { */ class ScheduledForwardExecutor : public IScheduledExecutor { private: + class State; + class Registration; + using Tasks = vespalib::hash_map>; ScheduledExecutor _scheduler; Executor & _executor; + std::mutex _lock; + uint64_t _nextKey; + Tasks _taskList; + bool cancel(uint64_t key); public: ScheduledForwardExecutor(FNET_Transport& transport, Executor& executor); + ~ScheduledForwardExecutor() override; void reset(); [[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr task, duration delay, duration interval) override; diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp index 1619388ce52..6acea2e68a5 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp @@ -44,7 +44,6 @@ public: ~Registration() { _executor.cancel(_key); } - }; ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport) -- cgit v1.2.3