summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp1
-rw-r--r--searchcore/src/tests/proton/index/indexmanager_test.cpp1
-rw-r--r--searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp51
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h33
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_configurer.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h1
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp33
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h3
-rw-r--r--vespalib/src/vespa/vespalib/util/threadexecutor.h1
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h2
19 files changed, 104 insertions, 67 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index 30fed6fa49e..6c9ffc210a1 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -55,7 +55,6 @@ using storage::spi::RemoveResult;
using storage::spi::Result;
using storage::spi::Timestamp;
using storage::spi::UpdateResult;
-using vespalib::BlockingThreadStackExecutor;
using vespalib::ThreadStackExecutor;
using vespalib::ThreadStackExecutorBase;
using vespalib::makeClosure;
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp
index 264cf6d8cfa..8cc075773f7 100644
--- a/searchcore/src/tests/proton/index/indexmanager_test.cpp
+++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp
@@ -48,7 +48,6 @@ using search::memoryindex::FieldIndexCollection;
using search::queryeval::Source;
using std::set;
using std::string;
-using vespalib::BlockingThreadStackExecutor;
using vespalib::ThreadStackExecutor;
using vespalib::makeLambdaTask;
using std::chrono::duration_cast;
diff --git a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
index dfb1268aaa6..c26b008f769 100644
--- a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
+++ b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
@@ -243,7 +243,6 @@ struct MyLog
struct MyProtonConfigurerOwner : public IProtonConfigurerOwner,
public MyLog
{
- using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
vespalib::ThreadStackExecutor _executor;
std::map<DocTypeName, std::shared_ptr<MyDocumentDBConfigOwner>> _dbs;
@@ -254,9 +253,9 @@ struct MyProtonConfigurerOwner : public IProtonConfigurerOwner,
_dbs()
{
}
- virtual ~MyProtonConfigurerOwner() { }
+ ~MyProtonConfigurerOwner() { }
- virtual std::shared_ptr<DocumentDBConfigOwner> addDocumentDB(const DocTypeName &docTypeName,
+ std::shared_ptr<DocumentDBConfigOwner> addDocumentDB(const DocTypeName &docTypeName,
document::BucketSpace bucketSpace,
const vespalib::string &configId,
const std::shared_ptr<BootstrapConfig> &bootstrapConfig,
@@ -275,14 +274,14 @@ struct MyProtonConfigurerOwner : public IProtonConfigurerOwner,
_log.push_back(os.str());
return db;
}
- virtual void removeDocumentDB(const DocTypeName &docTypeName) override {
+ void removeDocumentDB(const DocTypeName &docTypeName) override {
ASSERT_FALSE(_dbs.find(docTypeName) == _dbs.end());
_dbs.erase(docTypeName);
std::ostringstream os;
os << "remove db " << docTypeName.getName();
_log.push_back(os.str());
}
- virtual void applyConfig(const std::shared_ptr<BootstrapConfig> &bootstrapConfig) override {
+ void applyConfig(const std::shared_ptr<BootstrapConfig> &bootstrapConfig) override {
std::ostringstream os;
os << "apply config " << bootstrapConfig->getGeneration();
_log.push_back(os.str());
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index f296e264903..1a1d97a657b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -85,7 +85,7 @@ private:
DocumentStoreCacheStats() : total(), readySubDb(), notReadySubDb(), removedSubDb() {}
};
- using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
+ using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
using IFlushTargetList = std::vector<std::shared_ptr<searchcorespi::IFlushTarget>>;
using StatusReportUP = std::unique_ptr<StatusReport>;
using ProtonConfig = const vespa::config::search::core::internal::InternalProtonType;
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
index 6ca385711b0..b13fa2baed3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
@@ -2,13 +2,14 @@
#include "executor_thread_service.h"
#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/gate.h>
#include <vespa/fastos/thread.h>
using vespalib::makeLambdaTask;
using vespalib::Executor;
using vespalib::Gate;
using vespalib::Runnable;
-using vespalib::ThreadStackExecutorBase;
+using vespalib::SyncableThreadExecutor;
namespace proton {
@@ -28,7 +29,7 @@ sampleThreadId(FastOS_ThreadId *threadId)
}
std::unique_ptr<internal::ThreadId>
-getThreadId(ThreadStackExecutorBase &executor)
+getThreadId(SyncableThreadExecutor &executor)
{
std::unique_ptr<internal::ThreadId> id = std::make_unique<internal::ThreadId>();
executor.execute(makeLambdaTask([threadId=&id->_id] { sampleThreadId(threadId);}));
@@ -45,7 +46,7 @@ runRunnable(Runnable *runnable, Gate *gate)
} // namespace
-ExecutorThreadService::ExecutorThreadService(ThreadStackExecutorBase &executor)
+ExecutorThreadService::ExecutorThreadService(SyncableThreadExecutor &executor)
: _executor(executor),
_threadId(getThreadId(executor))
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
index ccdfb6b72cd..26069b4b8dd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
@@ -2,7 +2,7 @@
#pragma once
#include <vespa/searchcorespi/index/i_thread_service.h>
-#include <vespa/vespalib/util/threadstackexecutorbase.h>
+#include <vespa/vespalib/util/threadexecutor.h>
namespace proton {
@@ -14,11 +14,11 @@ namespace internal { struct ThreadId; }
class ExecutorThreadService : public searchcorespi::index::IThreadService
{
private:
- vespalib::ThreadStackExecutorBase &_executor;
+ vespalib::SyncableThreadExecutor &_executor;
std::unique_ptr<internal::ThreadId> _threadId;
public:
- ExecutorThreadService(vespalib::ThreadStackExecutorBase &executor);
+ ExecutorThreadService(vespalib::SyncableThreadExecutor &executor);
~ExecutorThreadService();
Stats getStats() override;
@@ -31,6 +31,10 @@ public:
_executor.sync();
return *this;
}
+ ExecutorThreadService & shutdown() override {
+ _executor.shutdown();
+ return *this;
+ }
bool isCurrentThread() const override;
size_t getNumThreads() const override { return _executor.getNumThreads(); }
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 6e7b4967f6d..a725b00d485 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -3,23 +3,42 @@
#include "executorthreadingservice.h"
#include <vespa/searchcore/proton/metrics/executor_threading_service_stats.h>
#include <vespa/searchlib/common/sequencedtaskexecutor.h>
+#include <vespa/vespalib/util/singleexecutor.h>
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
-using vespalib::ThreadStackExecutorBase;
+
+using vespalib::SyncableThreadExecutor;
+using vespalib::BlockingThreadStackExecutor;
+using vespalib::SingleExecutor;
using search::SequencedTaskExecutor;
+using OptimizeFor = vespalib::Executor::OptimizeFor;
namespace proton {
-ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadStackExecutorBase & sharedExecutor,
+namespace {
+
+std::unique_ptr<SyncableThreadExecutor>
+createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor optimize) {
+ if (optimize == OptimizeFor::THROUGHPUT) {
+ return std::make_unique<SingleExecutor>(taskLimit);
+ } else {
+ return std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit);
+ }
+}
+
+}
+
+ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor & sharedExecutor,
uint32_t threads, uint32_t stackSize, uint32_t taskLimit,
OptimizeFor optimize)
: _sharedExecutor(sharedExecutor),
_masterExecutor(1, stackSize),
- _indexExecutor(1, stackSize, taskLimit),
- _summaryExecutor(1, stackSize, taskLimit),
+ _indexExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)),
+ _summaryExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)),
_masterService(_masterExecutor),
- _indexService(_indexExecutor),
- _summaryService(_summaryExecutor),
+ _indexService(*_indexExecutor),
+ _summaryService(*_summaryExecutor),
_indexFieldInverter(SequencedTaskExecutor::create(threads, taskLimit)),
_indexFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)),
_attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit, optimize))
@@ -36,8 +55,8 @@ ExecutorThreadingService::sync()
_masterExecutor.sync();
}
_attributeFieldWriter->sync();
- _indexExecutor.sync();
- _summaryExecutor.sync();
+ _indexExecutor->sync();
+ _summaryExecutor->sync();
_indexFieldInverter->sync();
_indexFieldWriter->sync();
if (!isMasterThread) {
@@ -52,10 +71,10 @@ ExecutorThreadingService::shutdown()
_masterExecutor.shutdown();
_masterExecutor.sync();
_attributeFieldWriter->sync();
- _summaryExecutor.shutdown();
- _summaryExecutor.sync();
- _indexExecutor.shutdown();
- _indexExecutor.sync();
+ _summaryExecutor->shutdown();
+ _summaryExecutor->sync();
+ _indexExecutor->shutdown();
+ _indexExecutor->sync();
_indexFieldInverter->sync();
_indexFieldWriter->sync();
}
@@ -63,8 +82,8 @@ ExecutorThreadingService::shutdown()
void
ExecutorThreadingService::setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskLimit)
{
- _indexExecutor.setTaskLimit(taskLimit);
- _summaryExecutor.setTaskLimit(summaryTaskLimit);
+ _indexExecutor->setTaskLimit(taskLimit);
+ _summaryExecutor->setTaskLimit(summaryTaskLimit);
_indexFieldInverter->setTaskLimit(taskLimit);
_indexFieldWriter->setTaskLimit(taskLimit);
_attributeFieldWriter->setTaskLimit(taskLimit);
@@ -74,8 +93,8 @@ ExecutorThreadingServiceStats
ExecutorThreadingService::getStats()
{
return ExecutorThreadingServiceStats(_masterExecutor.getStats(),
- _indexExecutor.getStats(),
- _summaryExecutor.getStats(),
+ _indexExecutor->getStats(),
+ _summaryExecutor->getStats(),
_sharedExecutor.getStats(),
_indexFieldInverter->getStats(),
_indexFieldWriter->getStats(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index 2e4dd2035f3..4d018e2b6f3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -3,7 +3,6 @@
#include "executor_thread_service.h"
#include <vespa/searchcorespi/index/ithreadingservice.h>
-#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
namespace proton {
@@ -17,16 +16,16 @@ class ExecutorThreadingServiceStats;
class ExecutorThreadingService : public searchcorespi::index::IThreadingService
{
private:
- vespalib::ThreadStackExecutorBase & _sharedExecutor;
- vespalib::ThreadStackExecutor _masterExecutor;
- vespalib::BlockingThreadStackExecutor _indexExecutor;
- vespalib::BlockingThreadStackExecutor _summaryExecutor;
- ExecutorThreadService _masterService;
- ExecutorThreadService _indexService;
- ExecutorThreadService _summaryService;
- std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldInverter;
- std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldWriter;
- std::unique_ptr<search::ISequencedTaskExecutor> _attributeFieldWriter;
+ vespalib::SyncableThreadExecutor & _sharedExecutor;
+ vespalib::ThreadStackExecutor _masterExecutor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _indexExecutor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor;
+ ExecutorThreadService _masterService;
+ ExecutorThreadService _indexService;
+ ExecutorThreadService _summaryService;
+ std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldInverter;
+ std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldWriter;
+ std::unique_ptr<search::ISequencedTaskExecutor> _attributeFieldWriter;
public:
using OptimizeFor = vespalib::Executor::OptimizeFor;
@@ -36,7 +35,7 @@ public:
* @stackSize The size of the stack of the underlying executors.
* @taskLimit The task limit for the index executor.
*/
- ExecutorThreadingService(vespalib::ThreadStackExecutorBase &sharedExecutor,
+ ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor,
uint32_t threads = 1,
uint32_t stackSize = 128 * 1024,
uint32_t taskLimit = 1000,
@@ -56,11 +55,11 @@ public:
vespalib::ThreadStackExecutorBase &getMasterExecutor() {
return _masterExecutor;
}
- vespalib::ThreadStackExecutorBase &getIndexExecutor() {
- return _indexExecutor;
+ vespalib::SyncableThreadExecutor &getIndexExecutor() {
+ return *_indexExecutor;
}
- vespalib::ThreadStackExecutorBase &getSummaryExecutor() {
- return _summaryExecutor;
+ vespalib::SyncableThreadExecutor &getSummaryExecutor() {
+ return *_summaryExecutor;
}
/**
@@ -76,7 +75,7 @@ public:
searchcorespi::index::IThreadService &summary() override {
return _summaryService;
}
- vespalib::ThreadExecutor &shared() override {
+ vespalib::SyncableThreadExecutor &shared() override {
return _sharedExecutor;
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h
index fec8430e41d..5a457b168ec 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h
@@ -18,8 +18,8 @@ class DocumentDBConfigOwner;
*/
class IProtonConfigurerOwner
{
- using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
public:
+ using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
virtual ~IProtonConfigurerOwner() { }
virtual std::shared_ptr<DocumentDBConfigOwner> addDocumentDB(const DocTypeName &docTypeName,
document::BucketSpace bucketSpace,
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 20de5bb07c1..3f8db3f2ff9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -32,6 +32,7 @@
#include <vespa/vespalib/util/host_name.h>
#include <vespa/vespalib/util/random.h>
#include <vespa/vespalib/net/state_server.h>
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
#include <vespa/searchlib/aggregation/forcelink.hpp>
#include <vespa/searchlib/expression/forcelink.hpp>
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index 410f45162e4..4c9d4c77cc4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -57,7 +57,7 @@ private:
typedef search::engine::MonitorClient MonitorClient;
typedef std::map<DocTypeName, DocumentDB::SP> DocumentDBMap;
typedef BootstrapConfig::ProtonConfigSP ProtonConfigSP;
- using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
+ using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
using BucketSpace = document::BucketSpace;
struct MetricsUpdateHook : metrics::UpdateHook
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp
index 0b9293a4aab..45e3c978dd9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp
@@ -39,7 +39,7 @@ getBucketSpace(const BootstrapConfig &bootstrapConfig, const DocTypeName &name)
}
-ProtonConfigurer::ProtonConfigurer(vespalib::ThreadStackExecutorBase &executor,
+ProtonConfigurer::ProtonConfigurer(vespalib::SyncableThreadExecutor &executor,
IProtonConfigurerOwner &owner,
const std::unique_ptr<IProtonDiskLayout> &diskLayout)
: IProtonConfigurer(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h
index c896f12bd4f..54399a26365 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h
@@ -25,7 +25,7 @@ class IProtonDiskLayout;
class ProtonConfigurer : public IProtonConfigurer
{
using DocumentDBs = std::map<DocTypeName, std::pair<std::weak_ptr<IDocumentDBConfigOwner>, std::weak_ptr<DocumentDBDirectoryHolder>>>;
- using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
+ using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>;
ExecutorThreadService _executor;
IProtonConfigurerOwner &_owner;
@@ -48,11 +48,11 @@ class ProtonConfigurer : public IProtonConfigurer
void pruneInitialDocumentDBDirs(const ProtonConfigSnapshot &configSnapshot);
public:
- ProtonConfigurer(vespalib::ThreadStackExecutorBase &executor,
+ ProtonConfigurer(vespalib::SyncableThreadExecutor &executor,
IProtonConfigurerOwner &owner,
const std::unique_ptr<IProtonDiskLayout> &diskLayout);
- ~ProtonConfigurer();
+ ~ProtonConfigurer() override;
void setAllowReconfig(bool allowReconfig);
diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
index 766bdeeefb0..127b696c4ab 100644
--- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
@@ -31,6 +31,10 @@ public:
_service.sync();
return *this;
}
+ ThreadServiceObserver &shutdown() override {
+ _service.shutdown();
+ return *this;
+ }
bool isCurrentThread() const override {
return _service.isCurrentThread();
}
diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
index 7ac9c0c68f2..23c62d179b1 100644
--- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h
@@ -69,6 +69,7 @@ public:
search::ISequencedTaskExecutor &attributeFieldWriter() override {
return _attributeFieldWriter;
}
+
};
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 4b92d9a9687..90eb18c23ef 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -18,11 +18,13 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_queueSize(),
_wakeupConsumerAt(0),
_producerNeedWakeupAt(0),
- _wp(0)
+ _wp(0),
+ _closed(false)
{
_thread.start();
}
SingleExecutor::~SingleExecutor() {
+ shutdown();
sync();
_thread.stop().join();
}
@@ -32,16 +34,6 @@ SingleExecutor::getNumThreads() const {
return 1;
}
-uint64_t
-SingleExecutor::addTask(Task::UP task) {
- Lock guard(_mutex);
- wait_for_room(guard);
- uint64_t wp = _wp.load(std::memory_order_relaxed);
- _tasks[index(wp)] = std::move(task);
- _wp.store(wp + 1, std::memory_order_release);
- return wp;
-}
-
void
SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeupAt) {
_producerNeedWakeupAt.store(wakeupAt, std::memory_order_relaxed);
@@ -51,7 +43,17 @@ SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeup
Executor::Task::UP
SingleExecutor::execute(Task::UP task) {
- uint64_t wp = addTask(std::move(task));
+ uint64_t wp;
+ {
+ Lock guard(_mutex);
+ if (_closed) {
+ return task;
+ }
+ wait_for_room(guard);
+ wp = _wp.load(std::memory_order_relaxed);
+ _tasks[index(wp)] = std::move(task);
+ _wp.store(wp + 1, std::memory_order_release);
+ }
if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
_consumerCondition.notify_one();
}
@@ -88,6 +90,13 @@ SingleExecutor::sync() {
return *this;
}
+SingleExecutor &
+SingleExecutor::shutdown() {
+ Lock lock(_mutex);
+ _closed = true;
+ return *this;
+}
+
void
SingleExecutor::run() {
while (!_thread.stopped()) {
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 5beac5c1bec..3d759769ea3 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -27,9 +27,9 @@ public:
size_t getNumThreads() const override;
uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); }
Stats getStats() override;
+ SingleExecutor & shutdown() override;
private:
using Lock = std::unique_lock<std::mutex>;
- uint64_t addTask(Task::UP task);
void drain(Lock & lock);
void run() override;
void drain_tasks();
@@ -56,6 +56,7 @@ private:
std::atomic<uint64_t> _wakeupConsumerAt;
std::atomic<uint64_t> _producerNeedWakeupAt;
std::atomic<uint64_t> _wp;
+ bool _closed;
};
}
diff --git a/vespalib/src/vespa/vespalib/util/threadexecutor.h b/vespalib/src/vespa/vespalib/util/threadexecutor.h
index 202e516bc60..61a5d9d5ac7 100644
--- a/vespalib/src/vespa/vespalib/util/threadexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/threadexecutor.h
@@ -40,6 +40,7 @@ public:
class SyncableThreadExecutor : public ThreadExecutor, public Syncable
{
public:
+ virtual SyncableThreadExecutor & shutdown() = 0;
};
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 2c0bc56d6df..6333a8fc66e 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -226,7 +226,7 @@ public:
*
* @return this object; for chaining
**/
- ThreadStackExecutorBase &shutdown();
+ ThreadStackExecutorBase &shutdown() override;
/**
* Will invoke shutdown then sync.