aboutsummaryrefslogtreecommitdiffstats
path: root/vdslib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-13 22:13:14 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-13 22:56:14 +0000
commit001bdf0053ba9cb02e20afcceb9d0f7ed63f1178 (patch)
treeb2b0d66c4459114d878cfa61b12e74c39bbb0b74 /vdslib
parent71c10939b19be8ea115cda9ecddcad7749b2c20d (diff)
Use std::mutex and std:.condition_variable and GC some unused code.
Diffstat (limited to 'vdslib')
-rw-r--r--vdslib/CMakeLists.txt2
-rw-r--r--vdslib/src/tests/CMakeLists.txt1
-rw-r--r--vdslib/src/tests/thread/.gitignore3
-rw-r--r--vdslib/src/tests/thread/CMakeLists.txt8
-rw-r--r--vdslib/src/tests/thread/taskschedulertest.cpp227
-rw-r--r--vdslib/src/vespa/vdslib/CMakeLists.txt1
-rw-r--r--vdslib/src/vespa/vdslib/thread/CMakeLists.txt6
-rw-r--r--vdslib/src/vespa/vdslib/thread/taskscheduler.cpp220
-rw-r--r--vdslib/src/vespa/vdslib/thread/taskscheduler.h105
9 files changed, 0 insertions, 573 deletions
diff --git a/vdslib/CMakeLists.txt b/vdslib/CMakeLists.txt
index 3c1ee756e56..b66f53a4e19 100644
--- a/vdslib/CMakeLists.txt
+++ b/vdslib/CMakeLists.txt
@@ -14,7 +14,6 @@ vespa_define_module(
src/vespa/vdslib/container
src/vespa/vdslib/distribution
src/vespa/vdslib/state
- src/vespa/vdslib/thread
TEST_DEPENDS
vdstestlib
@@ -24,5 +23,4 @@ vespa_define_module(
src/tests/container
src/tests/distribution
src/tests/state
- src/tests/thread
)
diff --git a/vdslib/src/tests/CMakeLists.txt b/vdslib/src/tests/CMakeLists.txt
index 6cf1ba5e33f..0b48d675ddc 100644
--- a/vdslib/src/tests/CMakeLists.txt
+++ b/vdslib/src/tests/CMakeLists.txt
@@ -9,7 +9,6 @@ vespa_add_executable(vdslib_gtest_runner_app TEST
vdslib_containertest
vdslib_testdistribution
vdslib_teststate
- vdslib_testthread
GTest::GTest
)
diff --git a/vdslib/src/tests/thread/.gitignore b/vdslib/src/tests/thread/.gitignore
deleted file mode 100644
index 583460ae288..00000000000
--- a/vdslib/src/tests/thread/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-*.So
-.depend
-Makefile
diff --git a/vdslib/src/tests/thread/CMakeLists.txt b/vdslib/src/tests/thread/CMakeLists.txt
deleted file mode 100644
index bf2c8a41c9b..00000000000
--- a/vdslib/src/tests/thread/CMakeLists.txt
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(vdslib_testthread
- SOURCES
- taskschedulertest.cpp
- DEPENDS
- vdslib
- GTest::GTest
-)
diff --git a/vdslib/src/tests/thread/taskschedulertest.cpp b/vdslib/src/tests/thread/taskschedulertest.cpp
deleted file mode 100644
index 54877fae62b..00000000000
--- a/vdslib/src/tests/thread/taskschedulertest.cpp
+++ /dev/null
@@ -1,227 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/vdslib/thread/taskscheduler.h>
-#include <vespa/vespalib/gtest/gtest.h>
-#include <thread>
-
-namespace vdslib {
-
-namespace {
-
-struct TestWatch : public TaskScheduler::Watch {
- mutable std::mutex _lock;
- uint64_t _time;
-
- TestWatch(uint64_t startTime = 0) : _time(startTime) {}
- ~TestWatch() = default;
-
- TaskScheduler::Time getTime() const override {
- std::lock_guard guard(_lock);
- return _time;
- }
-
- void increment(uint64_t ms) {
- std::lock_guard guard(_lock);
- _time += ms;
- }
-
- void set(uint64_t ms) {
- std::lock_guard guard(_lock);
- _time = ms;
- }
-};
-
-struct TestTask : public TaskScheduler::Task
-{
- TestWatch& _watch;
- uint64_t _executionTime;
- uint64_t _maxRuns;
- uint64_t _maxTime;
- int64_t _result;
- uint64_t _currentRuns;
- std::string _name;
- std::vector<std::string>* _register;
-
- TestTask(TestWatch& watch, uint64_t executionTime, uint64_t maxRuns,
- uint64_t maxTime, int64_t result)
- : _watch(watch), _executionTime(executionTime), _maxRuns(maxRuns),
- _maxTime(maxTime), _result(result), _currentRuns(0),
- _name(), _register(0)
- {
- }
-
- void registerCallsWithName(const std::string& name,
- std::vector<std::string>& myregister)
- {
- _name = name;
- _register = &myregister;
- }
-
- int64_t run(TaskScheduler::Time currentTime) override {
- // Emulate that we use time to run
- _watch.increment(_executionTime);
- if (_register != 0) {
- std::ostringstream ost;
- ost << currentTime;
- if (_name.size() > 0) {
- ost << " " << _name;
- }
- _register->push_back(ost.str());
- }
- // If max runs, dont run anymore
- if (++_currentRuns >= _maxRuns) {
- //std::cerr << "Max runs run, returning 0\n";
- return 0;
- }
- // If we will go beyond max time, dont run anymore
- if (_result > 0 && currentTime + _result > _maxTime) {
- //std::cerr << "Max time spent, returning 0\n";
- return 0;
- }
- //std::cerr << "Executed test task. Returning " << _result << "\n";
- return _result;
- }
-
-};
-
-std::string join(std::vector<std::string>& v) {
- std::ostringstream ost;
- for (size_t i=0; i<v.size(); ++i) {
- if (i != 0) ost << ",";
- ost << v[i];
- }
- return ost.str();
-}
-
-}
-
-TEST(TaskSchedulerTest, test_simple)
-{
- FastOS_ThreadPool threadPool(128 * 1024);
- TestWatch watch(0);
- TaskScheduler scheduler;
- scheduler.setWatch(watch);
- scheduler.start(threadPool);
- std::vector<std::string> calls;
-
- // Test that one can schedule a single task immediately
- {
- calls.clear();
- watch.set(0);
- uint64_t counter = scheduler.getTaskCounter();
- TestTask* task(new TestTask(watch, 10, 5, 1000, 0));
- task->registerCallsWithName("", calls);
- scheduler.add(TestTask::UP(task));
- scheduler.waitForTaskCounterOfAtLeast(counter + 1);
- EXPECT_EQ(std::string("0"), join(calls));
- scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete
- }
- // Test that task is repeated at intervals if wanted.
- {
- calls.clear();
- watch.set(0);
- uint64_t counter = scheduler.getTaskCounter();
- TestTask* task(new TestTask(watch, 10, 5, 1000, -20));
- task->registerCallsWithName("", calls);
- scheduler.add(TestTask::UP(task));
- for (uint32_t i = 1; i <= 5; ++i) {
- scheduler.waitForTaskCounterOfAtLeast(counter + i);
- watch.increment(100);
- }
- EXPECT_EQ(std::string("0,110,220,330,440"),
- join(calls));
- scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete
- }
- // Test that task scheduled at specific time works, and that if
- // scheduled at specific time in the past/current, we're rerun at once.
- {
- calls.clear();
- watch.set(0);
- uint64_t counter = scheduler.getTaskCounter();
- TestTask* task(new TestTask(watch, 10, 4, 1000, 100));
- task->registerCallsWithName("", calls);
- scheduler.addAbsolute(TestTask::UP(task), 50);
- watch.increment(49); // Not yet time to run
- std::this_thread::sleep_for(5ms);
- // Check that it has not run yet..
- EXPECT_EQ(counter, scheduler.getTaskCounter());
- watch.increment(10); // Now time is enough for it to run
- scheduler.waitForTaskCounterOfAtLeast(counter + 1);
- watch.increment(10);
- std::this_thread::sleep_for(5ms);
- // Check that it has not run yet..
- EXPECT_EQ(counter + 1, scheduler.getTaskCounter());
- watch.increment(50);
- scheduler.waitForTaskCounterOfAtLeast(counter + 2);
- EXPECT_EQ(std::string("59,129,129,129"),
- join(calls));
- scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete
- }
-}
-
-TEST(TaskSchedulerTest, test_multiple_tasks_at_same_time)
-{
- FastOS_ThreadPool threadPool(128 * 1024);
- TestWatch watch(0);
- TaskScheduler scheduler;
- scheduler.setWatch(watch);
- std::vector<std::string> calls;
-
- // Test that tasks deleted before they are run are automatically
- // cancelled and removed from scheduler
- {
- TestTask* task1(new TestTask(watch, 10, 3, 1000, 10));
- TestTask* task2(new TestTask(watch, 10, 3, 1000, 10));
- task1->registerCallsWithName("task1", calls);
- task2->registerCallsWithName("task2", calls);
- watch.set(10);
- scheduler.add(TestTask::UP(task1));
- scheduler.add(TestTask::UP(task2));
- // Start threadpool after adding both, such that we ensure both
- // are added at the same time interval
- scheduler.start(threadPool);
-
- scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete
- std::ostringstream ost;
- for (size_t i=0; i<calls.size(); ++i) ost << calls[i] << "\n";
-
- EXPECT_EQ(std::string(
- "10 task1\n"
- "10 task2\n"
- "10 task1\n"
- "10 task2\n"
- "10 task1\n"
- "10 task2\n"
- ), ost.str());
- }
-}
-
-TEST(TaskSchedulerTest, test_remove_task)
-{
- FastOS_ThreadPool threadPool(128 * 1024);
- TestWatch watch(0);
- TaskScheduler scheduler;
- scheduler.setWatch(watch);
- scheduler.start(threadPool);
- std::vector<std::string> calls;
-
- // Schedule a task, and remove it..
- {
- calls.clear();
- watch.set(0);
- TestTask* task(new TestTask(watch, 10, 5, 1000, 0));
- task->registerCallsWithName("", calls);
- scheduler.addAbsolute(TestTask::UP(task), 50);
- // Remove actual task
- scheduler.remove(task);
- scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete
- // Remove non-existing task
- task = new TestTask(watch, 10, 5, 1000, 0);
- scheduler.remove(task);
- delete task;
- // Time should not be advanced as task didn't get to run
- EXPECT_EQ(0, (int) watch.getTime());
- }
-}
-
-}
diff --git a/vdslib/src/vespa/vdslib/CMakeLists.txt b/vdslib/src/vespa/vdslib/CMakeLists.txt
index cf5053a5ceb..ea19664a45f 100644
--- a/vdslib/src/vespa/vdslib/CMakeLists.txt
+++ b/vdslib/src/vespa/vdslib/CMakeLists.txt
@@ -4,7 +4,6 @@ vespa_add_library(vdslib
$<TARGET_OBJECTS:vdslib_container>
$<TARGET_OBJECTS:vdslib_state>
$<TARGET_OBJECTS:vdslib_distribution>
- $<TARGET_OBJECTS:vdslib_thread>
INSTALL lib64
DEPENDS
)
diff --git a/vdslib/src/vespa/vdslib/thread/CMakeLists.txt b/vdslib/src/vespa/vdslib/thread/CMakeLists.txt
deleted file mode 100644
index 656772afc49..00000000000
--- a/vdslib/src/vespa/vdslib/thread/CMakeLists.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_library(vdslib_thread OBJECT
- SOURCES
- taskscheduler.cpp
- DEPENDS
-)
diff --git a/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp b/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp
deleted file mode 100644
index 08c7b80e406..00000000000
--- a/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp
+++ /dev/null
@@ -1,220 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "taskscheduler.h"
-#include <vespa/vespalib/util/exceptions.h>
-#include <vespa/vespalib/stllike/asciistream.h>
-#include <sys/time.h>
-
-namespace vdslib {
-
-uint64_t
-TaskScheduler::Watch::getTime() const
-{
- struct timeval mytime;
- gettimeofday(&mytime, 0);
- return mytime.tv_sec * 1000llu + mytime.tv_usec / 1000;
-}
-
-TaskScheduler::TaskScheduler()
- : _lock(),
- _defaultWatch(),
- _watch(&_defaultWatch),
- _tasks(),
- _currentRunningTasks(),
- _taskCounter(0)
-{
-}
-
-bool TaskScheduler::onStop()
-{
- vespalib::MonitorGuard guard(_lock);
- guard.broadcast();
- return true;
-}
-
-TaskScheduler::~TaskScheduler()
-{
- stop();
- {
- vespalib::MonitorGuard guard(_lock);
- guard.broadcast();
- }
- join();
- for (TaskMap::iterator it = _tasks.begin(); it != _tasks.end(); ++it) {
- TaskVector & v(it->second);
- for (TaskVector::iterator it2 = v.begin(); it2 != v.end(); ++it2) {
- delete *it2;
- }
- }
-}
-
-void
-TaskScheduler::add(Task::UP task)
-{
- vespalib::MonitorGuard guard(_lock);
- std::vector<Task*>& tasks(_tasks[_watch->getTime()]);
- tasks.push_back(task.release());
- guard.broadcast();
-}
-
-void
-TaskScheduler::addRelative(Task::UP task, Time timeDiff)
-{
- vespalib::MonitorGuard guard(_lock);
- std::vector<Task*>& tasks(_tasks[_watch->getTime() + timeDiff]);
- tasks.push_back(task.release());
- guard.broadcast();
-}
-
-void
-TaskScheduler::addAbsolute(Task::UP task, Time time)
-{
- vespalib::MonitorGuard guard(_lock);
- std::vector<Task*>& tasks(_tasks[time]);
- tasks.push_back(task.release());
- guard.broadcast();
-}
-
-namespace {
- template<typename T>
- bool contains(const std::vector<T>& source, const T& element) {
- for (size_t i = 0, n = source.size(); i<n; ++i) {
- if (source[i] == element) return true;
- }
- return false;
- }
-
- template<typename T>
- void erase(std::vector<T>& source, const T& element) {
- std::vector<T> result;
- result.reserve(source.size());
- for (size_t i = 0, n = source.size(); i<n; ++i) {
- if (source[i] != element) result.push_back(source[i]);
- }
- result.swap(source);
- }
-}
-
-void
-TaskScheduler::remove(Task* task)
-{
- vespalib::MonitorGuard guard(_lock);
- while (contains(_currentRunningTasks, task)) {
- guard.wait();
- }
- for (TaskMap::iterator it = _tasks.begin(); it != _tasks.end();) {
- if (contains(it->second, task)) {
- erase(it->second, task);
- if (it->second.size() == 0) _tasks.erase(it);
- delete task;
- break;
- }
- ++it;
- }
-}
-
-void
-TaskScheduler::setWatch(const Watch& watch)
-{
- vespalib::MonitorGuard guard(_lock);
- _watch = &watch;
-}
-
-TaskScheduler::Time
-TaskScheduler::getTime() const
-{
- vespalib::MonitorGuard guard(_lock);
- return _watch->getTime();
-}
-
-uint64_t
-TaskScheduler::getTaskCounter() const
-{
- vespalib::MonitorGuard guard(_lock);
- return _taskCounter;
-}
-
-void
-TaskScheduler::waitForTaskCounterOfAtLeast(uint64_t taskCounter,
- uint64_t timeout) const
-{
- vespalib::MonitorGuard guard(_lock);
- uint64_t currentTime = _defaultWatch.getTime();
- uint64_t endTime = currentTime + timeout;
- while (_taskCounter < taskCounter) {
- if (endTime <= currentTime) {
- vespalib::asciistream ost;
- ost << "Task scheduler not reached task counter of " << taskCounter
- << " within timeout of " << timeout << " ms. Current task"
- << " counter is " << _taskCounter;
- throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
- }
- guard.wait(endTime - currentTime);
- currentTime = _defaultWatch.getTime();
- }
-}
-
-void
-TaskScheduler::waitUntilNoTasksRemaining(uint64_t timeout) const
-{
- vespalib::MonitorGuard guard(_lock);
- uint64_t currentTime = _defaultWatch.getTime();
- uint64_t endTime = currentTime + timeout;
- while (_tasks.size() > 0 || _currentRunningTasks.size() > 0) {
- if (endTime <= currentTime) {
- vespalib::asciistream ost;
- ost << "Task scheduler still have tasks scheduled after timeout"
- << " of " << timeout << " ms. There are " << _tasks.size()
- << " entries in tasks map and " << _currentRunningTasks.size()
- << " tasks currently scheduled to run.";
- throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
- }
- guard.wait(endTime - currentTime);
- currentTime = _defaultWatch.getTime();
- }
-}
-
-void
-TaskScheduler::run()
-{
- while (1) {
- vespalib::MonitorGuard guard(_lock);
- if (!running()) return;
- Time time = _watch->getTime();
- TaskMap::iterator next = _tasks.begin();
- if (next == _tasks.end()) {
- guard.wait();
- continue;
- }
- if (next->first > time) {
- guard.wait(next->first - time);
- continue;
- }
- TaskVector taskList(next->second);
- _currentRunningTasks.swap(next->second);
- _tasks.erase(next);
- guard.unlock();
- for (size_t i=0; i<taskList.size(); ++i) {
- int64_t result = taskList[i]->run(time);
- if (result < 0) {
- addAbsolute(Task::UP(taskList[i]),
- time + (-1 * result));
- } else if (result > 0) {
- if (static_cast<Time>(result) <= time) {
- taskList.push_back(taskList[i]);
- } else {
- addAbsolute(Task::UP(taskList[i]), result);
- }
- } else {
- delete taskList[i];
- }
- }
- vespalib::MonitorGuard guard2(_lock);
- if (!running()) return;
- _taskCounter += _currentRunningTasks.size();
- _currentRunningTasks.clear();
- guard2.broadcast();
- }
-}
-
-} // vdslib
diff --git a/vdslib/src/vespa/vdslib/thread/taskscheduler.h b/vdslib/src/vespa/vdslib/thread/taskscheduler.h
deleted file mode 100644
index b34d633e624..00000000000
--- a/vdslib/src/vespa/vdslib/thread/taskscheduler.h
+++ /dev/null
@@ -1,105 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * \class vespalib::TaskScheduler
- * \ingroup util
- *
- * \brief Class to register tasks in to get them run in a separate thread.
- *
- * Imported to vdslib as C++ document API needs an independent thread to run
- * events in, as it was subject to errors to use FNET event thread. Converted
- * this class used in storage API client code before.
- */
-#pragma once
-
-#include <vespa/vespalib/util/document_runnable.h>
-#include <map>
-#include <memory>
-#include <vector>
-#include <vespa/vespalib/util/sync.h>
-
-namespace vdslib {
-
-class TaskScheduler : public document::Runnable
-{
-public:
- typedef uint64_t Time;
-
- struct Task {
- typedef std::unique_ptr<Task> UP;
- virtual ~Task() {}
- /**
- * Return 0 to unregister this task. Return a negative number to get a
- * new callback in that many (times -1) milliseconds. Return a positive
- * number to get a callback as soon as thread is available after that
- * absolute point in time (in milliseconds). If returning current time
- * or before, this task will be scheduled to be rerun immediately
- * (after other already waiting tasks have had a chance to run).
- * The current time for the scheduler is given to the task.
- */
- virtual int64_t run(Time) = 0;
- };
-
- /**
- * If you want to fake time (useful for testing), implement your own watch
- * for the scheduler to use.
- */
- struct Watch {
- virtual ~Watch() {}
- virtual Time getTime() const; // In ms since 1970
- };
-
- /** Creates a task scheduler. Remember to call start() to get it going. */
- TaskScheduler();
- ~TaskScheduler();
-
- /** Register a task for immediate execution */
- void add(Task::UP);
-
- /** Register a task to be run in a given number of milliseconds from now */
- void addRelative(Task::UP, Time);
-
- /** Register a task to be run at given absolute time in milliseconds */
- void addAbsolute(Task::UP, Time);
-
- /**
- * Removes a scheduled task from the scheduler. Note that this is
- * currently not efficiently implemented but an exhaustive iteration of
- * current tasks. Assuming number of tasks is small so this doesn't matter.
- * If current task is running while this is called, function will block
- * until it has completed before removing it. (To be safe if you want to
- * delete task after.)
- */
- void remove(Task*);
-
- /** Get the schedulers current time. */
- Time getTime() const;
-
- /** Set a custom watch to be used for this scheduler (Useful for testing) */
- void setWatch(const Watch& watch);
-
- /** Returns a number of current task */
- uint64_t getTaskCounter() const;
-
- /** Wait until a given number of tasks have been completed */
- void waitForTaskCounterOfAtLeast(uint64_t taskCounter,
- uint64_t timeout = 5000) const;
-
- /** Call this to wait until no tasks are scheduled (Useful for testing) */
- void waitUntilNoTasksRemaining(uint64_t timeout = 5000) const;
-
-private:
- typedef std::vector<Task*> TaskVector;
- typedef std::map<Time, TaskVector> TaskMap;
- vespalib::Monitor _lock;
- Watch _defaultWatch;
- const Watch* _watch;
- TaskMap _tasks;
- TaskVector _currentRunningTasks;
- uint64_t _taskCounter;
-
- void run() override;
- bool onStop() override;
-};
-
-} // vdslib
-