diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-11 19:28:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-11 19:28:20 +0200 |
commit | e3dea9daba5b89fab618921a94f9394f8ae9144c (patch) | |
tree | 4afba0fa93e2ca5894dff7deede43fb50428630d /searchcore | |
parent | 8ba8fa4e896d8c9775a868e524d9ef37072aeaa6 (diff) | |
parent | 0d30571f227704ae5ffee683f0402c7852f85fb4 (diff) |
Merge pull request #3716 from vespa-engine/geirst/add-feeding-concurrency-setting-to-proton
Geirst/add feeding concurrency setting to proton
Diffstat (limited to 'searchcore')
10 files changed, 189 insertions, 33 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt index 0dce35f7eb3..2944b753dca 100644 --- a/searchcore/CMakeLists.txt +++ b/searchcore/CMakeLists.txt @@ -97,6 +97,7 @@ vespa_define_module( src/tests/proton/documentdb/maintenancecontroller src/tests/proton/documentdb/move_operation_limiter src/tests/proton/documentdb/storeonlyfeedview + src/tests/proton/documentdb/threading_service_config src/tests/proton/documentmetastore src/tests/proton/documentmetastore/lidreusedelayer src/tests/proton/feed_and_search diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/threading_service_config/CMakeLists.txt new file mode 100644 index 00000000000..214b5c9b86d --- /dev/null +++ b/searchcore/src/tests/proton/documentdb/threading_service_config/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchcore_threading_service_config_test_app TEST + SOURCES + threading_service_config_test.cpp + DEPENDS + searchcore_server +) +vespa_add_test(NAME searchcore_threading_service_config_test_app COMMAND searchcore_threading_service_config_test_app) diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp new file mode 100644 index 00000000000..658ebe818eb --- /dev/null +++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp @@ -0,0 +1,66 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/log/log.h> +LOG_SETUP("threading_service_config_test"); + +#include <vespa/searchcore/config/config-proton.h> +#include <vespa/searchcore/proton/common/hw_info.h> +#include <vespa/searchcore/proton/server/threading_service_config.h> +#include <vespa/vespalib/testkit/testapp.h> + +using namespace proton; +using ProtonConfig = vespa::config::search::core::ProtonConfig; +using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder; + +struct Fixture { + ProtonConfig cfg; + Fixture(uint32_t baseLineIndexingThreads = 2) + : cfg(makeConfig(baseLineIndexingThreads)) + { + } + ProtonConfig makeConfig(uint32_t baseLineIndexingThreads) { + ProtonConfigBuilder builder; + builder.indexing.threads = baseLineIndexingThreads; + builder.indexing.tasklimit = 500; + builder.indexing.semiunboundtasklimit = 50000; + builder.feeding.concurrency = 0.5; + return builder; + } + ThreadingServiceConfig make(uint32_t cpuCores) { + return ThreadingServiceConfig::make(cfg, HwInfo::Cpu(cpuCores)); + } + void assertIndexingThreads(uint32_t expIndexingThreads, uint32_t cpuCores) { + EXPECT_EQUAL(expIndexingThreads, make(cpuCores).indexingThreads()); + } +}; + +TEST_F("require that indexing threads are set based on cpu cores and feeding concurrency", Fixture) +{ + TEST_DO(f.assertIndexingThreads(2, 1)); + TEST_DO(f.assertIndexingThreads(2, 4)); + TEST_DO(f.assertIndexingThreads(2, 8)); + TEST_DO(f.assertIndexingThreads(2, 12)); + TEST_DO(f.assertIndexingThreads(3, 13)); + TEST_DO(f.assertIndexingThreads(3, 18)); + TEST_DO(f.assertIndexingThreads(4, 19)); + TEST_DO(f.assertIndexingThreads(4, 24)); +} + +TEST_F("require that indexing threads is always >= 1", Fixture(0)) +{ + TEST_DO(f.assertIndexingThreads(1, 0)); +} + +TEST_F("require that default task limit is set", Fixture) +{ + EXPECT_EQUAL(500u, f.make(24).defaultTaskLimit()); +} + +TEST_F("require that semiunbound task limit is scaled based on indexing threads", Fixture) +{ + EXPECT_EQUAL(12500u, f.make(24).semiUnboundTaskLimit()); +} + +TEST_MAIN() +{ + TEST_RUN_ALL(); +} diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 520dab6c988..730c396c4f6 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -433,6 +433,21 @@ hwinfo.memory.size long default = 0 ## If set to 0, this is sampled by using std::thread::hardware_concurrency(). hwinfo.cpu.cores int default = 0 +## A number between 0.0 and 1.0 that specifies the concurrency when handling feed operations. +## When set to 1.0 all cores on the cpu is utilized. +## +## 4 thread pools used for various aspect of feeding are configured based on this setting: +## 1) Compressing and compacting documents +## 2) Writing changes to attribute fields +## 3) Inverting index fields +## 4) Writing changes to index fields +## +## The number of threads in pool 1 is calculated as: +## max(ceil(hwinfo.cpu.cores * feeding.concurrency), summary.log.numthreads) +## The number of threads in each of pools 2-4 is calculated as: +## max(ceil((hwinfo.cpu.cores * feeding.concurrency)/3), indexing.threads) +feeding.concurrency double default = 1.0 restart + ## Adjustment to resource limit when determining if maintenance jobs can run. ## ## Currently used by 'lid_space_compaction' and 'move_buckets' jobs. diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 1716441c30b..5bb512c12ce 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -93,6 +93,7 @@ vespa_add_library(searchcore_server STATIC storeonlydocsubdb.cpp storeonlyfeedview.cpp summaryadapter.cpp + threading_service_config.cpp tlcproxy.cpp tlssyncer.cpp transactionlogmanager.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index ada2c1b1069..cffa014534e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -61,16 +61,7 @@ using searchcorespi::IFlushTarget; namespace proton { namespace { - constexpr uint32_t indexing_thread_stack_size = 128 * 1024; - -uint32_t semiUnboundTaskLimit(uint32_t semiUnboundExecutorTaskLimit, - uint32_t indexingThreads) -{ - uint32_t taskLimit = semiUnboundExecutorTaskLimit / indexingThreads; - return taskLimit; -} - } template <typename FunctionType> @@ -106,12 +97,10 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _bucketSpace(bucketSpace), _baseDir(baseDir + "/" + _docTypeName.toString()), // Only one thread per executor, or performDropFeedView() will fail. - _defaultExecutorTaskLimit(protonCfg.indexing.tasklimit), - _semiUnboundExecutorTaskLimit(protonCfg.indexing.semiunboundtasklimit), - _indexingThreads(protonCfg.indexing.threads), - _writeService(std::max(1, protonCfg.indexing.threads), + _writeServiceConfig(ThreadingServiceConfig::make(protonCfg, hwInfo.cpu())), + _writeService(_writeServiceConfig.indexingThreads(), indexing_thread_stack_size, - _defaultExecutorTaskLimit), + _writeServiceConfig.defaultTaskLimit()), _initializeThreads(initializeThreads), _initConfigSnapshot(), _initConfigSerialNum(0u), @@ -181,7 +170,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, fastos::TimeStamp visibilityDelay = loaded_config->getMaintenanceConfigSP()->getVisibilityDelay(); _visibility.setVisibilityDelay(visibilityDelay); if (_visibility.getVisibilityDelay() > 0) { - _writeService.setTaskLimit(semiUnboundTaskLimit(_semiUnboundExecutorTaskLimit, _indexingThreads)); + _writeService.setTaskLimit(_writeServiceConfig.semiUnboundTaskLimit()); } } @@ -425,9 +414,9 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum _visibility.setVisibilityDelay(visibilityDelay); } if (_visibility.getVisibilityDelay() > 0) { - _writeService.setTaskLimit(semiUnboundTaskLimit(_semiUnboundExecutorTaskLimit, _indexingThreads)); + _writeService.setTaskLimit(_writeServiceConfig.semiUnboundTaskLimit()); } else { - _writeService.setTaskLimit(_defaultExecutorTaskLimit); + _writeService.setTaskLimit(_writeServiceConfig.defaultTaskLimit()); } if (params.shouldSubDbsChange() || hasVisibilityDelayChanged) { applySubDBConfig(*configSnapshot, serialNum, params); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index cd0335b7b75..e23cd78b3ad 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -5,20 +5,23 @@ #include "clusterstatehandler.h" #include "configstore.h" #include "ddbstate.h" +#include "disk_mem_usage_forwarder.h" #include "documentdbconfig.h" #include "documentsubdbcollection.h" +#include "executorthreadingservice.h" #include "feedhandler.h" +#include "i_document_db_config_owner.h" +#include "i_document_subdb_owner.h" #include "i_feed_handler_owner.h" #include "i_lid_space_compaction_handler.h" #include "ifeedview.h" #include "ireplayconfig.h" #include "maintenancecontroller.h" -#include "i_document_db_config_owner.h" -#include "executorthreadingservice.h" +#include "threading_service_config.h" #include "visibilityhandler.h" -#include "i_document_subdb_owner.h" -#include "disk_mem_usage_forwarder.h" +#include <vespa/metrics/updatehook.h> +#include <vespa/searchcore/proton/attribute/attribute_usage_filter.h> #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/searchcore/proton/metrics/documentdb_job_trackers.h> @@ -28,8 +31,6 @@ #include <vespa/searchlib/docstore/cachestats.h> #include <vespa/searchlib/transactionlog/syncproxy.h> #include <vespa/vespalib/util/varholder.h> -#include <vespa/searchcore/proton/attribute/attribute_usage_filter.h> -#include <vespa/metrics/updatehook.h> #include <mutex> #include <condition_variable> @@ -80,9 +81,7 @@ private: DocTypeName _docTypeName; document::BucketSpace _bucketSpace; vespalib::string _baseDir; - uint32_t _defaultExecutorTaskLimit; - uint32_t _semiUnboundExecutorTaskLimit; - uint32_t _indexingThreads; + ThreadingServiceConfig _writeServiceConfig; // Only one thread per executor, or dropFeedView() will fail. ExecutorThreadingService _writeService; // threads for initializer tasks during proton startup diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 556743ad267..36f19c385f7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -85,12 +85,10 @@ diskMemUsageSamplerConfig(const ProtonConfig &proton, const HwInfo &hwInfo) } size_t -deriveBackgroundThreads(const ProtonConfig & proton) { - size_t threads = std::thread::hardware_concurrency(); - if (proton.background.threads != 0) { - threads = proton.background.threads; - } - threads = std::max(threads, size_t(proton.summary.log.numthreads)); +deriveCompactionCompressionThreads(const ProtonConfig &proton, + const HwInfo::Cpu &cpuInfo) { + size_t scaledCores = (size_t)std::ceil(cpuInfo.cores() * proton.feeding.concurrency); + size_t threads = std::max(scaledCores, size_t(proton.summary.log.numthreads)); // We need at least 1 guaranteed free worker in order to ensure progress so #documentsdbs + 1 should suffice, // but we will not be cheap and give #documentsdbs * 2 @@ -302,7 +300,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) vespalib::string fileConfigId; _warmupExecutor.reset(new vespalib::ThreadStackExecutor(4, 128*1024)); - const size_t summaryThreads = deriveBackgroundThreads(protonConfig); + const size_t summaryThreads = deriveCompactionCompressionThreads(protonConfig, _hwInfo.cpu()); _summaryExecutor.reset(new vespalib::BlockingThreadStackExecutor(summaryThreads, 128*1024, summaryThreads*16)); InitializeThreads initializeThreads; if (protonConfig.initialize.threads > 0) { diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp new file mode 100644 index 00000000000..6422df9cbd2 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -0,0 +1,43 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "threading_service_config.h" +#include <vespa/searchcore/proton/common/hw_info.h> +#include <cmath> + +namespace proton { + +using ProtonConfig = ThreadingServiceConfig::ProtonConfig; + +ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, + uint32_t defaultTaskLimit_, + uint32_t semiUnboundTaskLimit_) + : _indexingThreads(indexingThreads_), + _defaultTaskLimit(defaultTaskLimit_), + _semiUnboundTaskLimit(semiUnboundTaskLimit_) +{ +} + +namespace { + +uint32_t +calculateIndexingThreads(const ProtonConfig &cfg, + const HwInfo::Cpu &cpuInfo) +{ + double scaledCores = cpuInfo.cores() * cfg.feeding.concurrency; + uint32_t indexingThreads = std::max((uint32_t)std::ceil(scaledCores / 3), (uint32_t)cfg.indexing.threads); + return std::max(indexingThreads, 1u); +} + +} + +ThreadingServiceConfig +ThreadingServiceConfig::make(const ProtonConfig &cfg, + const HwInfo::Cpu &cpuInfo) +{ + uint32_t indexingThreads = calculateIndexingThreads(cfg, cpuInfo); + return ThreadingServiceConfig(indexingThreads, + cfg.indexing.tasklimit, + (cfg.indexing.semiunboundtasklimit / indexingThreads)); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h new file mode 100644 index 00000000000..67ab4171e80 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -0,0 +1,36 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcore/config/config-proton.h> +#include <vespa/searchcore/proton/common/hw_info.h> +#include <cstdint> + +namespace proton { + +/** + * Config for the threading service used by a documentdb. + */ +class ThreadingServiceConfig { +public: + using ProtonConfig = vespa::config::search::core::ProtonConfig; + +private: + uint32_t _indexingThreads; + uint32_t _defaultTaskLimit; + uint32_t _semiUnboundTaskLimit; + +private: + ThreadingServiceConfig(uint32_t indexingThreads_, + uint32_t defaultTaskLimit_, + uint32_t semiUnboundTaskLimit_); + +public: + static ThreadingServiceConfig make(const ProtonConfig &cfg, + const HwInfo::Cpu &cpuInfo); + + uint32_t indexingThreads() const { return _indexingThreads; } + uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } + uint32_t semiUnboundTaskLimit() const { return _semiUnboundTaskLimit; } +}; + +} |