summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h4
-rw-r--r--vespalib/src/tests/executor/stress_test.cpp5
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp28
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h36
7 files changed, 60 insertions, 40 deletions
diff --git a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
index bc17a698536..440e720a0b6 100644
--- a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
+++ b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp
@@ -45,11 +45,11 @@ using storage::spi::Timestamp;
using vespalib::GenerationHandler;
using vespalib::GenerationHolder;
using searchcorespi::IFlushTarget;
+using namespace std::literals;
namespace proton {
-namespace
-{
+namespace {
static constexpr uint32_t numBucketBits = UINT32_C(20);
static constexpr uint64_t timestampBias = UINT64_C(2000000000000);
@@ -666,7 +666,7 @@ TEST("requireThatStatsAreUpdated")
EXPECT_GREATER(lastAllocated, perGidUsed);
EXPECT_GREATER(lastUsed, perGidUsed);
- FastOS_Thread::Sleep(2200);
+ std::this_thread::sleep_for(2200ms);
addGid(dms, gid1, bucketId1, time1);
EXPECT_EQUAL(2u, dms.getStatus().getNumDocs());
EXPECT_EQUAL(2u, dms.getStatus().getNumValues());
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index f703223bcbb..7f4698610c8 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -7,6 +7,7 @@
#include <vespa/searchcore/proton/common/doctypename.h>
#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/fastos/thread.h>
#include <set>
namespace proton {
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
index 4927f831fc4..af8e16f657b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
@@ -2,8 +2,7 @@
#include "executor_thread_service.h"
#include <vespa/vespalib/util/closuretask.h>
-#include <vespa/vespalib/util/executor.h>
-#include <vespa/vespalib/util/sync.h>
+#include <vespa/fastos/thread.h>
using vespalib::makeClosure;
using vespalib::makeTask;
@@ -14,6 +13,13 @@ using vespalib::ThreadStackExecutorBase;
namespace proton {
+namespace internal {
+
+struct ThreadId {
+ FastOS_ThreadId _id;
+};
+}
+
namespace {
void
@@ -22,11 +28,11 @@ sampleThreadId(FastOS_ThreadId *threadId)
*threadId = FastOS_Thread::GetCurrentThreadId();
}
-FastOS_ThreadId
+std::unique_ptr<internal::ThreadId>
getThreadId(ThreadStackExecutorBase &executor)
{
- FastOS_ThreadId id;
- executor.execute(makeTask(makeClosure(&sampleThreadId, &id)));
+ std::unique_ptr<internal::ThreadId> id = std::make_unique<internal::ThreadId>();
+ executor.execute(makeTask(makeClosure(&sampleThreadId, &id->_id)));
executor.sync();
return id;
}
@@ -46,6 +52,8 @@ ExecutorThreadService::ExecutorThreadService(ThreadStackExecutorBase &executor)
{
}
+ExecutorThreadService::~ExecutorThreadService() {}
+
void
ExecutorThreadService::run(Runnable &runnable)
{
@@ -62,7 +70,7 @@ bool
ExecutorThreadService::isCurrentThread() const
{
FastOS_ThreadId currentThreadId = FastOS_Thread::GetCurrentThreadId();
- return FastOS_Thread::CompareThreadIds(_threadId, currentThreadId);
+ return FastOS_Thread::CompareThreadIds(_threadId->_id, currentThreadId);
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
index ea4c51f8858..47e41fcd1db 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
@@ -6,6 +6,7 @@
namespace proton {
+namespace internal { class ThreadId; }
/**
* Implementation of IThreadService using an underlying thread stack executor
* with a single thread.
@@ -14,10 +15,11 @@ class ExecutorThreadService : public searchcorespi::index::IThreadService
{
private:
vespalib::ThreadStackExecutorBase &_executor;
- FastOS_ThreadId _threadId;
+ std::unique_ptr<internal::ThreadId> _threadId;
public:
ExecutorThreadService(vespalib::ThreadStackExecutorBase &executor);
+ ~ExecutorThreadService();
/**
* Implements IThreadService
diff --git a/vespalib/src/tests/executor/stress_test.cpp b/vespalib/src/tests/executor/stress_test.cpp
index aa5d9d53955..5c7620afb14 100644
--- a/vespalib/src/tests/executor/stress_test.cpp
+++ b/vespalib/src/tests/executor/stress_test.cpp
@@ -2,12 +2,11 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/executor.h>
-#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/locale/c.h>
-#include <cmath>
using namespace vespalib;
+using namespace std::literals;
uint32_t doStuff(uint32_t input) {
char buf[128];
@@ -131,7 +130,7 @@ Test::Main()
Executor::Task::UP t(new CPUTask(taskSize, result));
t = executor.execute(std::move(t));
while (t.get() != 0) {
- FastOS_Thread::Sleep(10);
+ std::this_thread::sleep_for(10ms);
t = executor.execute(std::move(t));
}
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index e9f101f242e..76557762479 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -1,15 +1,29 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "threadstackexecutorbase.h"
+#include <vespa/fastos/thread.h>
namespace vespalib {
+namespace thread {
+
+struct ThreadInit : public FastOS_Runnable {
+ Runnable &worker;
+ ThreadStackExecutorBase::init_fun_t init_fun;
+
+ explicit ThreadInit(Runnable &worker_in, ThreadStackExecutorBase::init_fun_t init_fun_in)
+ : worker(worker_in), init_fun(std::move(init_fun_in)) {}
+
+ void Run(FastOS_ThreadInterface *, void *) override;
+};
+
void
-ThreadStackExecutorBase::ThreadInit::Run(FastOS_ThreadInterface *, void *)
-{
+ThreadInit::Run(FastOS_ThreadInterface *, void *) {
init_fun(worker);
}
+}
+
void
ThreadStackExecutorBase::BlockedThread::wait() const
{
@@ -115,7 +129,7 @@ ThreadStackExecutorBase::run()
ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize,
uint32_t taskLimit,
init_fun_t init_fun)
- : _pool(stackSize),
+ : _pool(std::make_unique<FastOS_ThreadPool>(stackSize)),
_monitor(),
_stats(),
_executorCompletion(),
@@ -125,7 +139,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize,
_taskCount(0),
_taskLimit(taskLimit),
_closed(false),
- _thread_init(std::make_unique<ThreadInit>(*this, std::move(init_fun)))
+ _thread_init(std::make_unique<thread::ThreadInit>(*this, std::move(init_fun)))
{
assert(taskLimit > 0);
}
@@ -135,7 +149,7 @@ ThreadStackExecutorBase::start(uint32_t threads)
{
assert(threads > 0);
for (uint32_t i = 0; i < threads; ++i) {
- FastOS_ThreadInterface *thread = _pool.NewThread(_thread_init.get());
+ FastOS_ThreadInterface *thread = _pool->NewThread(_thread_init.get());
assert(thread != 0);
(void)thread;
}
@@ -236,12 +250,12 @@ ThreadStackExecutorBase::cleanup()
{
shutdown().sync();
_executorCompletion.countDown();
- _pool.Close();
+ _pool->Close();
}
ThreadStackExecutorBase::~ThreadStackExecutorBase()
{
- assert(_pool.isClosed());
+ assert(_pool->isClosed());
assert(_taskCount == 0);
assert(_blocked.empty());
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index a91114b055e..4ea27a2bcde 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -11,10 +11,13 @@
#include <memory>
#include <vector>
#include <functional>
-#include <vespa/fastos/thread.h>
+
+class FastOS_ThreadPool;
namespace vespalib {
+namespace thread { class ThreadInit; }
+
// Convenience macro used to create a function that can be used as an
// init function when creating an executor to inject a frame with the
// given name into the stack of all worker threads.
@@ -46,13 +49,6 @@ public:
using init_fun_t = std::function<int(Runnable&)>;
private:
- struct ThreadInit : public FastOS_Runnable {
- Runnable &worker;
- init_fun_t init_fun;
- explicit ThreadInit(Runnable &worker_in, init_fun_t init_fun_in)
- : worker(worker_in), init_fun(std::move(init_fun_in)) {}
- void Run(FastOS_ThreadInterface *, void *) override;
- };
struct TaggedTask {
Task::UP task;
@@ -102,18 +98,18 @@ private:
void unblock();
};
- FastOS_ThreadPool _pool;
- Monitor _monitor;
- Stats _stats;
- Gate _executorCompletion;
- ArrayQueue<TaggedTask> _tasks;
- ArrayQueue<Worker*> _workers;
- std::vector<BlockedThread*> _blocked;
- EventBarrier<BarrierCompletion> _barrier;
- uint32_t _taskCount;
- uint32_t _taskLimit;
- bool _closed;
- std::unique_ptr<ThreadInit> _thread_init;
+ std::unique_ptr<FastOS_ThreadPool> _pool;
+ Monitor _monitor;
+ Stats _stats;
+ Gate _executorCompletion;
+ ArrayQueue<TaggedTask> _tasks;
+ ArrayQueue<Worker*> _workers;
+ std::vector<BlockedThread*> _blocked;
+ EventBarrier<BarrierCompletion> _barrier;
+ uint32_t _taskCount;
+ uint32_t _taskLimit;
+ bool _closed;
+ std::unique_ptr<thread::ThreadInit> _thread_init;
void block_thread(const LockGuard &, BlockedThread &blocked_thread);
void unblock_threads(const MonitorGuard &);