aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp')
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp86
1 files changed, 86 insertions, 0 deletions
diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
new file mode 100644
index 00000000000..7711eddf51c
--- /dev/null
+++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
@@ -0,0 +1,86 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "threadpoolimpl.h"
+#include "threadimpl.h"
+#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/size_literals.h>
+#include <thread>
+#include <vespa/log/log.h>
+LOG_SETUP(".storageframework.thread_pool_impl");
+
+using namespace std::chrono_literals;
+using vespalib::IllegalStateException;
+
+namespace storage::framework::defaultimplementation {
+
+ThreadPoolImpl::ThreadPoolImpl(Clock& clock)
+ : _backendThreadPool(512_Ki),
+ _clock(clock),
+ _stopping(false)
+{ }
+
+ThreadPoolImpl::~ThreadPoolImpl()
+{
+ {
+ std::lock_guard lock(_threadVectorLock);
+ _stopping = true;
+ for (ThreadImpl * thread : _threads) {
+ thread->interrupt();
+ }
+ for (ThreadImpl * thread : _threads) {
+ thread->join();
+ }
+ }
+ for (uint32_t i=0; true; i+=10) {
+ {
+ std::lock_guard lock(_threadVectorLock);
+ if (_threads.empty()) break;
+ }
+ if (i > 1000) {
+ fprintf(stderr, "Failed to kill thread pool. Threads won't die. (And if allowing thread pool object"
+ " to be deleted this will create a segfault later)\n");
+ LOG_ABORT("should not be reached");
+ }
+ std::this_thread::sleep_for(10ms);
+ }
+ _backendThreadPool.Close();
+}
+
+Thread::UP
+ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, vespalib::duration waitTime,
+ vespalib::duration maxProcessTime, int ticksBeforeWait,
+ std::optional<vespalib::CpuUsage::Category> cpu_category)
+{
+ std::lock_guard lock(_threadVectorLock);
+ if (_stopping) {
+ throw IllegalStateException("Threadpool is stopping", VESPA_STRLOC);
+ }
+ auto thread = std::make_unique<ThreadImpl>(*this, runnable, id, waitTime, maxProcessTime, ticksBeforeWait, cpu_category);
+ _threads.push_back(thread.get());
+ return thread;
+}
+
+void
+ThreadPoolImpl::visitThreads(ThreadVisitor& visitor) const
+{
+ std::lock_guard lock(_threadVectorLock);
+ for (const ThreadImpl * thread : _threads) {
+ visitor.visitThread(thread->getId(), thread->getProperties(), thread->getTickData());
+ }
+}
+
+void
+ThreadPoolImpl::unregisterThread(ThreadImpl& t)
+{
+ std::lock_guard lock(_threadVectorLock);
+ std::vector<ThreadImpl*> threads;
+ threads.reserve(_threads.size());
+ for (ThreadImpl * thread : _threads) {
+ if (thread != &t) {
+ threads.push_back(thread);
+ }
+ }
+ _threads.swap(threads);
+}
+
+}