summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-05-18 07:20:18 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-05-18 07:20:18 +0000
commit714346ca5e18851c784101537bcb9b0877d413a9 (patch)
tree121764ca0af5de204005d6b16d00a867f07fc5d8 /searchcore
parentf05f60b1ec40be6f3e60349d65da60bc41555db3 (diff)
Move ScheduledExecutor to searchcore and drop fnet dependency in staging_vespalib in order to prepare collapsing stagingg vespalib into vespalib
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/CMakeLists.txt1
-rw-r--r--searchcore/src/tests/proton/common/timer/.gitignore1
-rw-r--r--searchcore/src/tests/proton/common/timer/CMakeLists.txt8
-rw-r--r--searchcore/src/tests/proton/common/timer/timer_test.cpp54
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp70
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h58
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h5
11 files changed, 202 insertions, 9 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt
index 8df693e8ea2..c0b86b509b3 100644
--- a/searchcore/CMakeLists.txt
+++ b/searchcore/CMakeLists.txt
@@ -80,6 +80,7 @@ vespa_define_module(
src/tests/proton/common/hw_info_sampler
src/tests/proton/common/operation_rate_tracker
src/tests/proton/common/state_reporter_utils
+ src/tests/proton/common/timer
src/tests/proton/docsummary
src/tests/proton/document_iterator
src/tests/proton/documentdb
diff --git a/searchcore/src/tests/proton/common/timer/.gitignore b/searchcore/src/tests/proton/common/timer/.gitignore
new file mode 100644
index 00000000000..9031e40152a
--- /dev/null
+++ b/searchcore/src/tests/proton/common/timer/.gitignore
@@ -0,0 +1 @@
+staging_vespalib_timer_test_app
diff --git a/searchcore/src/tests/proton/common/timer/CMakeLists.txt b/searchcore/src/tests/proton/common/timer/CMakeLists.txt
new file mode 100644
index 00000000000..7afd366ed89
--- /dev/null
+++ b/searchcore/src/tests/proton/common/timer/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(staging_vespalib_timer_test_app TEST
+ SOURCES
+ timer_test.cpp
+ DEPENDS
+ searchcore_pcommon
+)
+vespa_add_test(NAME staging_vespalib_timer_test_app COMMAND staging_vespalib_timer_test_app)
diff --git a/searchcore/src/tests/proton/common/timer/timer_test.cpp b/searchcore/src/tests/proton/common/timer/timer_test.cpp
new file mode 100644
index 00000000000..9eea67623b6
--- /dev/null
+++ b/searchcore/src/tests/proton/common/timer/timer_test.cpp
@@ -0,0 +1,54 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/searchcore/proton/common/scheduledexecutor.h>
+#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fastos/thread.h>
+
+using vespalib::Executor;
+typedef Executor::Task Task;
+
+namespace {
+
+class TestTask : public Task {
+private:
+ vespalib::CountDownLatch &_latch;
+public:
+ TestTask(vespalib::CountDownLatch & latch) : _latch(latch) { }
+ void run() override { _latch.countDown(); }
+};
+
+}
+
+TEST("testScheduling") {
+ vespalib::CountDownLatch latch1(3);
+ vespalib::CountDownLatch latch2(2);
+ FastOS_ThreadPool threadPool(64_Ki);
+ FNET_Transport transport;
+ transport.Start(&threadPool);
+ proton::ScheduledExecutor timer(transport);
+ timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms);
+ timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms);
+ EXPECT_TRUE(latch1.await(60s));
+ EXPECT_TRUE(latch2.await(60s));
+ timer.reset();
+ transport.ShutDown(true);
+}
+
+TEST("testReset") {
+ vespalib::CountDownLatch latch1(2);
+ FastOS_ThreadPool threadPool(64_Ki);
+ FNET_Transport transport;
+ transport.Start(&threadPool);
+ proton::ScheduledExecutor timer(transport);
+ timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s);
+ timer.reset();
+ EXPECT_TRUE(!latch1.await(3s));
+ timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms);
+ EXPECT_TRUE(latch1.await(60s));
+ timer.reset();
+ transport.ShutDown(true);
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
index 3f33871a0e2..25e9a469f93 100644
--- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
@@ -25,6 +25,7 @@ vespa_add_library(searchcore_pcommon STATIC
selectpruner.cpp
state_reporter_utils.cpp
statusreport.cpp
+ scheduledexecutor.cpp
DEPENDS
searchcore_proton_metrics
searchcore_fconfig
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp
new file mode 100644
index 00000000000..4577477fb77
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.cpp
@@ -0,0 +1,70 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "scheduledexecutor.h"
+#include <vespa/fnet/scheduler.h>
+#include <vespa/fnet/task.h>
+#include <vespa/fnet/transport.h>
+
+using vespalib::duration;
+
+namespace proton {
+
+using Task = vespalib::Executor::Task;
+
+class TimerTask : public FNET_Task
+{
+private:
+ TimerTask(const TimerTask &);
+ TimerTask&operator=(const TimerTask &);
+
+ FNET_Scheduler *_scheduler;
+ Task::UP _task;
+ duration _interval;
+public:
+ TimerTask(FNET_Scheduler *scheduler, Task::UP task, duration interval)
+ : FNET_Task(scheduler),
+ _task(std::move(task)),
+ _interval(interval)
+ { }
+
+ ~TimerTask() {
+ Kill();
+ }
+
+ void PerformTask() override {
+ _task->run();
+ Schedule(vespalib::to_s(_interval));
+ }
+};
+
+ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport)
+ : _transport(transport),
+ _lock(),
+ _taskList()
+{ }
+
+ScheduledExecutor::~ScheduledExecutor()
+{
+ reset();
+}
+
+
+void
+ScheduledExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval)
+{
+ std::lock_guard guard(_lock);
+ auto tTask = std::make_unique<TimerTask>(_transport.GetScheduler(), std::move(task), interval);
+ _taskList.push_back(std::move(tTask));
+ _taskList.back()->Schedule(vespalib::to_s(delay));
+}
+
+void
+ScheduledExecutor::reset()
+{
+ std::lock_guard guard(_lock);
+ for (auto & task : _taskList) {
+ task->Unschedule();
+ }
+ _taskList.clear();
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h
new file mode 100644
index 00000000000..80c8b7edd15
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/common/scheduledexecutor.h
@@ -0,0 +1,58 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/time.h>
+#include <mutex>
+#include <vector>
+
+class FNET_Transport;
+
+namespace proton {
+
+class TimerTask;
+
+/**
+ * ScheduledExecutor is a class capable of running Tasks at a regular
+ * interval. The timer can be reset to clear all tasks currently being
+ * scheduled.
+ */
+class ScheduledExecutor
+{
+private:
+ using TaskList = std::vector<std::unique_ptr<TimerTask>>;
+ using duration = vespalib::duration;
+ using Executor = vespalib::Executor;
+ FNET_Transport & _transport;
+ std::mutex _lock;
+ TaskList _taskList;
+
+public:
+ /**
+ * Create a new timer, capable of scheduling tasks at fixed intervals.
+ */
+ ScheduledExecutor(FNET_Transport & transport);
+
+ /**
+ * Destroys this timer, finishing the current task executing and then
+ * finishing.
+ */
+ ~ScheduledExecutor();
+
+ /**
+ * Schedule new task to be executed at specified intervals.
+ *
+ * @param task The task to schedule.
+ * @param delay The delay to wait before first execution.
+ * @param interval The interval in seconds.
+ */
+ void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval);
+
+ /**
+ * Reset timer, clearing the list of task to execute.
+ */
+ void reset();
+};
+
+}
+
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
index 6030fb3cceb..401de2e34a8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp
@@ -1,7 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "disk_mem_usage_sampler.h"
-#include <vespa/vespalib/util/scheduledexecutor.h>
+#include <vespa/searchcore/proton/common/scheduledexecutor.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/searchcore/proton/common/i_transient_resource_usage_provider.h>
#include <filesystem>
@@ -15,7 +15,7 @@ DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std::
_path(path_in),
_sampleInterval(60s),
_lastSampleTime(vespalib::steady_clock::now()),
- _periodicTimer(std::make_unique<vespalib::ScheduledExecutor>(transport)),
+ _periodicTimer(std::make_unique<ScheduledExecutor>(transport)),
_lock(),
_transient_usage_providers()
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
index fa8ac48fa1f..b6ff46bc714 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
@@ -7,11 +7,10 @@
class FNET_Transport;
-namespace vespalib { class ScheduledExecutor; }
-
namespace proton {
class ITransientResourceUsageProvider;
+class ScheduledExecutor;
/*
* Class to sample disk and memory usage used for filtering write operations.
@@ -21,7 +20,7 @@ class DiskMemUsageSampler {
std::filesystem::path _path;
vespalib::duration _sampleInterval;
vespalib::steady_time _lastSampleTime;
- std::unique_ptr<vespalib::ScheduledExecutor> _periodicTimer;
+ std::unique_ptr<ScheduledExecutor> _periodicTimer;
std::mutex _lock;
std::vector<std::shared_ptr<const ITransientResourceUsageProvider>> _transient_usage_providers;
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
index 0df211b5a0b..5ca47d4d800 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
@@ -5,8 +5,8 @@
#include "document_db_maintenance_config.h"
#include "i_blockable_maintenance_job.h"
#include <vespa/searchcorespi/index/i_thread_service.h>
+#include <vespa/searchcore/proton/common/scheduledexecutor.h>
#include <vespa/vespalib/util/lambdatask.h>
-#include <vespa/vespalib/util/scheduledexecutor.h>
#include <vespa/fastos/thread.h>
#include <thread>
@@ -51,7 +51,7 @@ MaintenanceController::MaintenanceController(FNET_Transport & transport,
_readySubDB(),
_remSubDB(),
_notReadySubDB(),
- _periodicTimer(std::make_unique<vespalib::ScheduledExecutor>(transport)),
+ _periodicTimer(std::make_unique<ScheduledExecutor>(transport)),
_config(),
_state(State::INITIALIZING),
_docTypeName(docTypeName),
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
index 8e5bb8d860c..763225045b2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
@@ -6,9 +6,9 @@
#include "i_maintenance_job.h"
#include <vespa/searchcore/proton/common/doctypename.h>
#include <vespa/vespalib/util/retain_guard.h>
-#include <vespa/vespalib/util/scheduledexecutor.h>
#include <mutex>
+class FNET_Transport;
namespace vespalib {
@@ -26,6 +26,7 @@ namespace proton {
class MaintenanceJobRunner;
class DocumentDBMaintenanceConfig;
+class ScheduledExecutor;
/**
* Class that controls the bucket moving between ready and notready sub databases
@@ -88,7 +89,7 @@ private:
MaintenanceDocumentSubDB _readySubDB;
MaintenanceDocumentSubDB _remSubDB;
MaintenanceDocumentSubDB _notReadySubDB;
- std::unique_ptr<vespalib::ScheduledExecutor> _periodicTimer;
+ std::unique_ptr<ScheduledExecutor> _periodicTimer;
DocumentDBMaintenanceConfigSP _config;
State _state;
const DocTypeName &_docTypeName;