diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-05-18 12:57:09 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-18 12:57:09 +0200 |
commit | 36df8bd3d9fd4ee60aadd04af89199a8bc504e68 (patch) | |
tree | acef0d7d853f971d4cbff1ffb6bab492a82e1d2b | |
parent | 6a8d74ab5bcc462e9e78cdffdc0747f0f2f94f4a (diff) | |
parent | a53696877acab6e08168ffd60b3cbfa19b0658e3 (diff) |
Merge pull request #22636 from vespa-engine/geirst/simplify-setup-of-shared-field-writer
Simplify setup of shared field writer executor
18 files changed, 119 insertions, 157 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java index 8b9f32fa135..2d01f9d903e 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/ContentSearchCluster.java @@ -381,18 +381,11 @@ public class ContentSearchCluster extends AbstractConfigProducer<SearchCluster> hasAnyNonIndexedCluster = true; ddbB.inputdoctypename(type.getFullName().getName()) .configid(findStreamingCluster(docTypeName).get().getDocumentDBConfigId()) - .mode(ProtonConfig.Documentdb.Mode.Enum.STREAMING) - .feeding.concurrency(0.0); + .mode(ProtonConfig.Documentdb.Mode.Enum.STREAMING); } else if (hasIndexingModeIndexed(type)) { getIndexed().fillDocumentDBConfig(type.getFullName().getName(), ddbB); - if (tuning != null && tuning.searchNode != null && tuning.searchNode.feeding != null) { - ddbB.feeding.concurrency(tuning.searchNode.feeding.concurrency); - } else { - ddbB.feeding.concurrency(defaultFeedConcurrency); - } } else { hasAnyNonIndexedCluster = true; - ddbB.feeding.concurrency(0.0); ddbB.mode(ProtonConfig.Documentdb.Mode.Enum.STORE_ONLY); } if (globalDocType) { diff --git a/config-model/src/test/java/com/yahoo/vespa/model/search/test/DocumentDatabaseTestCase.java b/config-model/src/test/java/com/yahoo/vespa/model/search/test/DocumentDatabaseTestCase.java index 059c1ceb208..d541a6422e7 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/search/test/DocumentDatabaseTestCase.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/search/test/DocumentDatabaseTestCase.java @@ -42,21 +42,21 @@ public class DocumentDatabaseTestCase { @Test public void requireThatConcurrencyIsReflectedCorrectlyForDefault() { - verifyConcurrency("index", "", 0.50, 0.50); - verifyConcurrency("streaming", "", 1.0, 0.0); - verifyConcurrency("store-only", "", 1.0, 0.0); + verifyConcurrency("index", "", 0.50); + verifyConcurrency("streaming", "", 1.0); + verifyConcurrency("store-only", "", 1.0); } @Test public void requireThatFeatureFlagConcurrencyIsReflectedCorrectlyForDefault() { - verifyConcurrency("index", "", 0.30, 0.30, 0.3); - verifyConcurrency("streaming", "", 0.6, 0.0, 0.3); - verifyConcurrency("store-only", "", 0.8, 0.0, 0.4); + verifyConcurrency("index", "", 0.30, 0.3); + verifyConcurrency("streaming", "", 0.6, 0.3); + verifyConcurrency("store-only", "", 0.8, 0.4); } @Test public void requireThatMixedModeConcurrencyIsReflectedCorrectlyForDefault() { - verifyConcurrency(Arrays.asList(DocType.create("a", "index"), DocType.create("b", "streaming")), "", 1.0, Arrays.asList(0.50, 0.0)); + verifyConcurrency(Arrays.asList(DocType.create("a", "index"), DocType.create("b", "streaming")), "", 1.0); } @Test @@ -64,7 +64,7 @@ public class DocumentDatabaseTestCase { String feedTuning = "<feeding>" + " <concurrency>0.7</concurrency>" + "</feeding>\n"; - verifyConcurrency(Arrays.asList(DocType.create("a", "index"), DocType.create("b", "streaming")), feedTuning, 0.7, Arrays.asList(0.7, 0.0)); + verifyConcurrency(Arrays.asList(DocType.create("a", "index"), DocType.create("b", "streaming")), feedTuning, 0.7); } @Test @@ -72,25 +72,24 @@ public class DocumentDatabaseTestCase { String feedTuning = "<feeding>" + " <concurrency>0.7</concurrency>" + "</feeding>\n"; - verifyConcurrency("index", feedTuning, 0.7, 0.7); - verifyConcurrency("streaming", feedTuning, 0.7, 0.0); - verifyConcurrency("store-only", feedTuning, 0.7, 0.0); + verifyConcurrency("index", feedTuning, 0.7); + verifyConcurrency("streaming", feedTuning, 0.7); + verifyConcurrency("store-only", feedTuning, 0.7); } - private void verifyConcurrency(String mode, String xmlTuning, double global, double local, double featureFlagConcurrency) { - verifyConcurrency(Arrays.asList(DocType.create("a", mode)), xmlTuning, global, Arrays.asList(local), featureFlagConcurrency); + private void verifyConcurrency(String mode, String xmlTuning, double expectedConcurrency, double featureFlagConcurrency) { + verifyConcurrency(Arrays.asList(DocType.create("a", mode)), xmlTuning, expectedConcurrency, featureFlagConcurrency); } - private void verifyConcurrency(String mode, String xmlTuning, double global, double local) { - verifyConcurrency(Arrays.asList(DocType.create("a", mode)), xmlTuning, global, Arrays.asList(local), null); + private void verifyConcurrency(String mode, String xmlTuning, double expectedConcurrency) { + verifyConcurrency(Arrays.asList(DocType.create("a", mode)), xmlTuning, expectedConcurrency, null); } - private void verifyConcurrency(List<DocType> nameAndModes, String xmlTuning, double global, List<Double> local) { - verifyConcurrency(nameAndModes, xmlTuning, global, local, null); + private void verifyConcurrency(List<DocType> nameAndModes, String xmlTuning, double expectedConcurrency) { + verifyConcurrency(nameAndModes, xmlTuning, expectedConcurrency, null); } - private void verifyConcurrency(List<DocType> nameAndModes, String xmlTuning, double global, List<Double> local, Double featureFlagConcurrency) { - assertEquals(nameAndModes.size(), local.size()); + private void verifyConcurrency(List<DocType> nameAndModes, String xmlTuning, double expectedConcurrency, Double featureFlagConcurrency) { TestProperties properties = new TestProperties(); if (featureFlagConcurrency != null) { properties.setFeedConcurrency(featureFlagConcurrency); @@ -99,11 +98,7 @@ public class DocumentDatabaseTestCase { VespaModel model = tester.createModel(nameAndModes, xmlTuning, new DeployState.Builder().properties(properties)); ContentSearchCluster contentSearchCluster = model.getContentClusters().get("test").getSearch(); ProtonConfig proton = tester.getProtonConfig(contentSearchCluster); - assertEquals(global, proton.feeding().concurrency(), SMALL); - assertEquals(local.size(), proton.documentdb().size()); - for (int i = 0; i < local.size(); i++) { - assertEquals(local.get(i), proton.documentdb(i).feeding().concurrency(), SMALL); - } + assertEquals(expectedConcurrency, proton.feeding().concurrency(), SMALL); } @Test diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index 6b23f596835..ec48d8ebc65 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -149,7 +149,7 @@ public: schema, std::make_shared<DocumentDBMaintenanceConfig>(), search::LogDocumentStore::Config(), - std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(1)), + std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make()), std::make_shared<const AllocConfig>(), "client", docTypeName.getName()); diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp index d4f4e24ba6c..bc02f460b4e 100644 --- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp @@ -26,16 +26,13 @@ public: ExecutorThreadingServiceTest() : _transport(1), field_writer_executor(SequencedTaskExecutor::create(my_field_writer_executor, 3, 200)), - service() - { - } - void setup(uint32_t indexing_threads) { - service = std::make_unique<ExecutorThreadingService>(_transport.shared(), + service(std::make_unique<ExecutorThreadingService>(_transport.shared(), _transport.transport(), _transport.clock(), *field_writer_executor, nullptr, - ThreadingServiceConfig::make(indexing_threads)); + ThreadingServiceConfig::make())) + { } SequencedTaskExecutor* index_inverter() { return to_concrete_type(service->indexFieldInverter()); @@ -60,7 +57,6 @@ assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t ex TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outside) { - setup(4); EXPECT_EQ(field_writer(), index_inverter()); EXPECT_EQ(field_writer(), index_writer()); EXPECT_EQ(field_writer(), attribute_writer()); @@ -69,7 +65,6 @@ TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outs TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated) { - setup(4); service->set_task_limits(5, 7, 11); EXPECT_EQ(5, service->master_task_limit()); EXPECT_EQ(7, service->index().getTaskLimit()); diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp index fc8bd474813..1cee63ecfcc 100644 --- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp +++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp @@ -14,54 +14,32 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder; struct Fixture { ProtonConfig cfg; - Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, int32_t task_limit = 500) - : cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit)) + Fixture(uint32_t master_task_limit = 2000, int32_t task_limit = 500) + : cfg(makeConfig(master_task_limit, task_limit)) { } - ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, int32_t task_limit) { + ProtonConfig makeConfig(uint32_t master_task_limit, int32_t task_limit) { ProtonConfigBuilder builder; - builder.indexing.threads = baseLineIndexingThreads; builder.indexing.tasklimit = task_limit; builder.feeding.masterTaskLimit = master_task_limit; return builder; } - ThreadingServiceConfig make(uint32_t cpuCores) { - return ThreadingServiceConfig::make(cfg, 0.5, HwInfo::Cpu(cpuCores)); - } - void assertIndexingThreads(uint32_t expIndexingThreads, uint32_t cpuCores) { - EXPECT_EQUAL(expIndexingThreads, make(cpuCores).indexingThreads()); + ThreadingServiceConfig make() { + return ThreadingServiceConfig::make(cfg); } }; -TEST_F("require that indexing threads are set based on cpu cores and feeding concurrency", Fixture) -{ - TEST_DO(f.assertIndexingThreads(2, 1)); - TEST_DO(f.assertIndexingThreads(2, 4)); - TEST_DO(f.assertIndexingThreads(2, 8)); - TEST_DO(f.assertIndexingThreads(2, 12)); - TEST_DO(f.assertIndexingThreads(3, 13)); - TEST_DO(f.assertIndexingThreads(3, 18)); - TEST_DO(f.assertIndexingThreads(4, 19)); - TEST_DO(f.assertIndexingThreads(4, 24)); - TEST_DO(f.assertIndexingThreads(11, 64)); -} - -TEST_F("require that indexing threads is always >= 1", Fixture(0)) -{ - TEST_DO(f.assertIndexingThreads(1, 0)); -} - TEST_F("require that task limits are set", Fixture) { - auto tcfg = f.make(24); + auto tcfg = f.make(); EXPECT_EQUAL(2000u, tcfg.master_task_limit()); EXPECT_EQUAL(500u, tcfg.defaultTaskLimit()); EXPECT_TRUE(tcfg.is_task_limit_hard()); } -TEST_F("require that negative task limit makes it soft", Fixture(2, 3000, -700)) +TEST_F("require that negative task limit makes it soft", Fixture(3000, -700)) { - auto tcfg = f.make(24); + auto tcfg = f.make(); EXPECT_EQUAL(3000u, tcfg.master_task_limit()); EXPECT_EQUAL(700u, tcfg.defaultTaskLimit()); EXPECT_FALSE(tcfg.is_task_limit_hard()); @@ -69,23 +47,21 @@ TEST_F("require that negative task limit makes it soft", Fixture(2, 3000, -700)) namespace { -void assertConfig(uint32_t exp_indexing_threads, uint32_t exp_master_task_limit, - uint32_t exp_default_task_limit, const ThreadingServiceConfig& config) { - EXPECT_EQUAL(exp_indexing_threads, config.indexingThreads()); +void assertConfig(uint32_t exp_master_task_limit, uint32_t exp_default_task_limit, const ThreadingServiceConfig& config) { EXPECT_EQUAL(exp_master_task_limit, config.master_task_limit()); EXPECT_EQUAL(exp_default_task_limit, config.defaultTaskLimit()); } } -TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(2, 3000, 1000)) +TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(3000, 1000)) { - auto cfg1 = f1.make(1); - assertConfig(2u, 2000, 500u, cfg1); - const auto cfg2 = f2.make(13); - assertConfig(3u, 3000u, 1000u, cfg2); + auto cfg1 = f1.make(); + assertConfig(2000, 500u, cfg1); + const auto cfg2 = f2.make(); + assertConfig(3000u, 1000u, cfg2); cfg1.update(cfg2); - assertConfig(2u, 3000u, 1000u, cfg1); // Indexing threads not changed + assertConfig(3000u, 1000u, cfg1); } TEST_MAIN() diff --git a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp index 85f8e8171a8..8a12219de3c 100644 --- a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp +++ b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp @@ -106,7 +106,7 @@ struct DBConfigFixture { buildSchema(), std::make_shared<DocumentDBMaintenanceConfig>(), search::LogDocumentStore::Config(), - std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(1)), + std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make()), std::make_shared<const AllocConfig>(), configId, docTypeName); diff --git a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp index 948c98b1034..2027ad56768 100644 --- a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp +++ b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp @@ -17,7 +17,7 @@ using vespalib::ISequencedTaskExecutor; using vespalib::SequencedTaskExecutor; ProtonConfig -make_proton_config(double concurrency) +make_proton_config(double concurrency, uint32_t indexing_threads = 1) { ProtonConfigBuilder builder; // This setup requires a minimum of 4 shared threads. @@ -27,6 +27,7 @@ make_proton_config(double concurrency) builder.feeding.concurrency = concurrency; builder.indexing.tasklimit = 255; + builder.indexing.threads = indexing_threads; return builder; } @@ -38,6 +39,13 @@ expect_shared_threads(uint32_t exp_threads, uint32_t cpu_cores) EXPECT_EQ(exp_threads * 16, cfg.shared_task_limit()); } +void +expect_field_writer_threads(uint32_t exp_threads, uint32_t cpu_cores, uint32_t indexing_threads = 1) +{ + auto cfg = SharedThreadingServiceConfig::make(make_proton_config(0.5, indexing_threads), HwInfo::Cpu(cpu_cores)); + EXPECT_EQ(exp_threads, cfg.field_writer_threads()); +} + TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores_and_feeding_concurrency) { expect_shared_threads(4, 1); @@ -47,6 +55,21 @@ TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores expect_shared_threads(5, 10); } +TEST(SharedThreadingServiceConfigTest, field_writer_threads_are_derived_from_cpu_cores_and_feeding_concurrency) +{ + expect_field_writer_threads(3, 1); + expect_field_writer_threads(3, 4); + expect_field_writer_threads(3, 6); + expect_field_writer_threads(4, 7); + expect_field_writer_threads(4, 8); + expect_field_writer_threads(5, 9); +} + +TEST(SharedThreadingServiceConfigTest, field_writer_threads_can_be_overridden_in_proton_config) +{ + expect_field_writer_threads(4, 1, 4); +} + class SharedThreadingServiceTest : public ::testing::Test { public: Transport transport; diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index 4db46ead525..14c96faef42 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -202,7 +202,7 @@ std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<Docume schema, std::make_shared<proton::DocumentDBMaintenanceConfig>(), search::LogDocumentStore::Config(), - std::make_shared<const proton::ThreadingServiceConfig>(proton::ThreadingServiceConfig::make(1)), + std::make_shared<const proton::ThreadingServiceConfig>(proton::ThreadingServiceConfig::make()), std::make_shared<const proton::AllocConfig>(), "client", doc_type_name.getName()); diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 7cc78178b51..bb664ea1743 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -129,7 +129,8 @@ indexing.write.io enum {NORMAL, OSYNC, DIRECTIO} default=DIRECTIO restart ## Control io options during read both under dump and fusion. indexing.read.io enum {NORMAL, DIRECTIO} default=DIRECTIO restart -## Control number of threads used for indexing +## Overrides the number of threads used for writing fields across all document dbs. +## See feeding.concurrency for details. indexing.threads int default=1 restart ## Option to specify what is most important during indexing. @@ -293,17 +294,6 @@ documentdb[].configid string documentdb[].visibilitydelay double default=0.0 ## Whether this document type is globally distributed or not. documentdb[].global bool default=false -## A number between 0.0 and 1.0 that specifies the concurrency when handling feed operations. -## When set to 1.0 all cores on the cpu is utilized. -## -## 3 thread pools used for various aspect of feeding are configured based on this setting: -## 1) Writing changes to attribute fields -## 2) Inverting index fields -## 3) Writing changes to index fields -## -## The number of threads in each of pools is calculated as: -## max(ceil((hwinfo.cpu.cores * feeding.concurrency)/3), indexing.threads) -documentdb[].feeding.concurrency double default=0.2 ## Minimum initial size for any per document tables. documentdb[].allocation.initialnumdocs long default=1024 @@ -493,17 +483,15 @@ hwinfo.cpu.cores int default = 0 restart ## A number between 0.0 and 1.0 that specifies the concurrency when handling feed operations. ## When set to 1.0 all cores on the cpu is utilized. ## -## 4 thread pools used for various aspect of feeding are configured based on this setting: -## 1) Compressing and compacting documents -## 2) Writing changes to attribute fields -## 3) Inverting index fields -## 4) Writing changes to index fields +## 3 thread pools used for various aspect of feeding are configured based on this setting: +## 1) Basic shared thread pool. E.g. used for compressing and compacting documents. +## 2) Warmup thread pool. Used for disk index warmup. +## 3) Field writer thread pool. Used for writing data to document fields: +## - Inverting index fields +## - Writing changes to index fields +## - Writing changes to attribute fields ## -## The number of threads in pool 1 is calculated as: -## max(ceil(hwinfo.cpu.cores * feeding.concurrency), summary.log.numthreads) -## The number of threads in each of pools 2-4 is calculated as: -## max(ceil((hwinfo.cpu.cores * feeding.concurrency)/3), indexing.threads) -## Deprecated -> Use documentdb.feeding.concurrency +## See shared_threading_service_config.cpp for details on how the thread pool sizes are calculated. feeding.concurrency double default = 0.2 restart ## Maximum number of pending tasks for the master thread in each document db. diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp index 8cc64f1b0de..bb6d45ac482 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp @@ -248,18 +248,6 @@ find_document_db_config_entry(const ProtonConfig::DocumentdbVector& document_dbs return default_document_db_config_entry; } -std::shared_ptr<const ThreadingServiceConfig> -build_threading_service_config(const ProtonConfig &proton_config, - const HwInfo &hw_info, - const vespalib::string& doc_type_name) -{ - auto& document_db_config_entry = find_document_db_config_entry(proton_config.documentdb, doc_type_name); - return std::make_shared<const ThreadingServiceConfig> - (ThreadingServiceConfig::make(proton_config, - document_db_config_entry.feeding.concurrency, - hw_info.cpu())); -} - std::shared_ptr<const AllocConfig> build_alloc_config(const ProtonConfig& proton_config, const vespalib::string& doc_type_name) { @@ -420,7 +408,7 @@ DocumentDBConfigManager::update(FNET_Transport & transport, const ConfigSnapshot if (newMaintenanceConfig && oldMaintenanceConfig && (*newMaintenanceConfig == *oldMaintenanceConfig)) { newMaintenanceConfig = oldMaintenanceConfig; } - auto new_threading_service_config = build_threading_service_config(_bootstrapConfig->getProtonConfig(), _bootstrapConfig->getHwInfo(), _docTypeName); + auto new_threading_service_config = std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(_bootstrapConfig->getProtonConfig())); if (new_threading_service_config && old_threading_service_config && (*new_threading_service_config == *old_threading_service_config)) { new_threading_service_config = old_threading_service_config; diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index eb1c953dcf5..8c73067056d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -39,9 +39,8 @@ VESPA_THREAD_STACK_TAG(summary_executor) ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExecutor, FNET_Transport& transport, const vespalib::Clock& clock, - vespalib::ISequencedTaskExecutor& field_writer, - uint32_t num_treads) - : ExecutorThreadingService(sharedExecutor, transport, clock, field_writer, nullptr, ThreadingServiceConfig::make(num_treads)) + vespalib::ISequencedTaskExecutor& field_writer) + : ExecutorThreadingService(sharedExecutor, transport, clock, field_writer, nullptr, ThreadingServiceConfig::make()) {} ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedExecutor, diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 03c3feabd86..7c8056b816c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -42,8 +42,7 @@ public: ExecutorThreadingService(vespalib::Executor& sharedExecutor, FNET_Transport& transport, const vespalib::Clock& clock, - vespalib::ISequencedTaskExecutor& field_writer, - uint32_t num_treads = 1); + vespalib::ISequencedTaskExecutor& field_writer); ExecutorThreadingService(vespalib::Executor& sharedExecutor, FNET_Transport & transport, diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp index 82d7f6c650d..86db96a20ac 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -34,7 +34,7 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi { const auto& fw_cfg = cfg.field_writer_config(); _field_writer = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(proton_field_writer_executor, CpuUsage::Category::WRITE), - fw_cfg.indexingThreads() * 3, + cfg.field_writer_threads(), fw_cfg.defaultTaskLimit(), fw_cfg.is_task_limit_hard(), fw_cfg.optimize(), diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp index 76b7982fedd..002ac508b4a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp @@ -11,39 +11,57 @@ using ProtonConfig = SharedThreadingServiceConfig::ProtonConfig; SharedThreadingServiceConfig::SharedThreadingServiceConfig(uint32_t shared_threads_in, uint32_t shared_task_limit_in, uint32_t warmup_threads_in, + uint32_t field_writer_threads_in, const ThreadingServiceConfig& field_writer_config_in) : _shared_threads(shared_threads_in), _shared_task_limit(shared_task_limit_in), _warmup_threads(warmup_threads_in), + _field_writer_threads(field_writer_threads_in), _field_writer_config(field_writer_config_in) { } namespace { -size_t +uint32_t derive_shared_threads(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info) { - size_t scaled_cores = (size_t)std::ceil(cpu_info.cores() * cfg.feeding.concurrency); + uint32_t scaled_cores = (uint32_t)std::ceil(cpu_info.cores() * cfg.feeding.concurrency); // We need at least 1 guaranteed free worker in order to ensure progress. - return std::max(scaled_cores, cfg.documentdb.size() + cfg.flush.maxconcurrent + 1); + return std::max(scaled_cores, (uint32_t)cfg.documentdb.size() + cfg.flush.maxconcurrent + 1); } -size_t +uint32_t derive_warmup_threads(const HwInfo::Cpu& cpu_info) { return std::max(1u, std::min(4u, cpu_info.cores()/8)); } +uint32_t +derive_field_writer_threads(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info) +{ + uint32_t scaled_cores = (size_t)std::ceil(cpu_info.cores() * cfg.feeding.concurrency); + uint32_t field_writer_threads = std::max(scaled_cores, (uint32_t)cfg.indexing.threads); + // Originally we used at least 3 threads for writing fields: + // - index field inverter + // - index field writer + // - attribute field writer + // We keep the same lower bound for similar behavior when using the shared field writer. + return std::max(field_writer_threads, 3u); +} + } SharedThreadingServiceConfig SharedThreadingServiceConfig::make(const proton::SharedThreadingServiceConfig::ProtonConfig& cfg, const proton::HwInfo::Cpu& cpu_info) { - size_t shared_threads = derive_shared_threads(cfg, cpu_info); - return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, derive_warmup_threads(cpu_info), - ThreadingServiceConfig::make(cfg, cfg.feeding.concurrency, cpu_info)); + uint32_t shared_threads = derive_shared_threads(cfg, cpu_info); + uint32_t field_writer_threads = derive_field_writer_threads(cfg, cpu_info); + return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, + derive_warmup_threads(cpu_info), + field_writer_threads, + ThreadingServiceConfig::make(cfg)); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h index 1214bfa77fa..5a2468ca1ab 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h @@ -19,12 +19,14 @@ private: uint32_t _shared_threads; uint32_t _shared_task_limit; uint32_t _warmup_threads; + uint32_t _field_writer_threads; ThreadingServiceConfig _field_writer_config; public: SharedThreadingServiceConfig(uint32_t shared_threads_in, uint32_t shared_task_limit_in, uint32_t warmup_threads_in, + uint32_t field_writer_threads_in, const ThreadingServiceConfig& field_writer_config_in); static SharedThreadingServiceConfig make(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info); @@ -32,6 +34,7 @@ public: uint32_t shared_threads() const { return _shared_threads; } uint32_t shared_task_limit() const { return _shared_task_limit; } uint32_t warmup_threads() const { return _warmup_threads; } + uint32_t field_writer_threads() const { return _field_writer_threads; } const ThreadingServiceConfig& field_writer_config() const { return _field_writer_config; } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index c129a78b045..a2ab9e7c925 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -10,14 +10,12 @@ using ProtonConfig = ThreadingServiceConfig::ProtonConfig; using OptimizeFor = vespalib::Executor::OptimizeFor; -ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, - uint32_t master_task_limit_, +ThreadingServiceConfig::ThreadingServiceConfig(uint32_t master_task_limit_, int32_t defaultTaskLimit_, OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_) - : _indexingThreads(indexingThreads_), - _master_task_limit(master_task_limit_), + : _master_task_limit(master_task_limit_), _defaultTaskLimit(std::abs(defaultTaskLimit_)), _is_task_limit_hard(defaultTaskLimit_ >= 0), _optimize(optimize_), @@ -28,14 +26,6 @@ ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, namespace { -uint32_t -calculateIndexingThreads(const ProtonConfig::Indexing & indexing, double concurrency, const HwInfo::Cpu &cpuInfo) -{ - double scaledCores = cpuInfo.cores() * concurrency; - uint32_t indexingThreads = std::max((int32_t)std::ceil(scaledCores / 3), indexing.threads); - return std::max(indexingThreads, 1u); -} - OptimizeFor selectOptimization(ProtonConfig::Indexing::Optimize optimize) { using CfgOptimize = ProtonConfig::Indexing::Optimize; @@ -50,11 +40,9 @@ selectOptimization(ProtonConfig::Indexing::Optimize optimize) { } ThreadingServiceConfig -ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo) +ThreadingServiceConfig::make(const ProtonConfig& cfg) { - uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing, concurrency, cpuInfo); - return ThreadingServiceConfig(indexingThreads, - cfg.feeding.masterTaskLimit, + return ThreadingServiceConfig(cfg.feeding.masterTaskLimit, cfg.indexing.tasklimit, selectOptimization(cfg.indexing.optimize), cfg.indexing.kindOfWatermark, @@ -62,8 +50,8 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const } ThreadingServiceConfig -ThreadingServiceConfig::make(uint32_t indexingThreads) { - return ThreadingServiceConfig(indexingThreads, 0, 100, OptimizeFor::LATENCY, 0, 10ms); +ThreadingServiceConfig::make() { + return ThreadingServiceConfig(0, 100, OptimizeFor::LATENCY, 0, 10ms); } void @@ -76,8 +64,7 @@ ThreadingServiceConfig::update(const ThreadingServiceConfig& cfg) bool ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const { - return _indexingThreads == rhs._indexingThreads && - _master_task_limit == rhs._master_task_limit && + return _master_task_limit == rhs._master_task_limit && _defaultTaskLimit == rhs._defaultTaskLimit && _is_task_limit_hard == rhs._is_task_limit_hard && _optimize == rhs._optimize && diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index 8c63340a62d..d13c7fb392f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -19,7 +19,6 @@ public: using OptimizeFor = vespalib::Executor::OptimizeFor; private: - uint32_t _indexingThreads; uint32_t _master_task_limit; uint32_t _defaultTaskLimit; bool _is_task_limit_hard; @@ -28,14 +27,13 @@ private: vespalib::duration _reactionTime; // Maximum reaction time to new tasks private: - ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, int32_t defaultTaskLimit_, + ThreadingServiceConfig(uint32_t master_task_limit_, int32_t defaultTaskLimit_, OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_); public: - static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); - static ThreadingServiceConfig make(uint32_t indexingThreads); + static ThreadingServiceConfig make(const ProtonConfig& cfg); + static ThreadingServiceConfig make(); void update(const ThreadingServiceConfig& cfg); - uint32_t indexingThreads() const { return _indexingThreads; } uint32_t master_task_limit() const { return _master_task_limit; } uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } bool is_task_limit_hard() const { return _is_task_limit_hard; } diff --git a/searchcore/src/vespa/searchcore/proton/test/documentdb_config_builder.cpp b/searchcore/src/vespa/searchcore/proton/test/documentdb_config_builder.cpp index fbd3dbd2402..301f2a97d14 100644 --- a/searchcore/src/vespa/searchcore/proton/test/documentdb_config_builder.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/documentdb_config_builder.cpp @@ -48,7 +48,7 @@ DocumentDBConfigBuilder::DocumentDBConfigBuilder(int64_t generation, _schema(schema), _maintenance(std::make_shared<DocumentDBMaintenanceConfig>()), _store(), - _threading_service_config(std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make(1))), + _threading_service_config(std::make_shared<const ThreadingServiceConfig>(ThreadingServiceConfig::make())), _alloc_config(std::make_shared<const AllocConfig>()), _configId(configId), _docTypeName(docTypeName) |