diff options
Diffstat (limited to 'staging_vespalib')
6 files changed, 33 insertions, 13 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h index 575552971fa..ff523b1e35d 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h @@ -25,7 +25,8 @@ public: Stats getStats() override { return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0); } - virtual void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } + void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } + void wakeup() override { } }; } diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h index d457de26f54..46562a2902c 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -50,6 +50,10 @@ public: * @param task unique pointer to the task to be executed */ virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0; + /** + * Call this one to ensure you get the attention of the workers. + */ + virtual void wakeup() { } /** * Wrap lambda function into a task and schedule it to be run. diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 963264a62e7..cf385275bfb 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -14,8 +14,17 @@ namespace { constexpr uint32_t stackSize = 128 * 1024; constexpr uint8_t MAGIC = 255; +bool +isLazy(const std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> & executors) { + for (const auto &executor : executors) { + if (dynamic_cast<const vespalib::SingleExecutor *>(executor.get()) == nullptr) { + return false; + } + } + return true; } +} std::unique_ptr<ISequencedTaskExecutor> SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime) @@ -46,6 +55,7 @@ SequencedTaskExecutor::~SequencedTaskExecutor() SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors) : ISequencedTaskExecutor(executors->size()), _executors(std::move(executors)), + _lazyExecutors(isLazy(*_executors)), _component2Id(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC), _mutex(), _nextId(0) @@ -70,18 +80,21 @@ SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP t } void -SequencedTaskExecutor::sync() -{ +SequencedTaskExecutor::sync() { + wakeup(); for (auto &executor : *_executors) { - SingleExecutor * single = dynamic_cast<vespalib::SingleExecutor *>(executor.get()); - if (single) { + executor->sync(); + } +} + +void +SequencedTaskExecutor::wakeup() { + if (_lazyExecutors) { + for (auto &executor : *_executors) { //Enforce parallel wakeup of napping executors. - single->startSync(); + executor->wakeup(); } } - for (auto &executor : *_executors) { - executor->sync(); - } } SequencedTaskExecutor::Stats diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index c37bd2eecf4..180cd1cc6cc 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -19,13 +19,14 @@ public: using ISequencedTaskExecutor::getExecutorId; using OptimizeFor = vespalib::Executor::OptimizeFor; - ~SequencedTaskExecutor(); + ~SequencedTaskExecutor() override; void setTaskLimit(uint32_t taskLimit) override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; ExecutorId getExecutorId(uint64_t componentId) const override; void sync() override; Stats getStats() override; + void wakeup() override; /* * Note that if you choose Optimize::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside. @@ -39,9 +40,10 @@ public: uint32_t getComponentHashSize() const { return _component2Id.size(); } uint32_t getComponentEffectiveHashSize() const { return _nextId; } private: - SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); + explicit SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors; + const bool _lazyExecutors; mutable std::vector<uint8_t> _component2Id; mutable std::mutex _mutex; mutable uint32_t _nextId; diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index a17037799a3..65857183879 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -83,7 +83,7 @@ SingleExecutor::drain(Lock & lock) { } void -SingleExecutor::startSync() { +SingleExecutor::wakeup() { _consumerCondition.notify_one(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index a58128c15aa..7b8a2741d87 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -24,7 +24,7 @@ public: Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; SingleExecutor & sync() override; - void startSync(); + void wakeup() override; size_t getNumThreads() const override; uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); } Stats getStats() override; |