summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-23 16:01:22 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-23 16:13:09 +0000
commit36b3be76ae151682295faa04de17989f3600c29b (patch)
treefbe5156d396bfa694f9acd932b466088c20d2e63 /staging_vespalib
parent23dcf1db150a66ec66cf746d4234982fdbb0e6e2 (diff)
Add shutdown to thread interface.
Let the optimize config control index and summary executor too.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp33
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h3
2 files changed, 23 insertions, 13 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 73612452b09..2d0ffc721e6 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -18,11 +18,13 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_maxPending(0),
_wakeupConsumerAt(0),
_producerNeedWakeupAt(0),
- _wp(0)
+ _wp(0),
+ _closed(false)
{
_thread.start();
}
SingleExecutor::~SingleExecutor() {
+ shutdown();
sync();
_thread.stop().join();
}
@@ -32,16 +34,6 @@ SingleExecutor::getNumThreads() const {
return 1;
}
-uint64_t
-SingleExecutor::addTask(Task::UP task) {
- Lock guard(_mutex);
- wait_for_room(guard);
- uint64_t wp = _wp.load(std::memory_order_relaxed);
- _tasks[index(wp)] = std::move(task);
- _wp.store(wp + 1, std::memory_order_release);
- return wp;
-}
-
void
SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeupAt) {
_producerNeedWakeupAt.store(wakeupAt, std::memory_order_relaxed);
@@ -51,7 +43,17 @@ SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeup
Executor::Task::UP
SingleExecutor::execute(Task::UP task) {
- uint64_t wp = addTask(std::move(task));
+ uint64_t wp;
+ {
+ Lock guard(_mutex);
+ if (_closed) {
+ return task;
+ }
+ wait_for_room(guard);
+ wp = _wp.load(std::memory_order_relaxed);
+ _tasks[index(wp)] = std::move(task);
+ _wp.store(wp + 1, std::memory_order_release);
+ }
if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
_consumerCondition.notify_one();
}
@@ -88,6 +90,13 @@ SingleExecutor::sync() {
return *this;
}
+SingleExecutor &
+SingleExecutor::shutdown() {
+ Lock lock(_mutex);
+ _closed = true;
+ return *this;
+}
+
void
SingleExecutor::run() {
while (!_thread.stopped()) {
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 9c3ebb4caf7..1b9b4f3afc4 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -27,9 +27,9 @@ public:
size_t getNumThreads() const override;
uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); }
Stats getStats() override;
+ SingleExecutor & shutdown() override;
private:
using Lock = std::unique_lock<std::mutex>;
- uint64_t addTask(Task::UP task);
void drain(Lock & lock);
void run() override;
void drain_tasks();
@@ -56,6 +56,7 @@ private:
std::atomic<uint64_t> _wakeupConsumerAt;
std::atomic<uint64_t> _producerNeedWakeupAt;
std::atomic<uint64_t> _wp;
+ bool _closed;
};
}