diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-12-13 07:47:23 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-12-13 07:47:23 +0100 |
commit | 3c457c690d1e306a0560cfb724e4c892c00fe9e4 (patch) | |
tree | 0376417c508f11b5f4baa12b6e3eb65e7540c8b6 /searchcore | |
parent | fc0180434827f15a5678f964b428a264919921cc (diff) | |
parent | 0292b25bf3be780df933be21a7a489dbf5658934 (diff) |
Merge pull request #25222 from vespa-engine/balder/ensure-only-1-task-executing-and-wait-until-safe
Balder/ensure only 1 task executing and wait until safe
Diffstat (limited to 'searchcore')
6 files changed, 131 insertions, 39 deletions
diff --git a/searchcore/src/tests/proton/common/timer/timer_test.cpp b/searchcore/src/tests/proton/common/timer/timer_test.cpp index 8dd0c0abf53..23180fc1aba 100644 --- a/searchcore/src/tests/proton/common/timer/timer_test.cpp +++ b/searchcore/src/tests/proton/common/timer/timer_test.cpp @@ -8,10 +8,12 @@ #include <vespa/vespalib/util/count_down_latch.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/lambdatask.h> using vespalib::Executor; using namespace proton; using Task = Executor::Task; +using vespalib::makeLambdaTask; namespace { @@ -57,7 +59,6 @@ public: timer = make_scheduled_executor<ScheduledT>(transport, executor); } ~ScheduledExecutorTest() { - timer->reset(); transport.ShutDown(true); } }; @@ -75,15 +76,6 @@ TYPED_TEST(ScheduledExecutorTest, test_scheduling) { EXPECT_TRUE(latch2.await(60s)); } -TYPED_TEST(ScheduledExecutorTest, test_reset) { - vespalib::CountDownLatch latch1(2); - auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); - this->timer->reset(); - EXPECT_TRUE(!latch1.await(3s)); - auto handleB = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); - EXPECT_TRUE(latch1.await(60s)); -} - TYPED_TEST(ScheduledExecutorTest, test_drop_handle) { vespalib::CountDownLatch latch1(2); auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); @@ -93,4 +85,32 @@ TYPED_TEST(ScheduledExecutorTest, test_drop_handle) { EXPECT_TRUE(latch1.await(60s)); } +TYPED_TEST(ScheduledExecutorTest, test_only_one_instance_running) { + vespalib::Gate latch; + std::atomic<uint64_t> counter = 0; + auto handleA = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() { counter++; latch.await();}), 0ms, 1ms); + std::this_thread::sleep_for(2s); + EXPECT_EQ(1, counter); + latch.countDown(); + std::this_thread::sleep_for(2s); + EXPECT_GT(counter, 10); +} + +TYPED_TEST(ScheduledExecutorTest, test_sync_delete) { + vespalib::Gate latch; + std::atomic<uint64_t> counter = 0; + std::atomic<uint64_t> reset_counter = 0; + auto handleA = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() { counter++; latch.await();}), 0ms, 1ms); + auto handleB = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() { handleA.reset(); reset_counter++; }), 0ms, 1ms); + std::this_thread::sleep_for(2s); + EXPECT_EQ(1, counter); + EXPECT_EQ(0, reset_counter); + latch.countDown(); + std::this_thread::sleep_for(2s); + EXPECT_EQ(1, counter); + EXPECT_GT(reset_counter, 10); + EXPECT_EQ(nullptr, handleA.get()); + EXPECT_FALSE(nullptr == handleB.get()); +} + GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp index 40f8cd19a17..08712a1094c 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp @@ -2,34 +2,115 @@ #include "scheduled_forward_executor.h" #include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <atomic> +#include <thread> +#include <cassert> using vespalib::makeLambdaTask; namespace proton { +class ScheduledForwardExecutor::State { +public: + State() : + _handle(), + _start_success(0), + _start_failed(0), + _running(false) + {} + ~State() { + assert( !_handle ); + assert(!isRunning()); + } + /// Returns false if it was already running + bool start() { + bool already_running = _running.exchange(true); + if (already_running) { + _start_failed++; + } else { + _start_success++; + } + return ! already_running; + } + void complete() { + bool was_running = _running.exchange(false); + assert(was_running); + } + void setHandle(Handle handle) { + _handle = std::move(handle); + } + void cancel() { + _handle.reset(); + while(isRunning()) { + std::this_thread::sleep_for(1ms); + } + } +private: + bool isRunning() const { return _running.load(std::memory_order_relaxed); } + Handle _handle; + std::atomic<uint64_t> _start_success; + std::atomic<uint64_t> _start_failed; + std::atomic<bool> _running; +}; + +class ScheduledForwardExecutor::Registration : public vespalib::IDestructorCallback { +private: + ScheduledForwardExecutor & _executor; + uint64_t _key; +public: + Registration(ScheduledForwardExecutor & executor, uint64_t key) : _executor(executor), _key(key) {} + ~Registration() { + _executor.cancel(_key); + } +}; + ScheduledForwardExecutor::ScheduledForwardExecutor(FNET_Transport& transport, Executor& executor) : _scheduler(transport), - _executor(executor) + _executor(executor), + _lock(), + _nextKey(0), + _taskList() { } -void -ScheduledForwardExecutor::reset() +ScheduledForwardExecutor::~ScheduledForwardExecutor() { + std::lock_guard guard(_lock); + assert(_taskList.empty()); +} + +bool +ScheduledForwardExecutor::cancel(uint64_t key) { - _scheduler.reset(); + std::lock_guard guard(_lock); + auto found = _taskList.find(key); + if (found == _taskList.end()) return false; + found->second->cancel(); + _taskList.erase(found); + return true; } IScheduledExecutor::Handle ScheduledForwardExecutor::scheduleAtFixedRate(Executor::Task::UP task, duration delay, duration interval) { + std::lock_guard guard(_lock); + uint64_t key = _nextKey++; + auto state = std::make_unique<State>(); std::shared_ptr<Executor::Task> my_task = std::move(task); - return _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(my_task)]() { - _executor.execute(makeLambdaTask([&, my_task]() { - my_task->run(); - })); + auto handle = _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(my_task), my_state=state.get()]() { + bool start_allowed = my_state->start(); + if (start_allowed) { + _executor.execute(makeLambdaTask([&, my_task]() { + my_task->run(); + my_state->complete(); + })); + } }), delay, interval); + state->setHandle(std::move(handle)); + _taskList[key] = std::move(state); + return std::make_unique<Registration>(*this, key); } } diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h index b85855db287..d65b1cfedc4 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h +++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h @@ -13,13 +13,19 @@ namespace proton { */ class ScheduledForwardExecutor : public IScheduledExecutor { private: + class State; + class Registration; + using Tasks = vespalib::hash_map<uint64_t, std::unique_ptr<State>>; ScheduledExecutor _scheduler; Executor & _executor; + std::mutex _lock; + uint64_t _nextKey; + Tasks _taskList; + bool cancel(uint64_t key); public: ScheduledForwardExecutor(FNET_Transport& transport, Executor& executor); - void reset(); - + ~ScheduledForwardExecutor() override; [[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override; }; diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp index 1619388ce52..3fb08b9ad2b 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp @@ -25,7 +25,7 @@ public: _interval(interval) { } - ~TimerTask() { + ~TimerTask() override { Kill(); } @@ -44,7 +44,6 @@ public: ~Registration() { _executor.cancel(_key); } - }; ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport) @@ -56,7 +55,8 @@ ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport) ScheduledExecutor::~ScheduledExecutor() { - reset(); + std::lock_guard guard(_lock); + assert(_taskList.empty()); } @@ -84,14 +84,4 @@ ScheduledExecutor::cancel(uint64_t key) return true; } -void -ScheduledExecutor::reset() -{ - std::lock_guard guard(_lock); - for (auto & task : _taskList) { - task.second->Unschedule(); - } - _taskList.clear(); -} - } diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h index f4673612a6c..198a944b0f4 100644 --- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h +++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h @@ -41,11 +41,6 @@ public: ~ScheduledExecutor() override; [[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override; - - /** - * Reset timer, clearing the list of task to execute. - */ - void reset(); }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 9934a94d7a5..44dfbbfba98 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -470,7 +470,7 @@ Proton::~Proton() _diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get()); } _sessionPruneHandle.reset(); - _scheduler->reset(); + _scheduler.reset(); _executor.shutdown(); _executor.sync(); _rpcHooks.reset(); |