aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2016-08-30 16:00:44 +0200
committerGitHub <noreply@github.com>2016-08-30 16:00:44 +0200
commit0ed1c4c46d4b1e39aeaf7c6618b54af58e8541f5 (patch)
tree338f3bb33727e91c3d1a96a0e0da80e8b7b51b66 /vespalib/src
parent0259f6dc4c4c01c5eb5666de1a3ba603cc813770 (diff)
parent4d699ab97297079fb0e59c56a7ac4a89606ee70f (diff)
Merge pull request #507 from yahoo/havardpe/cleanup-and-instrument-thread-stack-executor
Havardpe/cleanup and instrument thread stack executor
Diffstat (limited to 'vespalib/src')
-rw-r--r--vespalib/src/tests/executor/.gitignore1
-rw-r--r--vespalib/src/tests/executor/CMakeLists.txt7
-rw-r--r--vespalib/src/tests/executor/blocking_executor_stress.cpp52
-rw-r--r--vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp22
-rw-r--r--vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h6
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp14
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutor.h14
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp24
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h5
9 files changed, 95 insertions, 50 deletions
diff --git a/vespalib/src/tests/executor/.gitignore b/vespalib/src/tests/executor/.gitignore
index 33760771905..96203a03956 100644
--- a/vespalib/src/tests/executor/.gitignore
+++ b/vespalib/src/tests/executor/.gitignore
@@ -7,3 +7,4 @@ vespalib_executor_test_app
vespalib_stress_test_app
vespalib_threadstackexecutor_test_app
/vespalib_blockingthreadstackexecutor_test_app
+/vespalib_blocking_executor_stress_test_app
diff --git a/vespalib/src/tests/executor/CMakeLists.txt b/vespalib/src/tests/executor/CMakeLists.txt
index 0435c03f34c..54ca828d89a 100644
--- a/vespalib/src/tests/executor/CMakeLists.txt
+++ b/vespalib/src/tests/executor/CMakeLists.txt
@@ -27,3 +27,10 @@ vespa_add_executable(vespalib_blockingthreadstackexecutor_test_app TEST
vespalib
)
vespa_add_test(NAME vespalib_blockingthreadstackexecutor_test_app COMMAND vespalib_blockingthreadstackexecutor_test_app)
+vespa_add_executable(vespalib_blocking_executor_stress_test_app TEST
+ SOURCES
+ blocking_executor_stress.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_blocking_executor_stress_test_app COMMAND vespalib_blocking_executor_stress_test_app)
diff --git a/vespalib/src/tests/executor/blocking_executor_stress.cpp b/vespalib/src/tests/executor/blocking_executor_stress.cpp
new file mode 100644
index 00000000000..4adecb7c4a9
--- /dev/null
+++ b/vespalib/src/tests/executor/blocking_executor_stress.cpp
@@ -0,0 +1,52 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/sync.h>
+#include <atomic>
+
+using namespace vespalib;
+
+std::atomic<size_t> tasks_run;
+
+size_t do_stuff(size_t size) {
+ size_t value = 0;
+ for (size_t i = 0; i < size; ++i) {
+ for (size_t j = 0; j < i; ++j) {
+ for (size_t k = 0; k < j; ++k) {
+ value += (i * j * k);
+ value *= (i + j + k);
+ }
+ }
+ }
+ return value;
+}
+
+struct MyTask : Executor::Task {
+ size_t size;
+ size_t data;
+ MyTask(size_t size_in) : size(size_in), data(0) {}
+ virtual void run() override {
+ data += do_stuff(size);
+ ++tasks_run;
+ data += do_stuff(size);
+ data += do_stuff(size);
+ }
+};
+
+TEST_MT_F("stress test block thread stack executor", 8, BlockingThreadStackExecutor(4, 128000, 1000))
+{
+ size_t loop_cnt = 100;
+ for (size_t i = 0; i < loop_cnt; ++i) {
+ auto result = f1.execute(std::make_unique<MyTask>(thread_id));
+ EXPECT_TRUE(result.get() == nullptr);
+ }
+ TEST_BARRIER();
+ if (thread_id == 0) {
+ f1.shutdown().sync();
+ }
+ TEST_BARRIER();
+ EXPECT_EQUAL((loop_cnt * num_threads), tasks_run);
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
index be4ec12dc38..eaaa429a5f8 100644
--- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
@@ -5,17 +5,6 @@
namespace vespalib {
-BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) :
- ThreadStackExecutorBase(stackSize, taskLimit)
-{
- start(threads);
-}
-
-BlockingThreadStackExecutor::~BlockingThreadStackExecutor()
-{
- cleanup();
-}
-
bool
BlockingThreadStackExecutor::acceptNewTask(MonitorGuard & guard)
{
@@ -31,6 +20,17 @@ BlockingThreadStackExecutor::wakeup(MonitorGuard & monitor)
monitor.broadcast();
}
+BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) :
+ ThreadStackExecutorBase(stackSize, taskLimit)
+{
+ start(threads);
+}
+
+BlockingThreadStackExecutor::~BlockingThreadStackExecutor()
+{
+ cleanup();
+}
+
void
BlockingThreadStackExecutor::setTaskLimit(uint32_t taskLimit)
{
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
index 6e1b7ad8100..1274ed7fcea 100644
--- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
@@ -13,8 +13,9 @@ namespace vespalib {
class BlockingThreadStackExecutor : public ThreadStackExecutorBase
{
private:
- virtual bool acceptNewTask(MonitorGuard & monitor);
- virtual void wakeup(MonitorGuard & monitor);
+ bool acceptNewTask(MonitorGuard & monitor) override;
+ void wakeup(MonitorGuard & monitor) override;
+
public:
/**
* Create a new blocking thread stack executor. The task limit specifies
@@ -35,4 +36,3 @@ public:
};
} // namespace vespalib
-
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp
index 206fdd08bfc..808fe01fea1 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp
@@ -5,14 +5,6 @@
namespace vespalib {
-//-----------------------------------------------------------------------------
-
-ThreadStackExecutor::ThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) :
- ThreadStackExecutorBase(stackSize, taskLimit)
-{
- start(threads);
-}
-
bool
ThreadStackExecutor::acceptNewTask(MonitorGuard &)
{
@@ -24,6 +16,12 @@ ThreadStackExecutor::wakeup(MonitorGuard &)
{
}
+ThreadStackExecutor::ThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) :
+ ThreadStackExecutorBase(stackSize, taskLimit)
+{
+ start(threads);
+}
+
ThreadStackExecutor::~ThreadStackExecutor()
{
cleanup();
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h
index 1dac0b5f336..27d8c3da325 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h
@@ -13,18 +13,8 @@ namespace vespalib {
class ThreadStackExecutor : public ThreadStackExecutorBase
{
public:
- /**
- * This will tell if a task will be accepted or not.
- * An implementation might decide to block.
- */
- virtual bool acceptNewTask(MonitorGuard & monitor);
-
- /**
- * If blocking implementation, this might wake up any waiters.
- *
- * @param monitor to use for signaling.
- */
- virtual void wakeup(MonitorGuard & monitor);
+ bool acceptNewTask(MonitorGuard &) override;
+ void wakeup(MonitorGuard &) override;
public:
/**
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index 9ff3a2c3c87..4169a415858 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -49,6 +49,7 @@ ThreadStackExecutorBase::unblock_threads(const MonitorGuard &)
void
ThreadStackExecutorBase::assignTask(const TaggedTask &task, Worker &worker)
{
+ worker.verify();
MonitorGuard monitor(worker.monitor);
assert(worker.idle);
assert(worker.task.task == 0);
@@ -60,12 +61,14 @@ ThreadStackExecutorBase::assignTask(const TaggedTask &task, Worker &worker)
bool
ThreadStackExecutorBase::obtainTask(Worker &worker)
{
+ worker.verify();
{
MonitorGuard monitor(_monitor);
if (worker.task.task != 0) {
+ assert(_taskCount != 0);
+ worker.task.task = 0;
--_taskCount;
_barrier.completeEvent(worker.task.token);
- worker.task.task = 0;
}
unblock_threads(monitor);
if (!_tasks.empty()) {
@@ -98,6 +101,7 @@ ThreadStackExecutorBase::Run(FastOS_ThreadInterface *, void *)
delete worker.task.task;
}
_executorCompletion.await(); // to allow unsafe signaling
+ worker.verify();
}
//-----------------------------------------------------------------------------
@@ -133,7 +137,10 @@ void
ThreadStackExecutorBase::internalSetTaskLimit(uint32_t taskLimit)
{
MonitorGuard monitor(_monitor);
- _taskLimit = taskLimit;
+ if (!_closed) {
+ _taskLimit = taskLimit;
+ wakeup(monitor);
+ }
}
ThreadStackExecutorBase::Stats
@@ -146,19 +153,6 @@ ThreadStackExecutorBase::getStats()
return stats;
}
-bool
-ThreadStackExecutorBase::acceptNewTask(MonitorGuard & guard)
-{
- (void) guard;
- return (_taskCount < _taskLimit);
-}
-
-void
-ThreadStackExecutorBase::wakeup(MonitorGuard & monitor)
-{
- (void) monitor;
-}
-
ThreadStackExecutorBase::Task::UP
ThreadStackExecutorBase::execute(Task::UP task)
{
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 6cec139d932..f74a4be399e 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -43,8 +43,11 @@ private:
struct Worker {
Monitor monitor;
bool idle;
+ uint32_t pre_guard;
TaggedTask task;
- Worker() : monitor(), idle(false), task() {}
+ uint32_t post_guard;
+ Worker() : monitor(), idle(false), pre_guard(0xaaaaaaaa), task(), post_guard(0x44444444) {}
+ void verify() { assert((pre_guard == 0xaaaaaaaa) && (post_guard == 0x44444444)); }
};
struct BarrierCompletion {