summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2022-05-16 14:26:55 +0000
committerGeir Storli <geirst@yahooinc.com>2022-05-18 09:04:38 +0000
commit3aa28e27fb35d63ce340a354bf39b41f9c304bc4 (patch)
tree4da9a0bcd199f326af589abfd2def79ae80253d6 /searchcore
parentb116016d101c88f0649c026af382a1f735e80f50 (diff)
Move tracking of num field writer threads from ThreadingServiceConfig to SharedThreadingServiceConfig.
This is a follow-up for when the shared field writer executor is used across all document dbs.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/tests/persistenceconformance_test.cpp2
-rw-r--r--searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp11
-rw-r--r--searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp54
-rw-r--r--searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp2
-rw-r--r--searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp25
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def32
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp27
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/documentdb_config_builder.cpp2
16 files changed, 101 insertions, 123 deletions
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp
index 6b23f596835..ec48d8ebc65 100644
--- a/searchcore/src/apps/tests/persistenceconformance_test.cpp
+++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp
@@ -149,7 +149,7 @@ public:
schema,
std::make_shared<DocumentDBMaintenanceConfig>(),
search::LogDocumentStore::Config(),
- std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(1)),
+ std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make()),
std::make_shared<const AllocConfig>(),
"client",
docTypeName.getName());
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
index d4f4e24ba6c..bc02f460b4e 100644
--- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp
@@ -26,16 +26,13 @@ public:
ExecutorThreadingServiceTest()
: _transport(1),
field_writer_executor(SequencedTaskExecutor::create(my_field_writer_executor, 3, 200)),
- service()
- {
- }
- void setup(uint32_t indexing_threads) {
- service = std::make_unique<ExecutorThreadingService>(_transport.shared(),
+ service(std::make_unique<ExecutorThreadingService>(_transport.shared(),
_transport.transport(),
_transport.clock(),
*field_writer_executor,
nullptr,
- ThreadingServiceConfig::make(indexing_threads));
+ ThreadingServiceConfig::make()))
+ {
}
SequencedTaskExecutor* index_inverter() {
return to_concrete_type(service->indexFieldInverter());
@@ -60,7 +57,6 @@ assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t ex
TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outside)
{
- setup(4);
EXPECT_EQ(field_writer(), index_inverter());
EXPECT_EQ(field_writer(), index_writer());
EXPECT_EQ(field_writer(), attribute_writer());
@@ -69,7 +65,6 @@ TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outs
TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated)
{
- setup(4);
service->set_task_limits(5, 7, 11);
EXPECT_EQ(5, service->master_task_limit());
EXPECT_EQ(7, service->index().getTaskLimit());
diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
index fc8bd474813..1cee63ecfcc 100644
--- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
@@ -14,54 +14,32 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
struct Fixture {
ProtonConfig cfg;
- Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, int32_t task_limit = 500)
- : cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit))
+ Fixture(uint32_t master_task_limit = 2000, int32_t task_limit = 500)
+ : cfg(makeConfig(master_task_limit, task_limit))
{
}
- ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, int32_t task_limit) {
+ ProtonConfig makeConfig(uint32_t master_task_limit, int32_t task_limit) {
ProtonConfigBuilder builder;
- builder.indexing.threads = baseLineIndexingThreads;
builder.indexing.tasklimit = task_limit;
builder.feeding.masterTaskLimit = master_task_limit;
return builder;
}
- ThreadingServiceConfig make(uint32_t cpuCores) {
- return ThreadingServiceConfig::make(cfg, 0.5, HwInfo::Cpu(cpuCores));
- }
- void assertIndexingThreads(uint32_t expIndexingThreads, uint32_t cpuCores) {
- EXPECT_EQUAL(expIndexingThreads, make(cpuCores).indexingThreads());
+ ThreadingServiceConfig make() {
+ return ThreadingServiceConfig::make(cfg);
}
};
-TEST_F("require that indexing threads are set based on cpu cores and feeding concurrency", Fixture)
-{
- TEST_DO(f.assertIndexingThreads(2, 1));
- TEST_DO(f.assertIndexingThreads(2, 4));
- TEST_DO(f.assertIndexingThreads(2, 8));
- TEST_DO(f.assertIndexingThreads(2, 12));
- TEST_DO(f.assertIndexingThreads(3, 13));
- TEST_DO(f.assertIndexingThreads(3, 18));
- TEST_DO(f.assertIndexingThreads(4, 19));
- TEST_DO(f.assertIndexingThreads(4, 24));
- TEST_DO(f.assertIndexingThreads(11, 64));
-}
-
-TEST_F("require that indexing threads is always >= 1", Fixture(0))
-{
- TEST_DO(f.assertIndexingThreads(1, 0));
-}
-
TEST_F("require that task limits are set", Fixture)
{
- auto tcfg = f.make(24);
+ auto tcfg = f.make();
EXPECT_EQUAL(2000u, tcfg.master_task_limit());
EXPECT_EQUAL(500u, tcfg.defaultTaskLimit());
EXPECT_TRUE(tcfg.is_task_limit_hard());
}
-TEST_F("require that negative task limit makes it soft", Fixture(2, 3000, -700))
+TEST_F("require that negative task limit makes it soft", Fixture(3000, -700))
{
- auto tcfg = f.make(24);
+ auto tcfg = f.make();
EXPECT_EQUAL(3000u, tcfg.master_task_limit());
EXPECT_EQUAL(700u, tcfg.defaultTaskLimit());
EXPECT_FALSE(tcfg.is_task_limit_hard());
@@ -69,23 +47,21 @@ TEST_F("require that negative task limit makes it soft", Fixture(2, 3000, -700))
namespace {
-void assertConfig(uint32_t exp_indexing_threads, uint32_t exp_master_task_limit,
- uint32_t exp_default_task_limit, const ThreadingServiceConfig& config) {
- EXPECT_EQUAL(exp_indexing_threads, config.indexingThreads());
+void assertConfig(uint32_t exp_master_task_limit, uint32_t exp_default_task_limit, const ThreadingServiceConfig& config) {
EXPECT_EQUAL(exp_master_task_limit, config.master_task_limit());
EXPECT_EQUAL(exp_default_task_limit, config.defaultTaskLimit());
}
}
-TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(2, 3000, 1000))
+TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(3000, 1000))
{
- auto cfg1 = f1.make(1);
- assertConfig(2u, 2000, 500u, cfg1);
- const auto cfg2 = f2.make(13);
- assertConfig(3u, 3000u, 1000u, cfg2);
+ auto cfg1 = f1.make();
+ assertConfig(2000, 500u, cfg1);
+ const auto cfg2 = f2.make();
+ assertConfig(3000u, 1000u, cfg2);
cfg1.update(cfg2);
- assertConfig(2u, 3000u, 1000u, cfg1); // Indexing threads not changed
+ assertConfig(3000u, 1000u, cfg1);
}
TEST_MAIN()
diff --git a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
index 85f8e8171a8..8a12219de3c 100644
--- a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
+++ b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
@@ -106,7 +106,7 @@ struct DBConfigFixture {
buildSchema(),
std::make_shared<DocumentDBMaintenanceConfig>(),
search::LogDocumentStore::Config(),
- std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(1)),
+ std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make()),
std::make_shared<const AllocConfig>(),
configId,
docTypeName);
diff --git a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
index 948c98b1034..2027ad56768 100644
--- a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
+++ b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
@@ -17,7 +17,7 @@ using vespalib::ISequencedTaskExecutor;
using vespalib::SequencedTaskExecutor;
ProtonConfig
-make_proton_config(double concurrency)
+make_proton_config(double concurrency, uint32_t indexing_threads = 1)
{
ProtonConfigBuilder builder;
// This setup requires a minimum of 4 shared threads.
@@ -27,6 +27,7 @@ make_proton_config(double concurrency)
builder.feeding.concurrency = concurrency;
builder.indexing.tasklimit = 255;
+ builder.indexing.threads = indexing_threads;
return builder;
}
@@ -38,6 +39,13 @@ expect_shared_threads(uint32_t exp_threads, uint32_t cpu_cores)
EXPECT_EQ(exp_threads * 16, cfg.shared_task_limit());
}
+void
+expect_field_writer_threads(uint32_t exp_threads, uint32_t cpu_cores, uint32_t indexing_threads = 1)
+{
+ auto cfg = SharedThreadingServiceConfig::make(make_proton_config(0.5, indexing_threads), HwInfo::Cpu(cpu_cores));
+ EXPECT_EQ(exp_threads, cfg.field_writer_threads());
+}
+
TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores_and_feeding_concurrency)
{
expect_shared_threads(4, 1);
@@ -47,6 +55,21 @@ TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores
expect_shared_threads(5, 10);
}
+TEST(SharedThreadingServiceConfigTest, field_writer_threads_are_derived_from_cpu_cores_and_feeding_concurrency)
+{
+ expect_field_writer_threads(3, 1);
+ expect_field_writer_threads(3, 4);
+ expect_field_writer_threads(3, 6);
+ expect_field_writer_threads(4, 7);
+ expect_field_writer_threads(4, 8);
+ expect_field_writer_threads(5, 9);
+}
+
+TEST(SharedThreadingServiceConfigTest, field_writer_threads_can_be_overridden_in_proton_config)
+{
+ expect_field_writer_threads(4, 1, 4);
+}
+
class SharedThreadingServiceTest : public ::testing::Test {
public:
Transport transport;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
index 4db46ead525..14c96faef42 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -202,7 +202,7 @@ std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<Docume
schema,
std::make_shared<proton::DocumentDBMaintenanceConfig>(),
search::LogDocumentStore::Config(),
- std::make_shared<const proton::ThreadingServiceConfig>(proton::ThreadingServiceConfig::make(1)),
+ std::make_shared<const proton::ThreadingServiceConfig>(proton::ThreadingServiceConfig::make()),
std::make_shared<const proton::AllocConfig>(),
"client",
doc_type_name.getName());
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 7cc78178b51..4edaa4a6757 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -129,7 +129,8 @@ indexing.write.io enum {NORMAL, OSYNC, DIRECTIO} default=DIRECTIO restart
## Control io options during read both under dump and fusion.
indexing.read.io enum {NORMAL, DIRECTIO} default=DIRECTIO restart
-## Control number of threads used for indexing
+## Overrides the number of threads used for writing fields across all document dbs.
+## See feeding.concurrency for details.
indexing.threads int default=1 restart
## Option to specify what is most important during indexing.
@@ -294,15 +295,8 @@ documentdb[].visibilitydelay double default=0.0
## Whether this document type is globally distributed or not.
documentdb[].global bool default=false
## A number between 0.0 and 1.0 that specifies the concurrency when handling feed operations.
-## When set to 1.0 all cores on the cpu is utilized.
-##
-## 3 thread pools used for various aspect of feeding are configured based on this setting:
-## 1) Writing changes to attribute fields
-## 2) Inverting index fields
-## 3) Writing changes to index fields
-##
-## The number of threads in each of pools is calculated as:
-## max(ceil((hwinfo.cpu.cores * feeding.concurrency)/3), indexing.threads)
+## Deprecated and ignored after the shared field writer is used across all document dbs. See feeding.concurrency for details.
+## TODO: Remove this setting when the config-model is no longer using it.
documentdb[].feeding.concurrency double default=0.2
## Minimum initial size for any per document tables.
@@ -493,17 +487,15 @@ hwinfo.cpu.cores int default = 0 restart
## A number between 0.0 and 1.0 that specifies the concurrency when handling feed operations.
## When set to 1.0 all cores on the cpu is utilized.
##
-## 4 thread pools used for various aspect of feeding are configured based on this setting:
-## 1) Compressing and compacting documents
-## 2) Writing changes to attribute fields
-## 3) Inverting index fields
-## 4) Writing changes to index fields
+## 3 thread pools used for various aspect of feeding are configured based on this setting:
+## 1) Basic shared thread pool. E.g. used for compressing and compacting documents.
+## 2) Warmup thread pool. Used for disk index warmup.
+## 3) Field writer thread pool. Used for writing data to document fields:
+## - Inverting index fields
+## - Writing changes to index fields
+## - Writing changes to attribute fields
##
-## The number of threads in pool 1 is calculated as:
-## max(ceil(hwinfo.cpu.cores * feeding.concurrency), summary.log.numthreads)
-## The number of threads in each of pools 2-4 is calculated as:
-## max(ceil((hwinfo.cpu.cores * feeding.concurrency)/3), indexing.threads)
-## Deprecated -> Use documentdb.feeding.concurrency
+## See shared_threading_service_config.cpp for details on how the thread pool sizes are calculated.
feeding.concurrency double default = 0.2 restart
## Maximum number of pending tasks for the master thread in each document db.
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
index 8cc64f1b0de..bb6d45ac482 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
@@ -248,18 +248,6 @@ find_document_db_config_entry(const ProtonConfig::DocumentdbVector& document_dbs
return default_document_db_config_entry;
}
-std::shared_ptr<const ThreadingServiceConfig>
-build_threading_service_config(const ProtonConfig &proton_config,
- const HwInfo &hw_info,
- const vespalib::string& doc_type_name)
-{
- auto& document_db_config_entry = find_document_db_config_entry(proton_config.documentdb, doc_type_name);
- return std::make_shared<const ThreadingServiceConfig>
- (ThreadingServiceConfig::make(proton_config,
- document_db_config_entry.feeding.concurrency,
- hw_info.cpu()));
-}
-
std::shared_ptr<const AllocConfig>
build_alloc_config(const ProtonConfig& proton_config, const vespalib::string& doc_type_name)
{
@@ -420,7 +408,7 @@ DocumentDBConfigManager::update(FNET_Transport & transport, const ConfigSnapshot
if (newMaintenanceConfig && oldMaintenanceConfig && (*newMaintenanceConfig == *oldMaintenanceConfig)) {
newMaintenanceConfig = oldMaintenanceConfig;
}
- auto new_threading_service_config = build_threading_service_config(_bootstrapConfig->getProtonConfig(), _bootstrapConfig->getHwInfo(), _docTypeName);
+ auto new_threading_service_config = std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(_bootstrapConfig->getProtonConfig()));
if (new_threading_service_config && old_threading_service_config &&
(*new_threading_service_config == *old_threading_service_config)) {
new_threading_service_config = old_threading_service_config;
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index eb1c953dcf5..8c73067056d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -39,9 +39,8 @@ VESPA_THREAD_STACK_TAG(summary_executor)
ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExecutor,
FNET_Transport& transport,
const vespalib::Clock& clock,
- vespalib::ISequencedTaskExecutor& field_writer,
- uint32_t num_treads)
- : ExecutorThreadingService(sharedExecutor, transport, clock, field_writer, nullptr, ThreadingServiceConfig::make(num_treads))
+ vespalib::ISequencedTaskExecutor& field_writer)
+ : ExecutorThreadingService(sharedExecutor, transport, clock, field_writer, nullptr, ThreadingServiceConfig::make())
{}
ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedExecutor,
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index 03c3feabd86..7c8056b816c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -42,8 +42,7 @@ public:
ExecutorThreadingService(vespalib::Executor& sharedExecutor,
FNET_Transport& transport,
const vespalib::Clock& clock,
- vespalib::ISequencedTaskExecutor& field_writer,
- uint32_t num_treads = 1);
+ vespalib::ISequencedTaskExecutor& field_writer);
ExecutorThreadingService(vespalib::Executor& sharedExecutor,
FNET_Transport & transport,
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
index 82d7f6c650d..86db96a20ac 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
@@ -34,7 +34,7 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi
{
const auto& fw_cfg = cfg.field_writer_config();
_field_writer = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(proton_field_writer_executor, CpuUsage::Category::WRITE),
- fw_cfg.indexingThreads() * 3,
+ cfg.field_writer_threads(),
fw_cfg.defaultTaskLimit(),
fw_cfg.is_task_limit_hard(),
fw_cfg.optimize(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp
index 76b7982fedd..002ac508b4a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp
@@ -11,39 +11,57 @@ using ProtonConfig = SharedThreadingServiceConfig::ProtonConfig;
SharedThreadingServiceConfig::SharedThreadingServiceConfig(uint32_t shared_threads_in,
uint32_t shared_task_limit_in,
uint32_t warmup_threads_in,
+ uint32_t field_writer_threads_in,
const ThreadingServiceConfig& field_writer_config_in)
: _shared_threads(shared_threads_in),
_shared_task_limit(shared_task_limit_in),
_warmup_threads(warmup_threads_in),
+ _field_writer_threads(field_writer_threads_in),
_field_writer_config(field_writer_config_in)
{
}
namespace {
-size_t
+uint32_t
derive_shared_threads(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info)
{
- size_t scaled_cores = (size_t)std::ceil(cpu_info.cores() * cfg.feeding.concurrency);
+ uint32_t scaled_cores = (uint32_t)std::ceil(cpu_info.cores() * cfg.feeding.concurrency);
// We need at least 1 guaranteed free worker in order to ensure progress.
- return std::max(scaled_cores, cfg.documentdb.size() + cfg.flush.maxconcurrent + 1);
+ return std::max(scaled_cores, (uint32_t)cfg.documentdb.size() + cfg.flush.maxconcurrent + 1);
}
-size_t
+uint32_t
derive_warmup_threads(const HwInfo::Cpu& cpu_info) {
return std::max(1u, std::min(4u, cpu_info.cores()/8));
}
+uint32_t
+derive_field_writer_threads(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info)
+{
+ uint32_t scaled_cores = (size_t)std::ceil(cpu_info.cores() * cfg.feeding.concurrency);
+ uint32_t field_writer_threads = std::max(scaled_cores, (uint32_t)cfg.indexing.threads);
+ // Originally we used at least 3 threads for writing fields:
+ // - index field inverter
+ // - index field writer
+ // - attribute field writer
+ // We keep the same lower bound for similar behavior when using the shared field writer.
+ return std::max(field_writer_threads, 3u);
+}
+
}
SharedThreadingServiceConfig
SharedThreadingServiceConfig::make(const proton::SharedThreadingServiceConfig::ProtonConfig& cfg,
const proton::HwInfo::Cpu& cpu_info)
{
- size_t shared_threads = derive_shared_threads(cfg, cpu_info);
- return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, derive_warmup_threads(cpu_info),
- ThreadingServiceConfig::make(cfg, cfg.feeding.concurrency, cpu_info));
+ uint32_t shared_threads = derive_shared_threads(cfg, cpu_info);
+ uint32_t field_writer_threads = derive_field_writer_threads(cfg, cpu_info);
+ return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16,
+ derive_warmup_threads(cpu_info),
+ field_writer_threads,
+ ThreadingServiceConfig::make(cfg));
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h
index 1214bfa77fa..5a2468ca1ab 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h
@@ -19,12 +19,14 @@ private:
uint32_t _shared_threads;
uint32_t _shared_task_limit;
uint32_t _warmup_threads;
+ uint32_t _field_writer_threads;
ThreadingServiceConfig _field_writer_config;
public:
SharedThreadingServiceConfig(uint32_t shared_threads_in,
uint32_t shared_task_limit_in,
uint32_t warmup_threads_in,
+ uint32_t field_writer_threads_in,
const ThreadingServiceConfig& field_writer_config_in);
static SharedThreadingServiceConfig make(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info);
@@ -32,6 +34,7 @@ public:
uint32_t shared_threads() const { return _shared_threads; }
uint32_t shared_task_limit() const { return _shared_task_limit; }
uint32_t warmup_threads() const { return _warmup_threads; }
+ uint32_t field_writer_threads() const { return _field_writer_threads; }
const ThreadingServiceConfig& field_writer_config() const { return _field_writer_config; }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
index c129a78b045..a2ab9e7c925 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -10,14 +10,12 @@ using ProtonConfig = ThreadingServiceConfig::ProtonConfig;
using OptimizeFor = vespalib::Executor::OptimizeFor;
-ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
- uint32_t master_task_limit_,
+ThreadingServiceConfig::ThreadingServiceConfig(uint32_t master_task_limit_,
int32_t defaultTaskLimit_,
OptimizeFor optimize_,
uint32_t kindOfWatermark_,
vespalib::duration reactionTime_)
- : _indexingThreads(indexingThreads_),
- _master_task_limit(master_task_limit_),
+ : _master_task_limit(master_task_limit_),
_defaultTaskLimit(std::abs(defaultTaskLimit_)),
_is_task_limit_hard(defaultTaskLimit_ >= 0),
_optimize(optimize_),
@@ -28,14 +26,6 @@ ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
namespace {
-uint32_t
-calculateIndexingThreads(const ProtonConfig::Indexing & indexing, double concurrency, const HwInfo::Cpu &cpuInfo)
-{
- double scaledCores = cpuInfo.cores() * concurrency;
- uint32_t indexingThreads = std::max((int32_t)std::ceil(scaledCores / 3), indexing.threads);
- return std::max(indexingThreads, 1u);
-}
-
OptimizeFor
selectOptimization(ProtonConfig::Indexing::Optimize optimize) {
using CfgOptimize = ProtonConfig::Indexing::Optimize;
@@ -50,11 +40,9 @@ selectOptimization(ProtonConfig::Indexing::Optimize optimize) {
}
ThreadingServiceConfig
-ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo)
+ThreadingServiceConfig::make(const ProtonConfig& cfg)
{
- uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing, concurrency, cpuInfo);
- return ThreadingServiceConfig(indexingThreads,
- cfg.feeding.masterTaskLimit,
+ return ThreadingServiceConfig(cfg.feeding.masterTaskLimit,
cfg.indexing.tasklimit,
selectOptimization(cfg.indexing.optimize),
cfg.indexing.kindOfWatermark,
@@ -62,8 +50,8 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const
}
ThreadingServiceConfig
-ThreadingServiceConfig::make(uint32_t indexingThreads) {
- return ThreadingServiceConfig(indexingThreads, 0, 100, OptimizeFor::LATENCY, 0, 10ms);
+ThreadingServiceConfig::make() {
+ return ThreadingServiceConfig(0, 100, OptimizeFor::LATENCY, 0, 10ms);
}
void
@@ -76,8 +64,7 @@ ThreadingServiceConfig::update(const ThreadingServiceConfig& cfg)
bool
ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const
{
- return _indexingThreads == rhs._indexingThreads &&
- _master_task_limit == rhs._master_task_limit &&
+ return _master_task_limit == rhs._master_task_limit &&
_defaultTaskLimit == rhs._defaultTaskLimit &&
_is_task_limit_hard == rhs._is_task_limit_hard &&
_optimize == rhs._optimize &&
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
index 8c63340a62d..d13c7fb392f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -19,7 +19,6 @@ public:
using OptimizeFor = vespalib::Executor::OptimizeFor;
private:
- uint32_t _indexingThreads;
uint32_t _master_task_limit;
uint32_t _defaultTaskLimit;
bool _is_task_limit_hard;
@@ -28,14 +27,13 @@ private:
vespalib::duration _reactionTime; // Maximum reaction time to new tasks
private:
- ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, int32_t defaultTaskLimit_,
+ ThreadingServiceConfig(uint32_t master_task_limit_, int32_t defaultTaskLimit_,
OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_);
public:
- static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo);
- static ThreadingServiceConfig make(uint32_t indexingThreads);
+ static ThreadingServiceConfig make(const ProtonConfig& cfg);
+ static ThreadingServiceConfig make();
void update(const ThreadingServiceConfig& cfg);
- uint32_t indexingThreads() const { return _indexingThreads; }
uint32_t master_task_limit() const { return _master_task_limit; }
uint32_t defaultTaskLimit() const { return _defaultTaskLimit; }
bool is_task_limit_hard() const { return _is_task_limit_hard; }
diff --git a/searchcore/src/vespa/searchcore/proton/test/documentdb_config_builder.cpp b/searchcore/src/vespa/searchcore/proton/test/documentdb_config_builder.cpp
index fbd3dbd2402..301f2a97d14 100644
--- a/searchcore/src/vespa/searchcore/proton/test/documentdb_config_builder.cpp
+++ b/searchcore/src/vespa/searchcore/proton/test/documentdb_config_builder.cpp
@@ -48,7 +48,7 @@ DocumentDBConfigBuilder::DocumentDBConfigBuilder(int64_t generation,
_schema(schema),
_maintenance(std::make_shared<DocumentDBMaintenanceConfig>()),
_store(),
- _threading_service_config(std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(1))),
+ _threading_service_config(std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make())),
_alloc_config(std::make_shared<const AllocConfig>()),
_configId(configId),
_docTypeName(docTypeName)