summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-26 21:46:34 +0100
committerGitHub <noreply@github.com>2020-03-26 21:46:34 +0100
commit8f0f88ff1b16d13371984c30ffc79d1887788113 (patch)
tree66caadf8364c121236dcc63ee87cfaf141b34d10 /staging_vespalib
parentfa9c389e779851c563ccef77a2a37a7277c24e1c (diff)
parent36b3be76ae151682295faa04de17989f3600c29b (diff)
Merge pull request #12727 from vespa-engine/balder/let-optimize-flag-control-index-and-summary-executor-too
Add shutdown to thread interface.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp33
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h3
2 files changed, 23 insertions, 13 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 4b92d9a9687..90eb18c23ef 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -18,11 +18,13 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_queueSize(),
_wakeupConsumerAt(0),
_producerNeedWakeupAt(0),
- _wp(0)
+ _wp(0),
+ _closed(false)
{
_thread.start();
}
SingleExecutor::~SingleExecutor() {
+ shutdown();
sync();
_thread.stop().join();
}
@@ -32,16 +34,6 @@ SingleExecutor::getNumThreads() const {
return 1;
}
-uint64_t
-SingleExecutor::addTask(Task::UP task) {
- Lock guard(_mutex);
- wait_for_room(guard);
- uint64_t wp = _wp.load(std::memory_order_relaxed);
- _tasks[index(wp)] = std::move(task);
- _wp.store(wp + 1, std::memory_order_release);
- return wp;
-}
-
void
SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeupAt) {
_producerNeedWakeupAt.store(wakeupAt, std::memory_order_relaxed);
@@ -51,7 +43,17 @@ SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeup
Executor::Task::UP
SingleExecutor::execute(Task::UP task) {
- uint64_t wp = addTask(std::move(task));
+ uint64_t wp;
+ {
+ Lock guard(_mutex);
+ if (_closed) {
+ return task;
+ }
+ wait_for_room(guard);
+ wp = _wp.load(std::memory_order_relaxed);
+ _tasks[index(wp)] = std::move(task);
+ _wp.store(wp + 1, std::memory_order_release);
+ }
if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
_consumerCondition.notify_one();
}
@@ -88,6 +90,13 @@ SingleExecutor::sync() {
return *this;
}
+SingleExecutor &
+SingleExecutor::shutdown() {
+ Lock lock(_mutex);
+ _closed = true;
+ return *this;
+}
+
void
SingleExecutor::run() {
while (!_thread.stopped()) {
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 5beac5c1bec..3d759769ea3 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -27,9 +27,9 @@ public:
size_t getNumThreads() const override;
uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); }
Stats getStats() override;
+ SingleExecutor & shutdown() override;
private:
using Lock = std::unique_lock<std::mutex>;
- uint64_t addTask(Task::UP task);
void drain(Lock & lock);
void run() override;
void drain_tasks();
@@ -56,6 +56,7 @@ private:
std::atomic<uint64_t> _wakeupConsumerAt;
std::atomic<uint64_t> _producerNeedWakeupAt;
std::atomic<uint64_t> _wp;
+ bool _closed;
};
}