summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-12-02 08:21:18 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-12-02 08:21:18 +0000
commit35d12fd187d562ada3a8c3aaddda67f654d1dbda (patch)
tree51dc86335f612a5b5e4ad01bee801ea6054bb947
parent5574198029611974efac98e383779a0008621208 (diff)
- Use the wakeupservice as main source for frequent regular wakeups.
- Keep a self wakeup of 100ms - Avoid using default arguments to be able to find callsite.
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp5
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp21
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h22
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp2
5 files changed, 40 insertions, 16 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 1e3fd5ee158..36c8070f140 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -72,7 +72,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) {
_field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit());
_attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime());
+ cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
}
@@ -82,7 +82,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
} else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) {
_field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime());
+ cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();}));
}
@@ -98,7 +98,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
_indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
_indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
_attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime());
+ cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
index fa4771bee1d..e32cd6f5f4e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
@@ -20,7 +20,7 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi
_shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki,
cfg.shared_task_limit(), proton_shared_executor)),
_field_writer(),
- _invokeService(5ms),
+ _invokeService(cfg.field_writer_config().reactionTime()),
_invokeRegistrations()
{
const auto& fw_cfg = cfg.field_writer_config();
@@ -29,8 +29,7 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi
fw_cfg.indexingThreads() * 3,
fw_cfg.defaultTaskLimit(),
fw_cfg.optimize(),
- fw_cfg.kindOfwatermark(),
- fw_cfg.reactionTime());
+ fw_cfg.kindOfwatermark());
if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) {
_invokeRegistrations.push_back(_invokeService.registerInvoke([executor = _field_writer.get()]() {
executor->wakeup();
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 7c4711b6802..db27c13463f 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -40,8 +40,23 @@ find(uint16_t key, const uint16_t values[], size_t numValues) {
}
std::unique_ptr<ISequencedTaskExecutor>
-SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit,
- OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime)
+SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads) {
+ return create(func, threads, 1000);
+}
+
+std::unique_ptr<ISequencedTaskExecutor>
+SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit) {
+ return create(func, threads, taskLimit, OptimizeFor::LATENCY);
+}
+
+std::unique_ptr<ISequencedTaskExecutor>
+SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, OptimizeFor optimize) {
+ return create(func, threads, taskLimit, optimize, 0);
+}
+
+std::unique_ptr<ISequencedTaskExecutor>
+SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit,
+ OptimizeFor optimize, uint32_t kindOfWatermark)
{
if (optimize == OptimizeFor::ADAPTIVE) {
size_t num_strands = std::min(taskLimit, threads*32);
@@ -52,7 +67,7 @@ SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t thre
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 2 : kindOfWatermark;
- executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime));
+ executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, 100ms));
} else {
executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index db0723d16c8..f6d86ec51fb 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -30,13 +30,23 @@ public:
ExecutorStats getStats() override;
void wakeup() override;
- /*
- * Note that if you choose Optimize::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside.
- *
- */
+#if 0
+ static std::unique_ptr<ISequencedTaskExecutor>
+ create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit = 1000,
+ OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0) {
+ return create(func, threads, taskLimit, optimize, kindOfWatermark, 100ms);
+ }
+
+#endif
+ static std::unique_ptr<ISequencedTaskExecutor>
+ create(Runnable::init_fun_t func, uint32_t threads);
+ static std::unique_ptr<ISequencedTaskExecutor>
+ create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit);
+ static std::unique_ptr<ISequencedTaskExecutor>
+ create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, OptimizeFor optimize);
static std::unique_ptr<ISequencedTaskExecutor>
- create(vespalib::Runnable::init_fun_t, uint32_t threads, uint32_t taskLimit = 1000,
- OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms);
+ create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit,
+ OptimizeFor optimize, uint32_t kindOfWatermark);
/**
* For testing only
*/
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index af95918ccab..b25bc1a6377 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -7,7 +7,7 @@
namespace vespalib {
SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit)
- : SingleExecutor(func, taskLimit, taskLimit/10, 5ms)
+ : SingleExecutor(func, taskLimit, taskLimit/10, 100ms)
{ }
SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime)