summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-12-13 07:47:23 +0100
committerGitHub <noreply@github.com>2022-12-13 07:47:23 +0100
commit3c457c690d1e306a0560cfb724e4c892c00fe9e4 (patch)
tree0376417c508f11b5f4baa12b6e3eb65e7540c8b6 /searchcore
parentfc0180434827f15a5678f964b428a264919921cc (diff)
parent0292b25bf3be780df933be21a7a489dbf5658934 (diff)
Merge pull request #25222 from vespa-engine/balder/ensure-only-1-task-executing-and-wait-until-safe
Balder/ensure only 1 task executing and wait until safe
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/common/timer/timer_test.cpp40
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp97
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp16
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp2
6 files changed, 131 insertions, 39 deletions
diff --git a/searchcore/src/tests/proton/common/timer/timer_test.cpp b/searchcore/src/tests/proton/common/timer/timer_test.cpp
index 8dd0c0abf53..23180fc1aba 100644
--- a/searchcore/src/tests/proton/common/timer/timer_test.cpp
+++ b/searchcore/src/tests/proton/common/timer/timer_test.cpp
@@ -8,10 +8,12 @@
#include <vespa/vespalib/util/count_down_latch.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/lambdatask.h>
using vespalib::Executor;
using namespace proton;
using Task = Executor::Task;
+using vespalib::makeLambdaTask;
namespace {
@@ -57,7 +59,6 @@ public:
timer = make_scheduled_executor<ScheduledT>(transport, executor);
}
~ScheduledExecutorTest() {
- timer->reset();
transport.ShutDown(true);
}
};
@@ -75,15 +76,6 @@ TYPED_TEST(ScheduledExecutorTest, test_scheduling) {
EXPECT_TRUE(latch2.await(60s));
}
-TYPED_TEST(ScheduledExecutorTest, test_reset) {
- vespalib::CountDownLatch latch1(2);
- auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s);
- this->timer->reset();
- EXPECT_TRUE(!latch1.await(3s));
- auto handleB = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms);
- EXPECT_TRUE(latch1.await(60s));
-}
-
TYPED_TEST(ScheduledExecutorTest, test_drop_handle) {
vespalib::CountDownLatch latch1(2);
auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s);
@@ -93,4 +85,32 @@ TYPED_TEST(ScheduledExecutorTest, test_drop_handle) {
EXPECT_TRUE(latch1.await(60s));
}
+TYPED_TEST(ScheduledExecutorTest, test_only_one_instance_running) {
+ vespalib::Gate latch;
+ std::atomic<uint64_t> counter = 0;
+ auto handleA = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() { counter++; latch.await();}), 0ms, 1ms);
+ std::this_thread::sleep_for(2s);
+ EXPECT_EQ(1, counter);
+ latch.countDown();
+ std::this_thread::sleep_for(2s);
+ EXPECT_GT(counter, 10);
+}
+
+TYPED_TEST(ScheduledExecutorTest, test_sync_delete) {
+ vespalib::Gate latch;
+ std::atomic<uint64_t> counter = 0;
+ std::atomic<uint64_t> reset_counter = 0;
+ auto handleA = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() { counter++; latch.await();}), 0ms, 1ms);
+ auto handleB = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() { handleA.reset(); reset_counter++; }), 0ms, 1ms);
+ std::this_thread::sleep_for(2s);
+ EXPECT_EQ(1, counter);
+ EXPECT_EQ(0, reset_counter);
+ latch.countDown();
+ std::this_thread::sleep_for(2s);
+ EXPECT_EQ(1, counter);
+ EXPECT_GT(reset_counter, 10);
+ EXPECT_EQ(nullptr, handleA.get());
+ EXPECT_FALSE(nullptr == handleB.get());
+}
+
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
index 40f8cd19a17..08712a1094c 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
@@ -2,34 +2,115 @@
#include "scheduled_forward_executor.h"
#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <atomic>
+#include <thread>
+#include <cassert>
using vespalib::makeLambdaTask;
namespace proton {
+class ScheduledForwardExecutor::State {
+public:
+ State() :
+ _handle(),
+ _start_success(0),
+ _start_failed(0),
+ _running(false)
+ {}
+ ~State() {
+ assert( !_handle );
+ assert(!isRunning());
+ }
+ /// Returns false if it was already running
+ bool start() {
+ bool already_running = _running.exchange(true);
+ if (already_running) {
+ _start_failed++;
+ } else {
+ _start_success++;
+ }
+ return ! already_running;
+ }
+ void complete() {
+ bool was_running = _running.exchange(false);
+ assert(was_running);
+ }
+ void setHandle(Handle handle) {
+ _handle = std::move(handle);
+ }
+ void cancel() {
+ _handle.reset();
+ while(isRunning()) {
+ std::this_thread::sleep_for(1ms);
+ }
+ }
+private:
+ bool isRunning() const { return _running.load(std::memory_order_relaxed); }
+ Handle _handle;
+ std::atomic<uint64_t> _start_success;
+ std::atomic<uint64_t> _start_failed;
+ std::atomic<bool> _running;
+};
+
+class ScheduledForwardExecutor::Registration : public vespalib::IDestructorCallback {
+private:
+ ScheduledForwardExecutor & _executor;
+ uint64_t _key;
+public:
+ Registration(ScheduledForwardExecutor & executor, uint64_t key) : _executor(executor), _key(key) {}
+ ~Registration() {
+ _executor.cancel(_key);
+ }
+};
+
ScheduledForwardExecutor::ScheduledForwardExecutor(FNET_Transport& transport,
Executor& executor)
: _scheduler(transport),
- _executor(executor)
+ _executor(executor),
+ _lock(),
+ _nextKey(0),
+ _taskList()
{
}
-void
-ScheduledForwardExecutor::reset()
+ScheduledForwardExecutor::~ScheduledForwardExecutor() {
+ std::lock_guard guard(_lock);
+ assert(_taskList.empty());
+}
+
+bool
+ScheduledForwardExecutor::cancel(uint64_t key)
{
- _scheduler.reset();
+ std::lock_guard guard(_lock);
+ auto found = _taskList.find(key);
+ if (found == _taskList.end()) return false;
+ found->second->cancel();
+ _taskList.erase(found);
+ return true;
}
IScheduledExecutor::Handle
ScheduledForwardExecutor::scheduleAtFixedRate(Executor::Task::UP task,
duration delay, duration interval)
{
+ std::lock_guard guard(_lock);
+ uint64_t key = _nextKey++;
+ auto state = std::make_unique<State>();
std::shared_ptr<Executor::Task> my_task = std::move(task);
- return _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(my_task)]() {
- _executor.execute(makeLambdaTask([&, my_task]() {
- my_task->run();
- }));
+ auto handle = _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(my_task), my_state=state.get()]() {
+ bool start_allowed = my_state->start();
+ if (start_allowed) {
+ _executor.execute(makeLambdaTask([&, my_task]() {
+ my_task->run();
+ my_state->complete();
+ }));
+ }
}), delay, interval);
+ state->setHandle(std::move(handle));
+ _taskList[key] = std::move(state);
+ return std::make_unique<Registration>(*this, key);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h
index b85855db287..d65b1cfedc4 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h
@@ -13,13 +13,19 @@ namespace proton {
*/
class ScheduledForwardExecutor : public IScheduledExecutor {
private:
+ class State;
+ class Registration;
+ using Tasks = vespalib::hash_map<uint64_t, std::unique_ptr<State>>;
ScheduledExecutor _scheduler;
Executor & _executor;
+ std::mutex _lock;
+ uint64_t _nextKey;
+ Tasks _taskList;
+ bool cancel(uint64_t key);
public:
ScheduledForwardExecutor(FNET_Transport& transport, Executor& executor);
- void reset();
-
+ ~ScheduledForwardExecutor() override;
[[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp
index 1619388ce52..3fb08b9ad2b 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp
@@ -25,7 +25,7 @@ public:
_interval(interval)
{ }
- ~TimerTask() {
+ ~TimerTask() override {
Kill();
}
@@ -44,7 +44,6 @@ public:
~Registration() {
_executor.cancel(_key);
}
-
};
ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport)
@@ -56,7 +55,8 @@ ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport)
ScheduledExecutor::~ScheduledExecutor()
{
- reset();
+ std::lock_guard guard(_lock);
+ assert(_taskList.empty());
}
@@ -84,14 +84,4 @@ ScheduledExecutor::cancel(uint64_t key)
return true;
}
-void
-ScheduledExecutor::reset()
-{
- std::lock_guard guard(_lock);
- for (auto & task : _taskList) {
- task.second->Unschedule();
- }
- _taskList.clear();
-}
-
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h
index f4673612a6c..198a944b0f4 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h
@@ -41,11 +41,6 @@ public:
~ScheduledExecutor() override;
[[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override;
-
- /**
- * Reset timer, clearing the list of task to execute.
- */
- void reset();
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 9934a94d7a5..44dfbbfba98 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -470,7 +470,7 @@ Proton::~Proton()
_diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get());
}
_sessionPruneHandle.reset();
- _scheduler->reset();
+ _scheduler.reset();
_executor.shutdown();
_executor.sync();
_rpcHooks.reset();