diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-03 22:19:37 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-03 22:19:37 +0200 |
commit | 5d8748786cd3db3b4cdc983c83bf65005dcdd149 (patch) | |
tree | 0413bfc7b3b2358fa700c6189ae87c386f7c70c5 | |
parent | 6e9b1b6b04c7b556df3af92ecdec839a9bb5aaf3 (diff) | |
parent | e81ece87547cea19f53f05fe79ae17a5ece4516e (diff) |
Merge pull request #14694 from vespa-engine/balder/add-executor-wakeup
Add wakeup call on the executors to allow to urgent wakeup of lazy ex…
17 files changed, 57 insertions, 15 deletions
diff --git a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp index a0dad889d9a..5dea89b3a63 100644 --- a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp +++ b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp @@ -28,6 +28,7 @@ struct MyExecutor : public Executor { tasks.clear(); } ~MyExecutor() { run_tasks(); } + void wakeup() override { } }; //----------------------------------------------------------------------------- diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp index dbfbbd16820..08fa1b11229 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp @@ -51,6 +51,7 @@ public: _done.countDown(); return Task::UP(); } + void wakeup() override { } }; class SimpleGetSerialNum : public IGetSerialNum diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 13695b969bf..71a4eddbb08 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -771,6 +771,7 @@ AttributeWriter::forceCommit(SerialNum serialNum, OnWriteDoneType onWriteDone) auto commitTask = std::make_unique<CommitTask>(wc, serialNum, onWriteDone); _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(commitTask)); } + _attributeFieldWriter.wakeup(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp index b13fa2baed3..24f5fa9a5e6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp @@ -81,4 +81,9 @@ void ExecutorThreadService::setTaskLimit(uint32_t taskLimit) { _executor.setTaskLimit(taskLimit); } +void +ExecutorThreadService::wakeup() { + _executor.wakeup(); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h index 26069b4b8dd..4f27a8f86c2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h @@ -39,6 +39,7 @@ public: size_t getNumThreads() const override { return _executor.getNumThreads(); } void setTaskLimit(uint32_t taskLimit) override; + void wakeup() override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp index 564857a1e37..360cac6e2ee 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp @@ -214,6 +214,7 @@ SearchableFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneTy { Parent::internalForceCommit(serialNum, onCommitDone); _writeService.index().execute(makeLambdaTask([=]() { performIndexForceCommit(serialNum, onCommitDone); })); + _writeService.index().wakeup(); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 90875aa8591..84dad379d31 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -265,6 +265,7 @@ StoreOnlyFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneTyp { LOG(debug, "internalForceCommit: serial=%" PRIu64 ".", serialNum); _writeService.summary().execute(makeLambdaTask([onDone=onCommitDone]() {(void) onDone;})); + _writeService.summary().wakeup(); std::vector<uint32_t> lidsToReuse; lidsToReuse = _lidReuseDelayer.getReuseLids(); if (!lidsToReuse.empty()) { diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h index 127b696c4ab..350673b9c90 100644 --- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h @@ -48,6 +48,10 @@ public: _service.setTaskLimit(taskLimit); } + void wakeup() override { + _service.wakeup(); + } + }; } diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h index 575552971fa..ff523b1e35d 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h @@ -25,7 +25,8 @@ public: Stats getStats() override { return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0); } - virtual void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } + void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } + void wakeup() override { } }; } diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h index d457de26f54..46562a2902c 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -50,6 +50,10 @@ public: * @param task unique pointer to the task to be executed */ virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0; + /** + * Call this one to ensure you get the attention of the workers. + */ + virtual void wakeup() { } /** * Wrap lambda function into a task and schedule it to be run. diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 963264a62e7..cf385275bfb 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -14,8 +14,17 @@ namespace { constexpr uint32_t stackSize = 128 * 1024; constexpr uint8_t MAGIC = 255; +bool +isLazy(const std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> & executors) { + for (const auto &executor : executors) { + if (dynamic_cast<const vespalib::SingleExecutor *>(executor.get()) == nullptr) { + return false; + } + } + return true; } +} std::unique_ptr<ISequencedTaskExecutor> SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime) @@ -46,6 +55,7 @@ SequencedTaskExecutor::~SequencedTaskExecutor() SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors) : ISequencedTaskExecutor(executors->size()), _executors(std::move(executors)), + _lazyExecutors(isLazy(*_executors)), _component2Id(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC), _mutex(), _nextId(0) @@ -70,18 +80,21 @@ SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP t } void -SequencedTaskExecutor::sync() -{ +SequencedTaskExecutor::sync() { + wakeup(); for (auto &executor : *_executors) { - SingleExecutor * single = dynamic_cast<vespalib::SingleExecutor *>(executor.get()); - if (single) { + executor->sync(); + } +} + +void +SequencedTaskExecutor::wakeup() { + if (_lazyExecutors) { + for (auto &executor : *_executors) { //Enforce parallel wakeup of napping executors. - single->startSync(); + executor->wakeup(); } } - for (auto &executor : *_executors) { - executor->sync(); - } } SequencedTaskExecutor::Stats diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index c37bd2eecf4..180cd1cc6cc 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -19,13 +19,14 @@ public: using ISequencedTaskExecutor::getExecutorId; using OptimizeFor = vespalib::Executor::OptimizeFor; - ~SequencedTaskExecutor(); + ~SequencedTaskExecutor() override; void setTaskLimit(uint32_t taskLimit) override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; ExecutorId getExecutorId(uint64_t componentId) const override; void sync() override; Stats getStats() override; + void wakeup() override; /* * Note that if you choose Optimize::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside. @@ -39,9 +40,10 @@ public: uint32_t getComponentHashSize() const { return _component2Id.size(); } uint32_t getComponentEffectiveHashSize() const { return _nextId; } private: - SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); + explicit SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors; + const bool _lazyExecutors; mutable std::vector<uint8_t> _component2Id; mutable std::mutex _mutex; mutable uint32_t _nextId; diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index a17037799a3..65857183879 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -83,7 +83,7 @@ SingleExecutor::drain(Lock & lock) { } void -SingleExecutor::startSync() { +SingleExecutor::wakeup() { _consumerCondition.notify_one(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index a58128c15aa..7b8a2741d87 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -24,7 +24,7 @@ public: Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; SingleExecutor & sync() override; - void startSync(); + void wakeup() override; size_t getNumThreads() const override; uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); } Stats getStats() override; diff --git a/vespalib/src/vespa/vespalib/util/executor.h b/vespalib/src/vespa/vespalib/util/executor.h index a412832ecaf..57ad28344b9 100644 --- a/vespalib/src/vespa/vespalib/util/executor.h +++ b/vespalib/src/vespa/vespalib/util/executor.h @@ -38,9 +38,10 @@ public: virtual Task::UP execute(Task::UP task) = 0; /** - * Empty. + * In case you have a lazy executor that naps inbetween. **/ - virtual ~Executor() {} + virtual void wakeup() = 0; + virtual ~Executor() =default; }; } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index ad5d78d5ab6..18ab1ac9dfb 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -174,6 +174,11 @@ ThreadStackExecutorBase::setTaskLimit(uint32_t taskLimit) } void +ThreadStackExecutorBase::wakeup() { + // Nothing to do here as workers are always attentive. +} + +void ThreadStackExecutorBase::internalSetTaskLimit(uint32_t taskLimit) { MonitorGuard monitor(_monitor); diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 6333a8fc66e..ecd9918292f 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -219,6 +219,7 @@ public: size_t getNumThreads() const override; void setTaskLimit(uint32_t taskLimit) override; + void wakeup() override; /** * Shut down this executor. This will make this executor reject |