diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2016-08-30 16:00:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-08-30 16:00:44 +0200 |
commit | 0ed1c4c46d4b1e39aeaf7c6618b54af58e8541f5 (patch) | |
tree | 338f3bb33727e91c3d1a96a0e0da80e8b7b51b66 /vespalib/src | |
parent | 0259f6dc4c4c01c5eb5666de1a3ba603cc813770 (diff) | |
parent | 4d699ab97297079fb0e59c56a7ac4a89606ee70f (diff) |
Merge pull request #507 from yahoo/havardpe/cleanup-and-instrument-thread-stack-executor
Havardpe/cleanup and instrument thread stack executor
Diffstat (limited to 'vespalib/src')
9 files changed, 95 insertions, 50 deletions
diff --git a/vespalib/src/tests/executor/.gitignore b/vespalib/src/tests/executor/.gitignore index 33760771905..96203a03956 100644 --- a/vespalib/src/tests/executor/.gitignore +++ b/vespalib/src/tests/executor/.gitignore @@ -7,3 +7,4 @@ vespalib_executor_test_app vespalib_stress_test_app vespalib_threadstackexecutor_test_app /vespalib_blockingthreadstackexecutor_test_app +/vespalib_blocking_executor_stress_test_app diff --git a/vespalib/src/tests/executor/CMakeLists.txt b/vespalib/src/tests/executor/CMakeLists.txt index 0435c03f34c..54ca828d89a 100644 --- a/vespalib/src/tests/executor/CMakeLists.txt +++ b/vespalib/src/tests/executor/CMakeLists.txt @@ -27,3 +27,10 @@ vespa_add_executable(vespalib_blockingthreadstackexecutor_test_app TEST vespalib ) vespa_add_test(NAME vespalib_blockingthreadstackexecutor_test_app COMMAND vespalib_blockingthreadstackexecutor_test_app) +vespa_add_executable(vespalib_blocking_executor_stress_test_app TEST + SOURCES + blocking_executor_stress.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_blocking_executor_stress_test_app COMMAND vespalib_blocking_executor_stress_test_app) diff --git a/vespalib/src/tests/executor/blocking_executor_stress.cpp b/vespalib/src/tests/executor/blocking_executor_stress.cpp new file mode 100644 index 00000000000..4adecb7c4a9 --- /dev/null +++ b/vespalib/src/tests/executor/blocking_executor_stress.cpp @@ -0,0 +1,52 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/sync.h> +#include <atomic> + +using namespace vespalib; + +std::atomic<size_t> tasks_run; + +size_t do_stuff(size_t size) { + size_t value = 0; + for (size_t i = 0; i < size; ++i) { + for (size_t j = 0; j < i; ++j) { + for (size_t k = 0; k < j; ++k) { + value += (i * j * k); + value *= (i + j + k); + } + } + } + return value; +} + +struct MyTask : Executor::Task { + size_t size; + size_t data; + MyTask(size_t size_in) : size(size_in), data(0) {} + virtual void run() override { + data += do_stuff(size); + ++tasks_run; + data += do_stuff(size); + data += do_stuff(size); + } +}; + +TEST_MT_F("stress test block thread stack executor", 8, BlockingThreadStackExecutor(4, 128000, 1000)) +{ + size_t loop_cnt = 100; + for (size_t i = 0; i < loop_cnt; ++i) { + auto result = f1.execute(std::make_unique<MyTask>(thread_id)); + EXPECT_TRUE(result.get() == nullptr); + } + TEST_BARRIER(); + if (thread_id == 0) { + f1.shutdown().sync(); + } + TEST_BARRIER(); + EXPECT_EQUAL((loop_cnt * num_threads), tasks_run); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp index be4ec12dc38..eaaa429a5f8 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp @@ -5,17 +5,6 @@ namespace vespalib { -BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) : - ThreadStackExecutorBase(stackSize, taskLimit) -{ - start(threads); -} - -BlockingThreadStackExecutor::~BlockingThreadStackExecutor() -{ - cleanup(); -} - bool BlockingThreadStackExecutor::acceptNewTask(MonitorGuard & guard) { @@ -31,6 +20,17 @@ BlockingThreadStackExecutor::wakeup(MonitorGuard & monitor) monitor.broadcast(); } +BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) : + ThreadStackExecutorBase(stackSize, taskLimit) +{ + start(threads); +} + +BlockingThreadStackExecutor::~BlockingThreadStackExecutor() +{ + cleanup(); +} + void BlockingThreadStackExecutor::setTaskLimit(uint32_t taskLimit) { diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h index 6e1b7ad8100..1274ed7fcea 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h @@ -13,8 +13,9 @@ namespace vespalib { class BlockingThreadStackExecutor : public ThreadStackExecutorBase { private: - virtual bool acceptNewTask(MonitorGuard & monitor); - virtual void wakeup(MonitorGuard & monitor); + bool acceptNewTask(MonitorGuard & monitor) override; + void wakeup(MonitorGuard & monitor) override; + public: /** * Create a new blocking thread stack executor. The task limit specifies @@ -35,4 +36,3 @@ public: }; } // namespace vespalib - diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp index 206fdd08bfc..808fe01fea1 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp @@ -5,14 +5,6 @@ namespace vespalib { -//----------------------------------------------------------------------------- - -ThreadStackExecutor::ThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) : - ThreadStackExecutorBase(stackSize, taskLimit) -{ - start(threads); -} - bool ThreadStackExecutor::acceptNewTask(MonitorGuard &) { @@ -24,6 +16,12 @@ ThreadStackExecutor::wakeup(MonitorGuard &) { } +ThreadStackExecutor::ThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) : + ThreadStackExecutorBase(stackSize, taskLimit) +{ + start(threads); +} + ThreadStackExecutor::~ThreadStackExecutor() { cleanup(); diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h index 1dac0b5f336..27d8c3da325 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h @@ -13,18 +13,8 @@ namespace vespalib { class ThreadStackExecutor : public ThreadStackExecutorBase { public: - /** - * This will tell if a task will be accepted or not. - * An implementation might decide to block. - */ - virtual bool acceptNewTask(MonitorGuard & monitor); - - /** - * If blocking implementation, this might wake up any waiters. - * - * @param monitor to use for signaling. - */ - virtual void wakeup(MonitorGuard & monitor); + bool acceptNewTask(MonitorGuard &) override; + void wakeup(MonitorGuard &) override; public: /** diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index 9ff3a2c3c87..4169a415858 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -49,6 +49,7 @@ ThreadStackExecutorBase::unblock_threads(const MonitorGuard &) void ThreadStackExecutorBase::assignTask(const TaggedTask &task, Worker &worker) { + worker.verify(); MonitorGuard monitor(worker.monitor); assert(worker.idle); assert(worker.task.task == 0); @@ -60,12 +61,14 @@ ThreadStackExecutorBase::assignTask(const TaggedTask &task, Worker &worker) bool ThreadStackExecutorBase::obtainTask(Worker &worker) { + worker.verify(); { MonitorGuard monitor(_monitor); if (worker.task.task != 0) { + assert(_taskCount != 0); + worker.task.task = 0; --_taskCount; _barrier.completeEvent(worker.task.token); - worker.task.task = 0; } unblock_threads(monitor); if (!_tasks.empty()) { @@ -98,6 +101,7 @@ ThreadStackExecutorBase::Run(FastOS_ThreadInterface *, void *) delete worker.task.task; } _executorCompletion.await(); // to allow unsafe signaling + worker.verify(); } //----------------------------------------------------------------------------- @@ -133,7 +137,10 @@ void ThreadStackExecutorBase::internalSetTaskLimit(uint32_t taskLimit) { MonitorGuard monitor(_monitor); - _taskLimit = taskLimit; + if (!_closed) { + _taskLimit = taskLimit; + wakeup(monitor); + } } ThreadStackExecutorBase::Stats @@ -146,19 +153,6 @@ ThreadStackExecutorBase::getStats() return stats; } -bool -ThreadStackExecutorBase::acceptNewTask(MonitorGuard & guard) -{ - (void) guard; - return (_taskCount < _taskLimit); -} - -void -ThreadStackExecutorBase::wakeup(MonitorGuard & monitor) -{ - (void) monitor; -} - ThreadStackExecutorBase::Task::UP ThreadStackExecutorBase::execute(Task::UP task) { diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 6cec139d932..f74a4be399e 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -43,8 +43,11 @@ private: struct Worker { Monitor monitor; bool idle; + uint32_t pre_guard; TaggedTask task; - Worker() : monitor(), idle(false), task() {} + uint32_t post_guard; + Worker() : monitor(), idle(false), pre_guard(0xaaaaaaaa), task(), post_guard(0x44444444) {} + void verify() { assert((pre_guard == 0xaaaaaaaa) && (post_guard == 0x44444444)); } }; struct BarrierCompletion { |