diff options
7 files changed, 60 insertions, 40 deletions
diff --git a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp index bc17a698536..440e720a0b6 100644 --- a/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp +++ b/searchcore/src/tests/proton/documentmetastore/documentmetastore_test.cpp @@ -45,11 +45,11 @@ using storage::spi::Timestamp; using vespalib::GenerationHandler; using vespalib::GenerationHolder; using searchcorespi::IFlushTarget; +using namespace std::literals; namespace proton { -namespace -{ +namespace { static constexpr uint32_t numBucketBits = UINT32_C(20); static constexpr uint64_t timestampBias = UINT64_C(2000000000000); @@ -666,7 +666,7 @@ TEST("requireThatStatsAreUpdated") EXPECT_GREATER(lastAllocated, perGidUsed); EXPECT_GREATER(lastUsed, perGidUsed); - FastOS_Thread::Sleep(2200); + std::this_thread::sleep_for(2200ms); addGid(dms, gid1, bucketId1, time1); EXPECT_EQUAL(2u, dms.getStatus().getNumDocs()); EXPECT_EQUAL(2u, dms.getStatus().getNumValues()); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index f703223bcbb..7f4698610c8 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -7,6 +7,7 @@ #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/fastos/thread.h> #include <set> namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp index 4927f831fc4..af8e16f657b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp @@ -2,8 +2,7 @@ #include "executor_thread_service.h" #include <vespa/vespalib/util/closuretask.h> -#include <vespa/vespalib/util/executor.h> -#include <vespa/vespalib/util/sync.h> +#include <vespa/fastos/thread.h> using vespalib::makeClosure; using vespalib::makeTask; @@ -14,6 +13,13 @@ using vespalib::ThreadStackExecutorBase; namespace proton { +namespace internal { + +struct ThreadId { + FastOS_ThreadId _id; +}; +} + namespace { void @@ -22,11 +28,11 @@ sampleThreadId(FastOS_ThreadId *threadId) *threadId = FastOS_Thread::GetCurrentThreadId(); } -FastOS_ThreadId +std::unique_ptr<internal::ThreadId> getThreadId(ThreadStackExecutorBase &executor) { - FastOS_ThreadId id; - executor.execute(makeTask(makeClosure(&sampleThreadId, &id))); + std::unique_ptr<internal::ThreadId> id = std::make_unique<internal::ThreadId>(); + executor.execute(makeTask(makeClosure(&sampleThreadId, &id->_id))); executor.sync(); return id; } @@ -46,6 +52,8 @@ ExecutorThreadService::ExecutorThreadService(ThreadStackExecutorBase &executor) { } +ExecutorThreadService::~ExecutorThreadService() {} + void ExecutorThreadService::run(Runnable &runnable) { @@ -62,7 +70,7 @@ bool ExecutorThreadService::isCurrentThread() const { FastOS_ThreadId currentThreadId = FastOS_Thread::GetCurrentThreadId(); - return FastOS_Thread::CompareThreadIds(_threadId, currentThreadId); + return FastOS_Thread::CompareThreadIds(_threadId->_id, currentThreadId); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h index ea4c51f8858..47e41fcd1db 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h @@ -6,6 +6,7 @@ namespace proton { +namespace internal { class ThreadId; } /** * Implementation of IThreadService using an underlying thread stack executor * with a single thread. @@ -14,10 +15,11 @@ class ExecutorThreadService : public searchcorespi::index::IThreadService { private: vespalib::ThreadStackExecutorBase &_executor; - FastOS_ThreadId _threadId; + std::unique_ptr<internal::ThreadId> _threadId; public: ExecutorThreadService(vespalib::ThreadStackExecutorBase &executor); + ~ExecutorThreadService(); /** * Implements IThreadService diff --git a/vespalib/src/tests/executor/stress_test.cpp b/vespalib/src/tests/executor/stress_test.cpp index aa5d9d53955..5c7620afb14 100644 --- a/vespalib/src/tests/executor/stress_test.cpp +++ b/vespalib/src/tests/executor/stress_test.cpp @@ -2,12 +2,11 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/executor.h> -#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/locale/c.h> -#include <cmath> using namespace vespalib; +using namespace std::literals; uint32_t doStuff(uint32_t input) { char buf[128]; @@ -131,7 +130,7 @@ Test::Main() Executor::Task::UP t(new CPUTask(taskSize, result)); t = executor.execute(std::move(t)); while (t.get() != 0) { - FastOS_Thread::Sleep(10); + std::this_thread::sleep_for(10ms); t = executor.execute(std::move(t)); } } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index e9f101f242e..76557762479 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -1,15 +1,29 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "threadstackexecutorbase.h" +#include <vespa/fastos/thread.h> namespace vespalib { +namespace thread { + +struct ThreadInit : public FastOS_Runnable { + Runnable &worker; + ThreadStackExecutorBase::init_fun_t init_fun; + + explicit ThreadInit(Runnable &worker_in, ThreadStackExecutorBase::init_fun_t init_fun_in) + : worker(worker_in), init_fun(std::move(init_fun_in)) {} + + void Run(FastOS_ThreadInterface *, void *) override; +}; + void -ThreadStackExecutorBase::ThreadInit::Run(FastOS_ThreadInterface *, void *) -{ +ThreadInit::Run(FastOS_ThreadInterface *, void *) { init_fun(worker); } +} + void ThreadStackExecutorBase::BlockedThread::wait() const { @@ -115,7 +129,7 @@ ThreadStackExecutorBase::run() ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize, uint32_t taskLimit, init_fun_t init_fun) - : _pool(stackSize), + : _pool(std::make_unique<FastOS_ThreadPool>(stackSize)), _monitor(), _stats(), _executorCompletion(), @@ -125,7 +139,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize, _taskCount(0), _taskLimit(taskLimit), _closed(false), - _thread_init(std::make_unique<ThreadInit>(*this, std::move(init_fun))) + _thread_init(std::make_unique<thread::ThreadInit>(*this, std::move(init_fun))) { assert(taskLimit > 0); } @@ -135,7 +149,7 @@ ThreadStackExecutorBase::start(uint32_t threads) { assert(threads > 0); for (uint32_t i = 0; i < threads; ++i) { - FastOS_ThreadInterface *thread = _pool.NewThread(_thread_init.get()); + FastOS_ThreadInterface *thread = _pool->NewThread(_thread_init.get()); assert(thread != 0); (void)thread; } @@ -236,12 +250,12 @@ ThreadStackExecutorBase::cleanup() { shutdown().sync(); _executorCompletion.countDown(); - _pool.Close(); + _pool->Close(); } ThreadStackExecutorBase::~ThreadStackExecutorBase() { - assert(_pool.isClosed()); + assert(_pool->isClosed()); assert(_taskCount == 0); assert(_blocked.empty()); } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index a91114b055e..4ea27a2bcde 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -11,10 +11,13 @@ #include <memory> #include <vector> #include <functional> -#include <vespa/fastos/thread.h> + +class FastOS_ThreadPool; namespace vespalib { +namespace thread { class ThreadInit; } + // Convenience macro used to create a function that can be used as an // init function when creating an executor to inject a frame with the // given name into the stack of all worker threads. @@ -46,13 +49,6 @@ public: using init_fun_t = std::function<int(Runnable&)>; private: - struct ThreadInit : public FastOS_Runnable { - Runnable &worker; - init_fun_t init_fun; - explicit ThreadInit(Runnable &worker_in, init_fun_t init_fun_in) - : worker(worker_in), init_fun(std::move(init_fun_in)) {} - void Run(FastOS_ThreadInterface *, void *) override; - }; struct TaggedTask { Task::UP task; @@ -102,18 +98,18 @@ private: void unblock(); }; - FastOS_ThreadPool _pool; - Monitor _monitor; - Stats _stats; - Gate _executorCompletion; - ArrayQueue<TaggedTask> _tasks; - ArrayQueue<Worker*> _workers; - std::vector<BlockedThread*> _blocked; - EventBarrier<BarrierCompletion> _barrier; - uint32_t _taskCount; - uint32_t _taskLimit; - bool _closed; - std::unique_ptr<ThreadInit> _thread_init; + std::unique_ptr<FastOS_ThreadPool> _pool; + Monitor _monitor; + Stats _stats; + Gate _executorCompletion; + ArrayQueue<TaggedTask> _tasks; + ArrayQueue<Worker*> _workers; + std::vector<BlockedThread*> _blocked; + EventBarrier<BarrierCompletion> _barrier; + uint32_t _taskCount; + uint32_t _taskLimit; + bool _closed; + std::unique_ptr<thread::ThreadInit> _thread_init; void block_thread(const LockGuard &, BlockedThread &blocked_thread); void unblock_threads(const MonitorGuard &); |