summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-03 22:19:37 +0200
committerGitHub <noreply@github.com>2020-10-03 22:19:37 +0200
commit5d8748786cd3db3b4cdc983c83bf65005dcdd149 (patch)
tree0413bfc7b3b2358fa700c6189ae87c386f7c70c5
parent6e9b1b6b04c7b556df3af92ecdec839a9bb5aaf3 (diff)
parente81ece87547cea19f53f05fe79ae17a5ece4516e (diff)
Merge pull request #14694 from vespa-engine/balder/add-executor-wakeup
Add wakeup call on the executors to allow to urgent wakeup of lazy ex…
-rw-r--r--eval/src/tests/eval/compile_cache/compile_cache_test.cpp1
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine_test.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h3
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp29
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h6
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h2
-rw-r--r--vespalib/src/vespa/vespalib/util/executor.h5
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp5
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h1
17 files changed, 57 insertions, 15 deletions
diff --git a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp
index a0dad889d9a..5dea89b3a63 100644
--- a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp
+++ b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp
@@ -28,6 +28,7 @@ struct MyExecutor : public Executor {
tasks.clear();
}
~MyExecutor() { run_tasks(); }
+ void wakeup() override { }
};
//-----------------------------------------------------------------------------
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
index dbfbbd16820..08fa1b11229 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
@@ -51,6 +51,7 @@ public:
_done.countDown();
return Task::UP();
}
+ void wakeup() override { }
};
class SimpleGetSerialNum : public IGetSerialNum
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
index 13695b969bf..71a4eddbb08 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
+++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp
@@ -771,6 +771,7 @@ AttributeWriter::forceCommit(SerialNum serialNum, OnWriteDoneType onWriteDone)
auto commitTask = std::make_unique<CommitTask>(wc, serialNum, onWriteDone);
_attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(commitTask));
}
+ _attributeFieldWriter.wakeup();
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
index b13fa2baed3..24f5fa9a5e6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp
@@ -81,4 +81,9 @@ void ExecutorThreadService::setTaskLimit(uint32_t taskLimit) {
_executor.setTaskLimit(taskLimit);
}
+void
+ExecutorThreadService::wakeup() {
+ _executor.wakeup();
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
index 26069b4b8dd..4f27a8f86c2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
@@ -39,6 +39,7 @@ public:
size_t getNumThreads() const override { return _executor.getNumThreads(); }
void setTaskLimit(uint32_t taskLimit) override;
+ void wakeup() override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
index 564857a1e37..360cac6e2ee 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
@@ -214,6 +214,7 @@ SearchableFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneTy
{
Parent::internalForceCommit(serialNum, onCommitDone);
_writeService.index().execute(makeLambdaTask([=]() { performIndexForceCommit(serialNum, onCommitDone); }));
+ _writeService.index().wakeup();
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 90875aa8591..84dad379d31 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -265,6 +265,7 @@ StoreOnlyFeedView::internalForceCommit(SerialNum serialNum, OnForceCommitDoneTyp
{
LOG(debug, "internalForceCommit: serial=%" PRIu64 ".", serialNum);
_writeService.summary().execute(makeLambdaTask([onDone=onCommitDone]() {(void) onDone;}));
+ _writeService.summary().wakeup();
std::vector<uint32_t> lidsToReuse;
lidsToReuse = _lidReuseDelayer.getReuseLids();
if (!lidsToReuse.empty()) {
diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
index 127b696c4ab..350673b9c90 100644
--- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
@@ -48,6 +48,10 @@ public:
_service.setTaskLimit(taskLimit);
}
+ void wakeup() override {
+ _service.wakeup();
+ }
+
};
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
index 575552971fa..ff523b1e35d 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
@@ -25,7 +25,8 @@ public:
Stats getStats() override {
return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0);
}
- virtual void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; }
+ void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; }
+ void wakeup() override { }
};
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
index d457de26f54..46562a2902c 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -50,6 +50,10 @@ public:
* @param task unique pointer to the task to be executed
*/
virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0;
+ /**
+ * Call this one to ensure you get the attention of the workers.
+ */
+ virtual void wakeup() { }
/**
* Wrap lambda function into a task and schedule it to be run.
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 963264a62e7..cf385275bfb 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -14,8 +14,17 @@ namespace {
constexpr uint32_t stackSize = 128 * 1024;
constexpr uint8_t MAGIC = 255;
+bool
+isLazy(const std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> & executors) {
+ for (const auto &executor : executors) {
+ if (dynamic_cast<const vespalib::SingleExecutor *>(executor.get()) == nullptr) {
+ return false;
+ }
+ }
+ return true;
}
+}
std::unique_ptr<ISequencedTaskExecutor>
SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime)
@@ -46,6 +55,7 @@ SequencedTaskExecutor::~SequencedTaskExecutor()
SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors)
: ISequencedTaskExecutor(executors->size()),
_executors(std::move(executors)),
+ _lazyExecutors(isLazy(*_executors)),
_component2Id(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC),
_mutex(),
_nextId(0)
@@ -70,18 +80,21 @@ SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP t
}
void
-SequencedTaskExecutor::sync()
-{
+SequencedTaskExecutor::sync() {
+ wakeup();
for (auto &executor : *_executors) {
- SingleExecutor * single = dynamic_cast<vespalib::SingleExecutor *>(executor.get());
- if (single) {
+ executor->sync();
+ }
+}
+
+void
+SequencedTaskExecutor::wakeup() {
+ if (_lazyExecutors) {
+ for (auto &executor : *_executors) {
//Enforce parallel wakeup of napping executors.
- single->startSync();
+ executor->wakeup();
}
}
- for (auto &executor : *_executors) {
- executor->sync();
- }
}
SequencedTaskExecutor::Stats
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index c37bd2eecf4..180cd1cc6cc 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -19,13 +19,14 @@ public:
using ISequencedTaskExecutor::getExecutorId;
using OptimizeFor = vespalib::Executor::OptimizeFor;
- ~SequencedTaskExecutor();
+ ~SequencedTaskExecutor() override;
void setTaskLimit(uint32_t taskLimit) override;
void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
ExecutorId getExecutorId(uint64_t componentId) const override;
void sync() override;
Stats getStats() override;
+ void wakeup() override;
/*
* Note that if you choose Optimize::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside.
@@ -39,9 +40,10 @@ public:
uint32_t getComponentHashSize() const { return _component2Id.size(); }
uint32_t getComponentEffectiveHashSize() const { return _nextId; }
private:
- SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
+ explicit SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors;
+ const bool _lazyExecutors;
mutable std::vector<uint8_t> _component2Id;
mutable std::mutex _mutex;
mutable uint32_t _nextId;
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index a17037799a3..65857183879 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -83,7 +83,7 @@ SingleExecutor::drain(Lock & lock) {
}
void
-SingleExecutor::startSync() {
+SingleExecutor::wakeup() {
_consumerCondition.notify_one();
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index a58128c15aa..7b8a2741d87 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -24,7 +24,7 @@ public:
Task::UP execute(Task::UP task) override;
void setTaskLimit(uint32_t taskLimit) override;
SingleExecutor & sync() override;
- void startSync();
+ void wakeup() override;
size_t getNumThreads() const override;
uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); }
Stats getStats() override;
diff --git a/vespalib/src/vespa/vespalib/util/executor.h b/vespalib/src/vespa/vespalib/util/executor.h
index a412832ecaf..57ad28344b9 100644
--- a/vespalib/src/vespa/vespalib/util/executor.h
+++ b/vespalib/src/vespa/vespalib/util/executor.h
@@ -38,9 +38,10 @@ public:
virtual Task::UP execute(Task::UP task) = 0;
/**
- * Empty.
+ * In case you have a lazy executor that naps inbetween.
**/
- virtual ~Executor() {}
+ virtual void wakeup() = 0;
+ virtual ~Executor() =default;
};
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index ad5d78d5ab6..18ab1ac9dfb 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -174,6 +174,11 @@ ThreadStackExecutorBase::setTaskLimit(uint32_t taskLimit)
}
void
+ThreadStackExecutorBase::wakeup() {
+ // Nothing to do here as workers are always attentive.
+}
+
+void
ThreadStackExecutorBase::internalSetTaskLimit(uint32_t taskLimit)
{
MonitorGuard monitor(_monitor);
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 6333a8fc66e..ecd9918292f 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -219,6 +219,7 @@ public:
size_t getNumThreads() const override;
void setTaskLimit(uint32_t taskLimit) override;
+ void wakeup() override;
/**
* Shut down this executor. This will make this executor reject