aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-12-08 22:34:35 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-12-09 13:30:47 +0000
commitd8522831905697c4f5de37d1e1b2df470745930e (patch)
tree51df3b95e838f735547d12352bbbf8380f681b50
parent0f845e25cb7f0f4c91002a03b4c4c679cbb833e0 (diff)
Return a handle that will deregister the recurring task when it goes out of scope.
This requires the user to consider lifetime, and allows for finegrained control.
-rw-r--r--searchcore/src/tests/proton/common/timer/timer_test.cpp17
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp11
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp44
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.h5
12 files changed, 98 insertions, 49 deletions
diff --git a/searchcore/src/tests/proton/common/timer/timer_test.cpp b/searchcore/src/tests/proton/common/timer/timer_test.cpp
index a0ad3378b09..8dd0c0abf53 100644
--- a/searchcore/src/tests/proton/common/timer/timer_test.cpp
+++ b/searchcore/src/tests/proton/common/timer/timer_test.cpp
@@ -69,18 +69,27 @@ TYPED_TEST_SUITE(ScheduledExecutorTest, ScheduledTypes);
TYPED_TEST(ScheduledExecutorTest, test_scheduling) {
vespalib::CountDownLatch latch1(3);
vespalib::CountDownLatch latch2(2);
- this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms);
- this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms);
+ auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms);
+ auto handleB = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms);
EXPECT_TRUE(latch1.await(60s));
EXPECT_TRUE(latch2.await(60s));
}
TYPED_TEST(ScheduledExecutorTest, test_reset) {
vespalib::CountDownLatch latch1(2);
- this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s);
+ auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s);
this->timer->reset();
EXPECT_TRUE(!latch1.await(3s));
- this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms);
+ auto handleB = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms);
+ EXPECT_TRUE(latch1.await(60s));
+}
+
+TYPED_TEST(ScheduledExecutorTest, test_drop_handle) {
+ vespalib::CountDownLatch latch1(2);
+ auto handleA = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s);
+ handleA.reset();
+ EXPECT_TRUE(!latch1.await(3s));
+ auto handleB = this->timer->scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms);
EXPECT_TRUE(latch1.await(60s));
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h b/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h
index b95e65da045..6a5903536f0 100644
--- a/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h
+++ b/searchcore/src/vespa/searchcore/proton/common/i_scheduled_executor.h
@@ -3,6 +3,7 @@
#include <vespa/vespalib/util/executor.h>
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/idestructorcallback.h>
#include <memory>
namespace proton {
@@ -12,6 +13,9 @@ namespace proton {
*/
class IScheduledExecutor {
public:
+ using Handle = std::unique_ptr<vespalib::IDestructorCallback>;
+ using duration = vespalib::duration;
+ using Executor = vespalib::Executor;
virtual ~IScheduledExecutor() = default;
/**
@@ -20,9 +24,9 @@ public:
* @param task The task to schedule.
* @param delay The delay to wait before first execution.
* @param interval The interval between the task is executed.
+ * @return A handle that will cancel the recurring task when it goes out of scope
*/
- virtual void scheduleAtFixedRate(std::unique_ptr<vespalib::Executor::Task> task,
- vespalib::duration delay, vespalib::duration interval) = 0;
+ [[nodiscard]] virtual Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) = 0;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
index acb94c020f6..3f94247fa7e 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.cpp
@@ -3,13 +3,12 @@
#include "scheduled_forward_executor.h"
#include <vespa/vespalib/util/lambdatask.h>
-using vespalib::Executor;
using vespalib::makeLambdaTask;
namespace proton {
ScheduledForwardExecutor::ScheduledForwardExecutor(FNET_Transport& transport,
- vespalib::Executor& executor)
+ Executor& executor)
: _scheduler(transport),
_executor(executor)
{
@@ -21,11 +20,11 @@ ScheduledForwardExecutor::reset()
_scheduler.reset();
}
-void
-ScheduledForwardExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task,
- vespalib::duration delay, vespalib::duration interval)
+IScheduledExecutor::Handle
+ScheduledForwardExecutor::scheduleAtFixedRate(Executor::Task::UP task,
+ duration delay, duration interval)
{
- _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(task)]() {
+ return _scheduler.scheduleAtFixedRate(makeLambdaTask([&, my_task = std::move(task)]() {
_executor.execute(makeLambdaTask([&]() {
my_task->run();
}));
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h
index eb7120527d7..b85855db287 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduled_forward_executor.h
@@ -13,16 +13,14 @@ namespace proton {
*/
class ScheduledForwardExecutor : public IScheduledExecutor {
private:
- ScheduledExecutor _scheduler;
- vespalib::Executor& _executor;
+ ScheduledExecutor _scheduler;
+ Executor & _executor;
public:
- ScheduledForwardExecutor(FNET_Transport& transport, vespalib::Executor& executor);
+ ScheduledForwardExecutor(FNET_Transport& transport, Executor& executor);
void reset();
- void scheduleAtFixedRate(std::unique_ptr<vespalib::Executor::Task> task,
- vespalib::duration delay, vespalib::duration interval) override;
-
+ [[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override;
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp
index 4577477fb77..94c81ee4b6b 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp
@@ -3,6 +3,7 @@
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/task.h>
#include <vespa/fnet/transport.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
using vespalib::duration;
@@ -13,13 +14,11 @@ using Task = vespalib::Executor::Task;
class TimerTask : public FNET_Task
{
private:
- TimerTask(const TimerTask &);
- TimerTask&operator=(const TimerTask &);
-
- FNET_Scheduler *_scheduler;
Task::UP _task;
duration _interval;
public:
+ TimerTask(const TimerTask &) = delete;
+ TimerTask&operator=(const TimerTask &) = delete;
TimerTask(FNET_Scheduler *scheduler, Task::UP task, duration interval)
: FNET_Task(scheduler),
_task(std::move(task)),
@@ -36,9 +35,22 @@ public:
}
};
+class ScheduledExecutor::Registration : public vespalib::IDestructorCallback {
+private:
+ ScheduledExecutor & _executor;
+ uint64_t _key;
+public:
+ Registration(ScheduledExecutor & executor, uint64_t key) : _executor(executor), _key(key) {}
+ ~Registration() {
+ _executor.cancel(_key);
+ }
+
+};
+
ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport)
: _transport(transport),
_lock(),
+ _nextKey(0),
_taskList()
{ }
@@ -48,13 +60,27 @@ ScheduledExecutor::~ScheduledExecutor()
}
-void
-ScheduledExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval)
+IScheduledExecutor::Handle
+ScheduledExecutor::scheduleAtFixedRate(Executor::Task::UP task, duration delay, duration interval)
{
std::lock_guard guard(_lock);
+ uint64_t key = _nextKey++;
auto tTask = std::make_unique<TimerTask>(_transport.GetScheduler(), std::move(task), interval);
- _taskList.push_back(std::move(tTask));
- _taskList.back()->Schedule(vespalib::to_s(delay));
+ auto & taskRef = *tTask;
+ _taskList[key] = std::move(tTask);
+ taskRef.Schedule(vespalib::to_s(delay));
+ return std::make_unique<Registration>(*this, key);
+}
+
+bool
+ScheduledExecutor::cancel(uint64_t key)
+{
+ std::lock_guard guard(_lock);
+ auto found = _taskList.find(key);
+ if (found == _taskList.end()) return false;
+
+ found->second->Unschedule();
+ return true;
}
void
@@ -62,7 +88,7 @@ ScheduledExecutor::reset()
{
std::lock_guard guard(_lock);
for (auto & task : _taskList) {
- task->Unschedule();
+ task.second->Unschedule();
}
_taskList.clear();
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h
index dfb16c94d7d..f4673612a6c 100644
--- a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h
@@ -2,6 +2,7 @@
#pragma once
#include "i_scheduled_executor.h"
+#include <vespa/vespalib/stllike/hash_map.h>
#include <mutex>
#include <vector>
@@ -19,13 +20,14 @@ class TimerTask;
class ScheduledExecutor : public IScheduledExecutor
{
private:
- using TaskList = std::vector<std::unique_ptr<TimerTask>>;
- using duration = vespalib::duration;
- using Executor = vespalib::Executor;
+ using TaskList = vespalib::hash_map<uint64_t, std::unique_ptr<TimerTask>>;
FNET_Transport & _transport;
std::mutex _lock;
+ uint64_t _nextKey;
TaskList _taskList;
+ bool cancel(uint64_t key);
+ class Registration;
public:
/**
* Create a new timer, capable of scheduling tasks at fixed intervals.
@@ -38,7 +40,7 @@ public:
*/
~ScheduledExecutor() override;
- void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override;
+ [[nodiscard]] Handle scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval) override;
/**
* Reset timer, clearing the list of task to execute.
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
index 401de2e34a8..40cd6238393 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
@@ -17,6 +17,7 @@ DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std::
_lastSampleTime(vespalib::steady_clock::now()),
_periodicTimer(std::make_unique<ScheduledExecutor>(transport)),
_lock(),
+ _periodicHandle(),
_transient_usage_providers()
{
setConfig(config);
@@ -24,26 +25,25 @@ DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std::
DiskMemUsageSampler::~DiskMemUsageSampler()
{
- _periodicTimer.reset();
+ _periodicHandle.reset();
}
void
DiskMemUsageSampler::setConfig(const Config &config)
{
- _periodicTimer->reset();
+ _periodicHandle.reset();
_filter.setConfig(config.filterConfig);
_sampleInterval = config.sampleInterval;
sampleAndReportUsage();
_lastSampleTime = vespalib::steady_clock::now();
vespalib::duration maxInterval = std::min(vespalib::duration(1s), _sampleInterval);
- _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() {
- if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) {
- return;
- }
- sampleAndReportUsage();
- _lastSampleTime = vespalib::steady_clock::now();
- }),
- maxInterval, maxInterval);
+ _periodicHandle = _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() {
+ if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) {
+ return;
+ }
+ sampleAndReportUsage();
+ _lastSampleTime = vespalib::steady_clock::now();
+ }), maxInterval, maxInterval);
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
index b6ff46bc714..2b7d3ab759f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
@@ -7,6 +7,8 @@
class FNET_Transport;
+namespace vespalib { class IDestructorCallback; }
+
namespace proton {
class ITransientResourceUsageProvider;
@@ -21,7 +23,8 @@ class DiskMemUsageSampler {
vespalib::duration _sampleInterval;
vespalib::steady_time _lastSampleTime;
std::unique_ptr<ScheduledExecutor> _periodicTimer;
- std::mutex _lock;
+ std::mutex _lock;
+ std::unique_ptr<vespalib::IDestructorCallback> _periodicHandle;
std::vector<std::shared_ptr<const ITransientResourceUsageProvider>> _transient_usage_providers;
void sampleAndReportUsage();
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
index c95efbca944..adafa4c6217 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
@@ -84,7 +84,7 @@ MaintenanceController::killJobs()
// Called by master write thread
assert(_masterThread.isCurrentThread());
LOG(debug, "killJobs(): threadId=%zu", (size_t)FastOS_Thread::GetCurrentThreadId());
- _periodicTimer->reset();
+ _periodicTaskHandles.clear();
// No need to take _jobsLock as modification of _jobs also happens in master write thread.
for (auto &job : _jobs) {
job->stop(); // Make sure no more tasks are added to the executor
@@ -162,7 +162,7 @@ MaintenanceController::restart()
if (!getStarted() || getStopping() || !_readySubDB.valid()) {
return;
}
- _periodicTimer->reset();
+ _periodicTaskHandles.clear();
addJobsToPeriodicTimer();
}
@@ -174,14 +174,16 @@ MaintenanceController::addJobsToPeriodicTimer()
for (const auto &jw : _jobs) {
const IMaintenanceJob &job = jw->getJob();
LOG(debug, "addJobsToPeriodicTimer(): docType='%s', job.name='%s', job.delay=%f, job.interval=%f",
- _docTypeName.getName().c_str(), job.getName().c_str(), vespalib::to_s(job.getDelay()), vespalib::to_s(job.getInterval()));
+ _docTypeName.getName().c_str(), job.getName().c_str(), vespalib::to_s(job.getDelay()),
+ vespalib::to_s(job.getInterval()));
if (job.getInterval() == vespalib::duration::zero()) {
jw->run();
continue;
}
- _periodicTimer->scheduleAtFixedRate(std::make_unique<JobWrapperTask>(jw.get()),
- job.getDelay(), job.getInterval());
+ _periodicTaskHandles.push_back(_periodicTimer->scheduleAtFixedRate(std::make_unique<JobWrapperTask>(jw.get()),
+ job.getDelay(), job.getInterval()));
}
+
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
index a2e5105f426..0717a4ddc87 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
@@ -79,6 +79,7 @@ public:
private:
using Mutex = std::mutex;
using Guard = std::lock_guard<Mutex>;
+ using TaskHandle = std::unique_ptr<vespalib::IDestructorCallback>;
ISyncableThreadService &_masterThread;
vespalib::MonitoredRefCount &_refCount;
@@ -86,6 +87,7 @@ private:
MaintenanceDocumentSubDB _remSubDB;
MaintenanceDocumentSubDB _notReadySubDB;
std::unique_ptr<ScheduledExecutor> _periodicTimer;
+ std::vector<TaskHandle> _periodicTaskHandles;
State _state;
const DocTypeName &_docTypeName;
JobList _jobs;
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 5c82b6e74ae..a8b94c8c489 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -380,7 +380,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_flushEngine->start();
vespalib::duration pruneSessionsInterval = vespalib::from_s(protonConfig.grouping.sessionmanager.pruning.interval);
- _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now()); }), pruneSessionsInterval, pruneSessionsInterval);
+ _sessionPruneHandle = _scheduler->scheduleAtFixedRate(makeLambdaTask([&]() { _sessionManager->pruneTimedOutSessions(vespalib::steady_clock::now()); }), pruneSessionsInterval, pruneSessionsInterval);
_isInitializing = false;
_protonConfigurer.setAllowReconfig(true);
_initComplete = true;
@@ -469,6 +469,7 @@ Proton::~Proton()
if (_memoryFlushConfigUpdater) {
_diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get());
}
+ _sessionPruneHandle.reset();
_scheduler->reset();
_executor.shutdown();
_executor.sync();
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index 8e1d1dedc59..b07ebece0c7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -13,12 +13,14 @@
#include "proton_configurer.h"
#include "rpc_hooks.h"
#include "shared_threading_service.h"
-#include <vespa/eval/eval/llvm/compile_cache.h>
+#include <vespa/searchcore/proton/common/i_scheduled_executor.h>
+#include <vespa/searchcore/proton/matching/querylimiter.h>
#include <vespa/searchcore/proton/matching/querylimiter.h>
#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h>
#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h>
#include <vespa/searchlib/common/fileheadercontext.h>
#include <vespa/searchlib/engine/monitorapi.h>
+#include <vespa/eval/eval/llvm/compile_cache.h>
#include <vespa/vespalib/net/http/component_config_producer.h>
#include <vespa/vespalib/net/http/generic_state_handler.h>
#include <vespa/vespalib/net/http/json_get_handler.h>
@@ -115,6 +117,7 @@ private:
ProtonConfigFetcher _protonConfigFetcher;
std::unique_ptr<SharedThreadingService> _shared_service;
std::unique_ptr<matching::SessionManager> _sessionManager;
+ IScheduledExecutor::Handle _sessionPruneHandle;
std::unique_ptr<ScheduledForwardExecutor> _scheduler;
vespalib::eval::CompileCache::ExecutorBinding::UP _compile_cache_executor_binding;
matching::QueryLimiter _queryLimiter;