summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-12-12 15:43:35 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-12-12 15:44:50 +0000
commitc50cff33d0810ba4d99c8215b09a83738bb30dfb (patch)
treeb0a2217f6890856c487e7da32e7a688c82f97ee2 /searchcore
parent9758ab6fdbdcb01c39b42df19355a3c2222d4aaf (diff)
Track which tasks are ongoing, and cancel and clean up in correct order.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp94
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp1
3 files changed, 97 insertions, 6 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
index 40f8cd19a17..b69dcbd253e 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
@@ -2,34 +2,118 @@
#include "scheduled_forward_executor.h"
#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <atomic>
+#include <thread>
+#include <cassert>
using vespalib::makeLambdaTask;
namespace proton {
+class ScheduledForwardExecutor::State {
+public:
+ State() :
+ _handle(),
+ _start_success(0),
+ _start_failed(0),
+ _running(false)
+ {}
+ ~State() {
+ assert( !_handle );
+ assert(!isRunning());
+ }
+ /// Returns false if it was already running
+ bool start() {
+ bool already_running = _running.exchange(true);
+ if (already_running) {
+ _start_failed++;
+ } else {
+ _start_success++;
+ }
+ return ! already_running;
+ }
+ void complete() {
+ bool was_running = _running.exchange(false);
+ assert(was_running);
+ }
+ void setHandle(Handle handle) {
+ _handle = std::move(handle);
+ }
+ void cancel() {
+ _handle.reset();
+ while(isRunning()) {
+ std::this_thread::sleep_for(1ms);
+ }
+ }
+private:
+ bool isRunning() const { return _running.load(std::memory_order_relaxed); }
+ Handle _handle;
+ std::atomic<uint64_t> _start_success;
+ std::atomic<uint64_t> _start_failed;
+ std::atomic<bool> _running;
+};
+
+class ScheduledForwardExecutor::Registration : public vespalib::IDestructorCallback {
+private:
+ ScheduledForwardExecutor & _executor;
+ uint64_t _key;
+public:
+ Registration(ScheduledForwardExecutor & executor, uint64_t key) : _executor(executor), _key(key) {}
+ ~Registration() {
+ _executor.cancel(_key);
+ }
+};
+
ScheduledForwardExecutor::ScheduledForwardExecutor(FNET_Transport& transport,
Executor& executor)
: _scheduler(transport),
- _executor(executor)
+ _executor(executor),
+ _lock(),
+ _nextKey(0),
+ _taskList()
{
}
+ScheduledForwardExecutor::~ScheduledForwardExecutor() = default;
+
void
ScheduledForwardExecutor::reset()
{
_scheduler.reset();
}
+bool
+ScheduledForwardExecutor::cancel(uint64_t key)
+{
+ std::lock_guard guard(_lock);
+ auto found = _taskList.find(key);
+ if (found == _taskList.end()) return false;
+ found->second->cancel();
+ _taskList.erase(found);
+ return true;
+}
+
IScheduledExecutor::Handle
ScheduledForwardExecutor::scheduleAtFixedRate(Executor::Task::UP task,
duration delay, duration interval)
{
+ std::lock_guard guard(_lock);
+ uint64_t key = _nextKey++;
+ auto state = std::make_unique<State>();
std::shared_ptr<Executor::Task> my_task = std::move(task);
- return _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(my_task)]() {
- _executor.execute(makeLambdaTask([&, my_task]() {
- my_task->run();
- }));
+ auto handle = _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(my_task), my_state=state.get()]() {
+ bool start_allowed = my_state->start();
+ if (start_allowed) {
+ _executor.execute(makeLambdaTask([&, my_task]() {
+ my_task->run();
+ my_state->complete();
+ }));
+ }
}), delay, interval);
+ state->setHandle(std::move(handle));
+ _taskList[key] = std::move(state);
+ return std::make_unique<Registration>(*this, key);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h
index b85855db287..efab1a5e2fd 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h
@@ -13,11 +13,19 @@ namespace proton {
*/
class ScheduledForwardExecutor : public IScheduledExecutor {
private:
+ class State;
+ class Registration;
+ using Tasks = vespalib::hash_map<uint64_t, std::unique_ptr<State>>;
ScheduledExecutor _scheduler;
Executor & _executor;
+ std::mutex _lock;
+ uint64_t _nextKey;
+ Tasks _taskList;
+ bool cancel(uint64_t key);
public:
ScheduledForwardExecutor(FNET_Transport& transport, Executor& executor);
+ ~ScheduledForwardExecutor() override;
void reset();
[[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override;
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp
index 1619388ce52..6acea2e68a5 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp
@@ -44,7 +44,6 @@ public:
~Registration() {
_executor.cancel(_key);
}
-
};
ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport)