diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-13 22:13:14 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-13 22:56:14 +0000 |
commit | 001bdf0053ba9cb02e20afcceb9d0f7ed63f1178 (patch) | |
tree | b2b0d66c4459114d878cfa61b12e74c39bbb0b74 /vdslib | |
parent | 71c10939b19be8ea115cda9ecddcad7749b2c20d (diff) |
Use std::mutex and std:.condition_variable and GC some unused code.
Diffstat (limited to 'vdslib')
-rw-r--r-- | vdslib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | vdslib/src/tests/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vdslib/src/tests/thread/.gitignore | 3 | ||||
-rw-r--r-- | vdslib/src/tests/thread/CMakeLists.txt | 8 | ||||
-rw-r--r-- | vdslib/src/tests/thread/taskschedulertest.cpp | 227 | ||||
-rw-r--r-- | vdslib/src/vespa/vdslib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vdslib/src/vespa/vdslib/thread/CMakeLists.txt | 6 | ||||
-rw-r--r-- | vdslib/src/vespa/vdslib/thread/taskscheduler.cpp | 220 | ||||
-rw-r--r-- | vdslib/src/vespa/vdslib/thread/taskscheduler.h | 105 |
9 files changed, 0 insertions, 573 deletions
diff --git a/vdslib/CMakeLists.txt b/vdslib/CMakeLists.txt index 3c1ee756e56..b66f53a4e19 100644 --- a/vdslib/CMakeLists.txt +++ b/vdslib/CMakeLists.txt @@ -14,7 +14,6 @@ vespa_define_module( src/vespa/vdslib/container src/vespa/vdslib/distribution src/vespa/vdslib/state - src/vespa/vdslib/thread TEST_DEPENDS vdstestlib @@ -24,5 +23,4 @@ vespa_define_module( src/tests/container src/tests/distribution src/tests/state - src/tests/thread ) diff --git a/vdslib/src/tests/CMakeLists.txt b/vdslib/src/tests/CMakeLists.txt index 6cf1ba5e33f..0b48d675ddc 100644 --- a/vdslib/src/tests/CMakeLists.txt +++ b/vdslib/src/tests/CMakeLists.txt @@ -9,7 +9,6 @@ vespa_add_executable(vdslib_gtest_runner_app TEST vdslib_containertest vdslib_testdistribution vdslib_teststate - vdslib_testthread GTest::GTest ) diff --git a/vdslib/src/tests/thread/.gitignore b/vdslib/src/tests/thread/.gitignore deleted file mode 100644 index 583460ae288..00000000000 --- a/vdslib/src/tests/thread/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -*.So -.depend -Makefile diff --git a/vdslib/src/tests/thread/CMakeLists.txt b/vdslib/src/tests/thread/CMakeLists.txt deleted file mode 100644 index bf2c8a41c9b..00000000000 --- a/vdslib/src/tests/thread/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(vdslib_testthread - SOURCES - taskschedulertest.cpp - DEPENDS - vdslib - GTest::GTest -) diff --git a/vdslib/src/tests/thread/taskschedulertest.cpp b/vdslib/src/tests/thread/taskschedulertest.cpp deleted file mode 100644 index 54877fae62b..00000000000 --- a/vdslib/src/tests/thread/taskschedulertest.cpp +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/vdslib/thread/taskscheduler.h> -#include <vespa/vespalib/gtest/gtest.h> -#include <thread> - -namespace vdslib { - -namespace { - -struct TestWatch : public TaskScheduler::Watch { - mutable std::mutex _lock; - uint64_t _time; - - TestWatch(uint64_t startTime = 0) : _time(startTime) {} - ~TestWatch() = default; - - TaskScheduler::Time getTime() const override { - std::lock_guard guard(_lock); - return _time; - } - - void increment(uint64_t ms) { - std::lock_guard guard(_lock); - _time += ms; - } - - void set(uint64_t ms) { - std::lock_guard guard(_lock); - _time = ms; - } -}; - -struct TestTask : public TaskScheduler::Task -{ - TestWatch& _watch; - uint64_t _executionTime; - uint64_t _maxRuns; - uint64_t _maxTime; - int64_t _result; - uint64_t _currentRuns; - std::string _name; - std::vector<std::string>* _register; - - TestTask(TestWatch& watch, uint64_t executionTime, uint64_t maxRuns, - uint64_t maxTime, int64_t result) - : _watch(watch), _executionTime(executionTime), _maxRuns(maxRuns), - _maxTime(maxTime), _result(result), _currentRuns(0), - _name(), _register(0) - { - } - - void registerCallsWithName(const std::string& name, - std::vector<std::string>& myregister) - { - _name = name; - _register = &myregister; - } - - int64_t run(TaskScheduler::Time currentTime) override { - // Emulate that we use time to run - _watch.increment(_executionTime); - if (_register != 0) { - std::ostringstream ost; - ost << currentTime; - if (_name.size() > 0) { - ost << " " << _name; - } - _register->push_back(ost.str()); - } - // If max runs, dont run anymore - if (++_currentRuns >= _maxRuns) { - //std::cerr << "Max runs run, returning 0\n"; - return 0; - } - // If we will go beyond max time, dont run anymore - if (_result > 0 && currentTime + _result > _maxTime) { - //std::cerr << "Max time spent, returning 0\n"; - return 0; - } - //std::cerr << "Executed test task. Returning " << _result << "\n"; - return _result; - } - -}; - -std::string join(std::vector<std::string>& v) { - std::ostringstream ost; - for (size_t i=0; i<v.size(); ++i) { - if (i != 0) ost << ","; - ost << v[i]; - } - return ost.str(); -} - -} - -TEST(TaskSchedulerTest, test_simple) -{ - FastOS_ThreadPool threadPool(128 * 1024); - TestWatch watch(0); - TaskScheduler scheduler; - scheduler.setWatch(watch); - scheduler.start(threadPool); - std::vector<std::string> calls; - - // Test that one can schedule a single task immediately - { - calls.clear(); - watch.set(0); - uint64_t counter = scheduler.getTaskCounter(); - TestTask* task(new TestTask(watch, 10, 5, 1000, 0)); - task->registerCallsWithName("", calls); - scheduler.add(TestTask::UP(task)); - scheduler.waitForTaskCounterOfAtLeast(counter + 1); - EXPECT_EQ(std::string("0"), join(calls)); - scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete - } - // Test that task is repeated at intervals if wanted. - { - calls.clear(); - watch.set(0); - uint64_t counter = scheduler.getTaskCounter(); - TestTask* task(new TestTask(watch, 10, 5, 1000, -20)); - task->registerCallsWithName("", calls); - scheduler.add(TestTask::UP(task)); - for (uint32_t i = 1; i <= 5; ++i) { - scheduler.waitForTaskCounterOfAtLeast(counter + i); - watch.increment(100); - } - EXPECT_EQ(std::string("0,110,220,330,440"), - join(calls)); - scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete - } - // Test that task scheduled at specific time works, and that if - // scheduled at specific time in the past/current, we're rerun at once. - { - calls.clear(); - watch.set(0); - uint64_t counter = scheduler.getTaskCounter(); - TestTask* task(new TestTask(watch, 10, 4, 1000, 100)); - task->registerCallsWithName("", calls); - scheduler.addAbsolute(TestTask::UP(task), 50); - watch.increment(49); // Not yet time to run - std::this_thread::sleep_for(5ms); - // Check that it has not run yet.. - EXPECT_EQ(counter, scheduler.getTaskCounter()); - watch.increment(10); // Now time is enough for it to run - scheduler.waitForTaskCounterOfAtLeast(counter + 1); - watch.increment(10); - std::this_thread::sleep_for(5ms); - // Check that it has not run yet.. - EXPECT_EQ(counter + 1, scheduler.getTaskCounter()); - watch.increment(50); - scheduler.waitForTaskCounterOfAtLeast(counter + 2); - EXPECT_EQ(std::string("59,129,129,129"), - join(calls)); - scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete - } -} - -TEST(TaskSchedulerTest, test_multiple_tasks_at_same_time) -{ - FastOS_ThreadPool threadPool(128 * 1024); - TestWatch watch(0); - TaskScheduler scheduler; - scheduler.setWatch(watch); - std::vector<std::string> calls; - - // Test that tasks deleted before they are run are automatically - // cancelled and removed from scheduler - { - TestTask* task1(new TestTask(watch, 10, 3, 1000, 10)); - TestTask* task2(new TestTask(watch, 10, 3, 1000, 10)); - task1->registerCallsWithName("task1", calls); - task2->registerCallsWithName("task2", calls); - watch.set(10); - scheduler.add(TestTask::UP(task1)); - scheduler.add(TestTask::UP(task2)); - // Start threadpool after adding both, such that we ensure both - // are added at the same time interval - scheduler.start(threadPool); - - scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete - std::ostringstream ost; - for (size_t i=0; i<calls.size(); ++i) ost << calls[i] << "\n"; - - EXPECT_EQ(std::string( - "10 task1\n" - "10 task2\n" - "10 task1\n" - "10 task2\n" - "10 task1\n" - "10 task2\n" - ), ost.str()); - } -} - -TEST(TaskSchedulerTest, test_remove_task) -{ - FastOS_ThreadPool threadPool(128 * 1024); - TestWatch watch(0); - TaskScheduler scheduler; - scheduler.setWatch(watch); - scheduler.start(threadPool); - std::vector<std::string> calls; - - // Schedule a task, and remove it.. - { - calls.clear(); - watch.set(0); - TestTask* task(new TestTask(watch, 10, 5, 1000, 0)); - task->registerCallsWithName("", calls); - scheduler.addAbsolute(TestTask::UP(task), 50); - // Remove actual task - scheduler.remove(task); - scheduler.waitUntilNoTasksRemaining(); // Ensure task is complete - // Remove non-existing task - task = new TestTask(watch, 10, 5, 1000, 0); - scheduler.remove(task); - delete task; - // Time should not be advanced as task didn't get to run - EXPECT_EQ(0, (int) watch.getTime()); - } -} - -} diff --git a/vdslib/src/vespa/vdslib/CMakeLists.txt b/vdslib/src/vespa/vdslib/CMakeLists.txt index cf5053a5ceb..ea19664a45f 100644 --- a/vdslib/src/vespa/vdslib/CMakeLists.txt +++ b/vdslib/src/vespa/vdslib/CMakeLists.txt @@ -4,7 +4,6 @@ vespa_add_library(vdslib $<TARGET_OBJECTS:vdslib_container> $<TARGET_OBJECTS:vdslib_state> $<TARGET_OBJECTS:vdslib_distribution> - $<TARGET_OBJECTS:vdslib_thread> INSTALL lib64 DEPENDS ) diff --git a/vdslib/src/vespa/vdslib/thread/CMakeLists.txt b/vdslib/src/vespa/vdslib/thread/CMakeLists.txt deleted file mode 100644 index 656772afc49..00000000000 --- a/vdslib/src/vespa/vdslib/thread/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_library(vdslib_thread OBJECT - SOURCES - taskscheduler.cpp - DEPENDS -) diff --git a/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp b/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp deleted file mode 100644 index 08c7b80e406..00000000000 --- a/vdslib/src/vespa/vdslib/thread/taskscheduler.cpp +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "taskscheduler.h" -#include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/stllike/asciistream.h> -#include <sys/time.h> - -namespace vdslib { - -uint64_t -TaskScheduler::Watch::getTime() const -{ - struct timeval mytime; - gettimeofday(&mytime, 0); - return mytime.tv_sec * 1000llu + mytime.tv_usec / 1000; -} - -TaskScheduler::TaskScheduler() - : _lock(), - _defaultWatch(), - _watch(&_defaultWatch), - _tasks(), - _currentRunningTasks(), - _taskCounter(0) -{ -} - -bool TaskScheduler::onStop() -{ - vespalib::MonitorGuard guard(_lock); - guard.broadcast(); - return true; -} - -TaskScheduler::~TaskScheduler() -{ - stop(); - { - vespalib::MonitorGuard guard(_lock); - guard.broadcast(); - } - join(); - for (TaskMap::iterator it = _tasks.begin(); it != _tasks.end(); ++it) { - TaskVector & v(it->second); - for (TaskVector::iterator it2 = v.begin(); it2 != v.end(); ++it2) { - delete *it2; - } - } -} - -void -TaskScheduler::add(Task::UP task) -{ - vespalib::MonitorGuard guard(_lock); - std::vector<Task*>& tasks(_tasks[_watch->getTime()]); - tasks.push_back(task.release()); - guard.broadcast(); -} - -void -TaskScheduler::addRelative(Task::UP task, Time timeDiff) -{ - vespalib::MonitorGuard guard(_lock); - std::vector<Task*>& tasks(_tasks[_watch->getTime() + timeDiff]); - tasks.push_back(task.release()); - guard.broadcast(); -} - -void -TaskScheduler::addAbsolute(Task::UP task, Time time) -{ - vespalib::MonitorGuard guard(_lock); - std::vector<Task*>& tasks(_tasks[time]); - tasks.push_back(task.release()); - guard.broadcast(); -} - -namespace { - template<typename T> - bool contains(const std::vector<T>& source, const T& element) { - for (size_t i = 0, n = source.size(); i<n; ++i) { - if (source[i] == element) return true; - } - return false; - } - - template<typename T> - void erase(std::vector<T>& source, const T& element) { - std::vector<T> result; - result.reserve(source.size()); - for (size_t i = 0, n = source.size(); i<n; ++i) { - if (source[i] != element) result.push_back(source[i]); - } - result.swap(source); - } -} - -void -TaskScheduler::remove(Task* task) -{ - vespalib::MonitorGuard guard(_lock); - while (contains(_currentRunningTasks, task)) { - guard.wait(); - } - for (TaskMap::iterator it = _tasks.begin(); it != _tasks.end();) { - if (contains(it->second, task)) { - erase(it->second, task); - if (it->second.size() == 0) _tasks.erase(it); - delete task; - break; - } - ++it; - } -} - -void -TaskScheduler::setWatch(const Watch& watch) -{ - vespalib::MonitorGuard guard(_lock); - _watch = &watch; -} - -TaskScheduler::Time -TaskScheduler::getTime() const -{ - vespalib::MonitorGuard guard(_lock); - return _watch->getTime(); -} - -uint64_t -TaskScheduler::getTaskCounter() const -{ - vespalib::MonitorGuard guard(_lock); - return _taskCounter; -} - -void -TaskScheduler::waitForTaskCounterOfAtLeast(uint64_t taskCounter, - uint64_t timeout) const -{ - vespalib::MonitorGuard guard(_lock); - uint64_t currentTime = _defaultWatch.getTime(); - uint64_t endTime = currentTime + timeout; - while (_taskCounter < taskCounter) { - if (endTime <= currentTime) { - vespalib::asciistream ost; - ost << "Task scheduler not reached task counter of " << taskCounter - << " within timeout of " << timeout << " ms. Current task" - << " counter is " << _taskCounter; - throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); - } - guard.wait(endTime - currentTime); - currentTime = _defaultWatch.getTime(); - } -} - -void -TaskScheduler::waitUntilNoTasksRemaining(uint64_t timeout) const -{ - vespalib::MonitorGuard guard(_lock); - uint64_t currentTime = _defaultWatch.getTime(); - uint64_t endTime = currentTime + timeout; - while (_tasks.size() > 0 || _currentRunningTasks.size() > 0) { - if (endTime <= currentTime) { - vespalib::asciistream ost; - ost << "Task scheduler still have tasks scheduled after timeout" - << " of " << timeout << " ms. There are " << _tasks.size() - << " entries in tasks map and " << _currentRunningTasks.size() - << " tasks currently scheduled to run."; - throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); - } - guard.wait(endTime - currentTime); - currentTime = _defaultWatch.getTime(); - } -} - -void -TaskScheduler::run() -{ - while (1) { - vespalib::MonitorGuard guard(_lock); - if (!running()) return; - Time time = _watch->getTime(); - TaskMap::iterator next = _tasks.begin(); - if (next == _tasks.end()) { - guard.wait(); - continue; - } - if (next->first > time) { - guard.wait(next->first - time); - continue; - } - TaskVector taskList(next->second); - _currentRunningTasks.swap(next->second); - _tasks.erase(next); - guard.unlock(); - for (size_t i=0; i<taskList.size(); ++i) { - int64_t result = taskList[i]->run(time); - if (result < 0) { - addAbsolute(Task::UP(taskList[i]), - time + (-1 * result)); - } else if (result > 0) { - if (static_cast<Time>(result) <= time) { - taskList.push_back(taskList[i]); - } else { - addAbsolute(Task::UP(taskList[i]), result); - } - } else { - delete taskList[i]; - } - } - vespalib::MonitorGuard guard2(_lock); - if (!running()) return; - _taskCounter += _currentRunningTasks.size(); - _currentRunningTasks.clear(); - guard2.broadcast(); - } -} - -} // vdslib diff --git a/vdslib/src/vespa/vdslib/thread/taskscheduler.h b/vdslib/src/vespa/vdslib/thread/taskscheduler.h deleted file mode 100644 index b34d633e624..00000000000 --- a/vdslib/src/vespa/vdslib/thread/taskscheduler.h +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * \class vespalib::TaskScheduler - * \ingroup util - * - * \brief Class to register tasks in to get them run in a separate thread. - * - * Imported to vdslib as C++ document API needs an independent thread to run - * events in, as it was subject to errors to use FNET event thread. Converted - * this class used in storage API client code before. - */ -#pragma once - -#include <vespa/vespalib/util/document_runnable.h> -#include <map> -#include <memory> -#include <vector> -#include <vespa/vespalib/util/sync.h> - -namespace vdslib { - -class TaskScheduler : public document::Runnable -{ -public: - typedef uint64_t Time; - - struct Task { - typedef std::unique_ptr<Task> UP; - virtual ~Task() {} - /** - * Return 0 to unregister this task. Return a negative number to get a - * new callback in that many (times -1) milliseconds. Return a positive - * number to get a callback as soon as thread is available after that - * absolute point in time (in milliseconds). If returning current time - * or before, this task will be scheduled to be rerun immediately - * (after other already waiting tasks have had a chance to run). - * The current time for the scheduler is given to the task. - */ - virtual int64_t run(Time) = 0; - }; - - /** - * If you want to fake time (useful for testing), implement your own watch - * for the scheduler to use. - */ - struct Watch { - virtual ~Watch() {} - virtual Time getTime() const; // In ms since 1970 - }; - - /** Creates a task scheduler. Remember to call start() to get it going. */ - TaskScheduler(); - ~TaskScheduler(); - - /** Register a task for immediate execution */ - void add(Task::UP); - - /** Register a task to be run in a given number of milliseconds from now */ - void addRelative(Task::UP, Time); - - /** Register a task to be run at given absolute time in milliseconds */ - void addAbsolute(Task::UP, Time); - - /** - * Removes a scheduled task from the scheduler. Note that this is - * currently not efficiently implemented but an exhaustive iteration of - * current tasks. Assuming number of tasks is small so this doesn't matter. - * If current task is running while this is called, function will block - * until it has completed before removing it. (To be safe if you want to - * delete task after.) - */ - void remove(Task*); - - /** Get the schedulers current time. */ - Time getTime() const; - - /** Set a custom watch to be used for this scheduler (Useful for testing) */ - void setWatch(const Watch& watch); - - /** Returns a number of current task */ - uint64_t getTaskCounter() const; - - /** Wait until a given number of tasks have been completed */ - void waitForTaskCounterOfAtLeast(uint64_t taskCounter, - uint64_t timeout = 5000) const; - - /** Call this to wait until no tasks are scheduled (Useful for testing) */ - void waitUntilNoTasksRemaining(uint64_t timeout = 5000) const; - -private: - typedef std::vector<Task*> TaskVector; - typedef std::map<Time, TaskVector> TaskMap; - vespalib::Monitor _lock; - Watch _defaultWatch; - const Watch* _watch; - TaskMap _tasks; - TaskVector _currentRunningTasks; - uint64_t _taskCounter; - - void run() override; - bool onStop() override; -}; - -} // vdslib - |