aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-11-04 16:08:42 +0100
committerGitHub <noreply@github.com>2021-11-04 16:08:42 +0100
commit39da265d599c3b302dbce4f6d25103bc4d38418b (patch)
tree1b21d2fab9e081adbd378524dbf6d2259f410764 /searchcore
parent23a6aa13e9150a3858f85bf786a674dfcdcf3d89 (diff)
parent28098c7776ae90a88348c8d0d6daa0ac5e336a11 (diff)
Merge pull request #19867 from vespa-engine/geirst/support-shared-field-writer-executor-for-documentdb
Add support for using a shared field writer executor inside the threa…
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/CMakeLists.txt1
-rw-r--r--searchcore/src/tests/proton/documentdb/executor_threading_service/CMakeLists.txt9
-rw-r--r--searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp79
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp71
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp16
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.h9
8 files changed, 170 insertions, 29 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt
index f7d1b763117..36c7e386445 100644
--- a/searchcore/CMakeLists.txt
+++ b/searchcore/CMakeLists.txt
@@ -90,6 +90,7 @@ vespa_define_module(
src/tests/proton/documentdb/documentbucketmover
src/tests/proton/documentdb/documentdbconfig
src/tests/proton/documentdb/documentdbconfigscout
+ src/tests/proton/documentdb/executor_threading_service
src/tests/proton/documentdb/feedhandler
src/tests/proton/documentdb/feedview
src/tests/proton/documentdb/fileconfigmanager
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/executor_threading_service/CMakeLists.txt
new file mode 100644
index 00000000000..721f2207213
--- /dev/null
+++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchcore_executor_threading_service_test_app TEST
+ SOURCES
+ executor_threading_service_test.cpp
+ DEPENDS
+ searchcore_server
+ GTest::GTest
+)
+vespa_add_test(NAME searchcore_executor_threading_service_test_app COMMAND searchcore_executor_threading_service_test_app)
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
new file mode 100644
index 00000000000..714ffaa16b7
--- /dev/null
+++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
@@ -0,0 +1,79 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/searchcore/proton/server/executorthreadingservice.h>
+#include <vespa/searchcore/proton/server/threading_service_config.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+
+using namespace proton;
+using vespalib::ISequencedTaskExecutor;
+using vespalib::SequencedTaskExecutor;
+using SharedFieldWriterExecutor = ThreadingServiceConfig::SharedFieldWriterExecutor;
+
+
+SequencedTaskExecutor*
+to_concrete_type(ISequencedTaskExecutor& exec)
+{
+ return dynamic_cast<SequencedTaskExecutor*>(&exec);
+}
+
+class ExecutorThreadingServiceTest : public ::testing::Test {
+public:
+ vespalib::ThreadStackExecutor shared_executor;
+ std::unique_ptr<ExecutorThreadingService> service;
+ ExecutorThreadingServiceTest()
+ : shared_executor(1, 1000),
+ service()
+ {
+ }
+ void setup(uint32_t indexing_threads, SharedFieldWriterExecutor shared_field_writer) {
+ service = std::make_unique<ExecutorThreadingService>(shared_executor,
+ ThreadingServiceConfig::make(indexing_threads, shared_field_writer));
+ }
+ SequencedTaskExecutor* index_inverter() {
+ return to_concrete_type(service->indexFieldInverter());
+ }
+ SequencedTaskExecutor* index_writer() {
+ return to_concrete_type(service->indexFieldWriter());
+ }
+ SequencedTaskExecutor* attribute_writer() {
+ return to_concrete_type(service->attributeFieldWriter());
+ }
+};
+
+void
+assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t exp_task_limit)
+{
+ EXPECT_EQ(exp_executors, exec->getNumExecutors());
+ EXPECT_EQ(exp_task_limit, exec->first_executor()->getTaskLimit());
+}
+
+TEST_F(ExecutorThreadingServiceTest, no_shared_field_writer_executor)
+{
+ setup(4, SharedFieldWriterExecutor::NONE);
+ EXPECT_NE(index_inverter(), index_writer());
+ EXPECT_NE(index_writer(), attribute_writer());
+ assert_executor(index_inverter(), 4, 100);
+ assert_executor(index_writer(), 4, 100);
+ assert_executor(attribute_writer(), 4, 100);
+}
+
+TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_field_writers)
+{
+ setup(4, SharedFieldWriterExecutor::INDEX);
+ EXPECT_EQ(index_inverter(), index_writer());
+ EXPECT_NE(index_inverter(), attribute_writer());
+ assert_executor(index_inverter(), 8, 100);
+ assert_executor(attribute_writer(), 4, 100);
+}
+
+TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_and_attribute_field_writers)
+{
+ setup(4, SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE);
+ EXPECT_EQ(index_inverter(), index_writer());
+ EXPECT_EQ(index_inverter(), attribute_writer());
+ assert_executor(index_inverter(), 12, 100);
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
+
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index e1bdf13fd36..6b85f4a6829 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -493,6 +493,16 @@ hwinfo.cpu.cores int default = 0 restart
## Deprecated -> Use documentdb.feeding.concurrency
feeding.concurrency double default = 0.2 restart
+## Whether we should use a shared field writer executor in the document db threading service:
+##
+## NONE: Don't use a shared executor.
+## INDEX: Use a shared executor for index field inverter and index field writer.
+## INDEX_AND_ATTRIBUTE: Use a shared executor for index field inverter, index field writer, and attribute field writer.
+## DOCUMENT_DB: Use a shared executor for index field inverter, index field writer, and attribute field writer among all document dbs.
+##
+## TODO: Remove this when a shared executor is the default.
+feeding.shared_field_writer_executor enum {NONE, INDEX, INDEX_AND_ATTRIBUTE, DOCUMENT_DB} default = NONE restart
+
## Adjustment to resource limit when determining if maintenance jobs can run.
##
## Currently used by 'lid_space_compaction' and 'move_buckets' jobs.
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index cb4b396d63d..d35aaf9f909 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -12,6 +12,7 @@ using vespalib::BlockingThreadStackExecutor;
using vespalib::SingleExecutor;
using vespalib::SequencedTaskExecutor;
using OptimizeFor = vespalib::Executor::OptimizeFor;
+using SharedFieldWriterExecutor = proton::ThreadingServiceConfig::SharedFieldWriterExecutor;
namespace proton {
@@ -30,9 +31,10 @@ createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor
VESPA_THREAD_STACK_TAG(master_executor)
VESPA_THREAD_STACK_TAG(index_executor)
VESPA_THREAD_STACK_TAG(summary_executor)
-VESPA_THREAD_STACK_TAG(field_inverter_executor)
+VESPA_THREAD_STACK_TAG(index_field_inverter_executor)
+VESPA_THREAD_STACK_TAG(index_field_writer_executor)
+VESPA_THREAD_STACK_TAG(attribute_field_writer_executor)
VESPA_THREAD_STACK_TAG(field_writer_executor)
-VESPA_THREAD_STACK_TAG(attribute_executor)
}
@@ -50,11 +52,38 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor & sh
_masterService(_masterExecutor),
_indexService(*_indexExecutor),
_summaryService(*_summaryExecutor),
- _indexFieldInverter(SequencedTaskExecutor::create(field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit())),
- _indexFieldWriter(SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit())),
- _attributeFieldWriter(SequencedTaskExecutor::create(attribute_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime()))
+ _indexFieldInverter(),
+ _indexFieldWriter(),
+ _attributeFieldWriter(),
+ _field_writer(),
+ _index_field_inverter_ptr(),
+ _index_field_writer_ptr(),
+ _attribute_field_writer_ptr()
{
+ if (cfg.shared_field_writer() == SharedFieldWriterExecutor::INDEX) {
+ _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit());
+ _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
+ cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime());
+ _index_field_inverter_ptr = _field_writer.get();
+ _index_field_writer_ptr = _field_writer.get();
+ _attribute_field_writer_ptr = _attributeFieldWriter.get();
+
+ } else if (cfg.shared_field_writer() == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) {
+ _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(),
+ cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime());
+ _index_field_inverter_ptr = _field_writer.get();
+ _index_field_writer_ptr = _field_writer.get();
+ _attribute_field_writer_ptr = _field_writer.get();
+ } else {
+ // TODO: Add support for shared field writer across all document dbs.
+ _indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
+ _indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
+ _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
+ cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime());
+ _index_field_inverter_ptr = _indexFieldInverter.get();
+ _index_field_writer_ptr = _indexFieldWriter.get();
+ _attribute_field_writer_ptr = _attributeFieldWriter.get();
+ }
}
ExecutorThreadingService::~ExecutorThreadingService() = default;
@@ -74,11 +103,11 @@ ExecutorThreadingService::syncOnce() {
if (!isMasterThread) {
_masterExecutor.sync();
}
- _attributeFieldWriter->sync_all();
+ _attribute_field_writer_ptr->sync_all();
_indexExecutor->sync();
_summaryExecutor->sync();
- _indexFieldInverter->sync_all();
- _indexFieldWriter->sync_all();
+ _index_field_inverter_ptr->sync_all();
+ _index_field_writer_ptr->sync_all();
if (!isMasterThread) {
_masterExecutor.sync();
}
@@ -89,13 +118,13 @@ ExecutorThreadingService::shutdown()
{
_masterExecutor.shutdown();
_masterExecutor.sync();
- _attributeFieldWriter->sync_all();
+ _attribute_field_writer_ptr->sync_all();
_summaryExecutor->shutdown();
_summaryExecutor->sync();
_indexExecutor->shutdown();
_indexExecutor->sync();
- _indexFieldInverter->sync_all();
- _indexFieldWriter->sync_all();
+ _index_field_inverter_ptr->sync_all();
+ _index_field_writer_ptr->sync_all();
}
void
@@ -103,9 +132,9 @@ ExecutorThreadingService::setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskL
{
_indexExecutor->setTaskLimit(taskLimit);
_summaryExecutor->setTaskLimit(summaryTaskLimit);
- _indexFieldInverter->setTaskLimit(taskLimit);
- _indexFieldWriter->setTaskLimit(taskLimit);
- _attributeFieldWriter->setTaskLimit(taskLimit);
+ _index_field_inverter_ptr->setTaskLimit(taskLimit);
+ _index_field_writer_ptr->setTaskLimit(taskLimit);
+ _attribute_field_writer_ptr->setTaskLimit(taskLimit);
}
ExecutorThreadingServiceStats
@@ -115,24 +144,24 @@ ExecutorThreadingService::getStats()
_indexExecutor->getStats(),
_summaryExecutor->getStats(),
_sharedExecutor.getStats(),
- _indexFieldInverter->getStats(),
- _indexFieldWriter->getStats(),
- _attributeFieldWriter->getStats());
+ _index_field_inverter_ptr->getStats(),
+ _index_field_writer_ptr->getStats(),
+ _attribute_field_writer_ptr->getStats());
}
vespalib::ISequencedTaskExecutor &
ExecutorThreadingService::indexFieldInverter() {
- return *_indexFieldInverter;
+ return *_index_field_inverter_ptr;
}
vespalib::ISequencedTaskExecutor &
ExecutorThreadingService::indexFieldWriter() {
- return *_indexFieldWriter;
+ return *_index_field_writer_ptr;
}
vespalib::ISequencedTaskExecutor &
ExecutorThreadingService::attributeFieldWriter() {
- return *_attributeFieldWriter;
+ return *_attribute_field_writer_ptr;
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index ed34e518114..51da27586f7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -27,6 +27,10 @@ private:
std::unique_ptr<vespalib::ISequencedTaskExecutor> _indexFieldInverter;
std::unique_ptr<vespalib::ISequencedTaskExecutor> _indexFieldWriter;
std::unique_ptr<vespalib::ISequencedTaskExecutor> _attributeFieldWriter;
+ std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer;
+ vespalib::ISequencedTaskExecutor* _index_field_inverter_ptr;
+ vespalib::ISequencedTaskExecutor* _index_field_writer_ptr;
+ vespalib::ISequencedTaskExecutor* _attribute_field_writer_ptr;
void syncOnce();
public:
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
index e2d3c4a2366..012d91cb49f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -14,12 +14,14 @@ ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
uint32_t defaultTaskLimit_,
OptimizeFor optimize_,
uint32_t kindOfWatermark_,
- vespalib::duration reactionTime_)
+ vespalib::duration reactionTime_,
+ SharedFieldWriterExecutor shared_field_writer_)
: _indexingThreads(indexingThreads_),
_defaultTaskLimit(defaultTaskLimit_),
_optimize(optimize_),
_kindOfWatermark(kindOfWatermark_),
- _reactionTime(reactionTime_)
+ _reactionTime(reactionTime_),
+ _shared_field_writer(shared_field_writer_)
{
}
@@ -60,12 +62,13 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const
return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit,
selectOptimization(cfg.indexing.optimize),
cfg.indexing.kindOfWatermark,
- vespalib::from_s(cfg.indexing.reactiontime));
+ vespalib::from_s(cfg.indexing.reactiontime),
+ cfg.feeding.sharedFieldWriterExecutor);
}
ThreadingServiceConfig
-ThreadingServiceConfig::make(uint32_t indexingThreads) {
- return ThreadingServiceConfig(indexingThreads, 100, OptimizeFor::LATENCY, 0, 10ms);
+ThreadingServiceConfig::make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_) {
+ return ThreadingServiceConfig(indexingThreads, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_);
}
void
@@ -81,7 +84,8 @@ ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const
_defaultTaskLimit == rhs._defaultTaskLimit &&
_optimize == rhs._optimize &&
_kindOfWatermark == rhs._kindOfWatermark &&
- _reactionTime == rhs._reactionTime;
+ _reactionTime == rhs._reactionTime &&
+ _shared_field_writer == rhs._shared_field_writer;
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
index d945e9911fc..5869eaf9c2e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include <vespa/searchcore/config/config-proton.h>
#include <vespa/searchcore/proton/common/hw_info.h>
#include <vespa/vespalib/util/executor.h>
#include <vespa/vespalib/util/time.h>
@@ -16,6 +17,7 @@ class ThreadingServiceConfig {
public:
using ProtonConfig = const vespa::config::search::core::internal::InternalProtonType;
using OptimizeFor = vespalib::Executor::OptimizeFor;
+ using SharedFieldWriterExecutor = ProtonConfig::Feeding::SharedFieldWriterExecutor;
private:
uint32_t _indexingThreads;
@@ -23,19 +25,22 @@ private:
OptimizeFor _optimize;
uint32_t _kindOfWatermark;
vespalib::duration _reactionTime; // Maximum reaction time to new tasks
+ SharedFieldWriterExecutor _shared_field_writer;
private:
- ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, OptimizeFor optimize, uint32_t kindOfWatermark, vespalib::duration reactionTime);
+ ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, OptimizeFor optimize_,
+ uint32_t kindOfWatermark_, vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_);
public:
static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo);
- static ThreadingServiceConfig make(uint32_t indexingThreads);
+ static ThreadingServiceConfig make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_ = SharedFieldWriterExecutor::NONE);
void update(const ThreadingServiceConfig& cfg);
uint32_t indexingThreads() const { return _indexingThreads; }
uint32_t defaultTaskLimit() const { return _defaultTaskLimit; }
OptimizeFor optimize() const { return _optimize; }
uint32_t kindOfwatermark() const { return _kindOfWatermark; }
vespalib::duration reactionTime() const { return _reactionTime; }
+ SharedFieldWriterExecutor shared_field_writer() const { return _shared_field_writer; }
bool operator==(const ThreadingServiceConfig &rhs) const;
};