summary | refs | log | tree | commit | diff | stats
path: root/searchcore
diff options
context:
space:
mode:
author: Henning Baldersheim <balder@yahoo-inc.com> 2017-10-11 19:28:20 +0200
committer: GitHub <noreply@github.com> 2017-10-11 19:28:20 +0200
commit: e3dea9daba5b89fab618921a94f9394f8ae9144c (patch)
tree: 4afba0fa93e2ca5894dff7deede43fb50428630d /searchcore
parent: 8ba8fa4e896d8c9775a868e524d9ef37072aeaa6 (diff)
parent: 0d30571f227704ae5ffee683f0402c7852f85fb4 (diff)
Merge pull request #3716 from vespa-engine/geirst/add-feeding-concurrency-setting-to-proton
Geirst/add feeding concurrency setting to proton
Diffstat (limited to 'searchcore')
-rw-r--r-- searchcore/CMakeLists.txt 1
-rw-r--r-- searchcore/src/tests/proton/documentdb/threading_service_config/CMakeLists.txt 8
-rw-r--r-- searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp 66
-rw-r--r-- searchcore/src/vespa/searchcore/config/proton.def 15
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt 1
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/documentdb.cpp 23
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/documentdb.h 17
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/proton.cpp 12
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp 43
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/threading_service_config.h 36
10 files changed, 189 insertions, 33 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt
index 0dce35f7eb3..2944b753dca 100644
--- a/searchcore/CMakeLists.txt
+++ b/searchcore/CMakeLists.txt
@@ -97,6 +97,7 @@ vespa_define_module(
src/tests/proton/documentdb/maintenancecontroller
src/tests/proton/documentdb/move_operation_limiter
src/tests/proton/documentdb/storeonlyfeedview
+ src/tests/proton/documentdb/threading_service_config
src/tests/proton/documentmetastore
src/tests/proton/documentmetastore/lidreusedelayer
src/tests/proton/feed_and_search
diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/threading_service_config/CMakeLists.txt
new file mode 100644
index 00000000000..214b5c9b86d
--- /dev/null
+++ b/searchcore/src/tests/proton/documentdb/threading_service_config/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchcore_threading_service_config_test_app TEST
+ SOURCES
+ threading_service_config_test.cpp
+ DEPENDS
+ searchcore_server
+)
+vespa_add_test(NAME searchcore_threading_service_config_test_app COMMAND searchcore_threading_service_config_test_app)
diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
new file mode 100644
index 00000000000..658ebe818eb
--- /dev/null
+++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
@@ -0,0 +1,66 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/log/log.h>
+LOG_SETUP("threading_service_config_test");
+
+#include <vespa/searchcore/config/config-proton.h>
+#include <vespa/searchcore/proton/common/hw_info.h>
+#include <vespa/searchcore/proton/server/threading_service_config.h>
+#include <vespa/vespalib/testkit/testapp.h>
+
+using namespace proton;
+using ProtonConfig = vespa::config::search::core::ProtonConfig;
+using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
+
+struct Fixture {
+ ProtonConfig cfg;
+ Fixture(uint32_t baseLineIndexingThreads = 2)
+ : cfg(makeConfig(baseLineIndexingThreads))
+ {
+ }
+ ProtonConfig makeConfig(uint32_t baseLineIndexingThreads) {
+ ProtonConfigBuilder builder;
+ builder.indexing.threads = baseLineIndexingThreads;
+ builder.indexing.tasklimit = 500;
+ builder.indexing.semiunboundtasklimit = 50000;
+ builder.feeding.concurrency = 0.5;
+ return builder;
+ }
+ ThreadingServiceConfig make(uint32_t cpuCores) {
+ return ThreadingServiceConfig::make(cfg, HwInfo::Cpu(cpuCores));
+ }
+ void assertIndexingThreads(uint32_t expIndexingThreads, uint32_t cpuCores) {
+ EXPECT_EQUAL(expIndexingThreads, make(cpuCores).indexingThreads());
+ }
+};
+
+TEST_F("require that indexing threads are set based on cpu cores and feeding concurrency", Fixture)
+{
+ TEST_DO(f.assertIndexingThreads(2, 1));
+ TEST_DO(f.assertIndexingThreads(2, 4));
+ TEST_DO(f.assertIndexingThreads(2, 8));
+ TEST_DO(f.assertIndexingThreads(2, 12));
+ TEST_DO(f.assertIndexingThreads(3, 13));
+ TEST_DO(f.assertIndexingThreads(3, 18));
+ TEST_DO(f.assertIndexingThreads(4, 19));
+ TEST_DO(f.assertIndexingThreads(4, 24));
+}
+
+TEST_F("require that indexing threads is always >= 1", Fixture(0))
+{
+ TEST_DO(f.assertIndexingThreads(1, 0));
+}
+
+TEST_F("require that default task limit is set", Fixture)
+{
+ EXPECT_EQUAL(500u, f.make(24).defaultTaskLimit());
+}
+
+TEST_F("require that semiunbound task limit is scaled based on indexing threads", Fixture)
+{
+ EXPECT_EQUAL(12500u, f.make(24).semiUnboundTaskLimit());
+}
+
+TEST_MAIN()
+{
+ TEST_RUN_ALL();
+}
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 520dab6c988..730c396c4f6 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -433,6 +433,21 @@ hwinfo.memory.size long default = 0
## If set to 0, this is sampled by using std::thread::hardware_concurrency().
hwinfo.cpu.cores int default = 0
+## A number between 0.0 and 1.0 that specifies the concurrency when handling feed operations.
+## When set to 1.0 all cores on the cpu are utilized.
+##
+## 4 thread pools used for various aspects of feeding are configured based on this setting:
+## 1) Compressing and compacting documents
+## 2) Writing changes to attribute fields
+## 3) Inverting index fields
+## 4) Writing changes to index fields
+##
+## The number of threads in pool 1 is calculated as:
+## max(ceil(hwinfo.cpu.cores * feeding.concurrency), summary.log.numthreads)
+## The number of threads in each of pools 2-4 is calculated as:
+## max(ceil((hwinfo.cpu.cores * feeding.concurrency)/3), indexing.threads)
+feeding.concurrency double default = 1.0 restart
+
## Adjustment to resource limit when determining if maintenance jobs can run.
##
## Currently used by 'lid_space_compaction' and 'move_buckets' jobs.
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index 1716441c30b..5bb512c12ce 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -93,6 +93,7 @@ vespa_add_library(searchcore_server STATIC
storeonlydocsubdb.cpp
storeonlyfeedview.cpp
summaryadapter.cpp
+ threading_service_config.cpp
tlcproxy.cpp
tlssyncer.cpp
transactionlogmanager.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index ada2c1b1069..cffa014534e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -61,16 +61,7 @@ using searchcorespi::IFlushTarget;
namespace proton {
namespace {
-
constexpr uint32_t indexing_thread_stack_size = 128 * 1024;
-
-uint32_t semiUnboundTaskLimit(uint32_t semiUnboundExecutorTaskLimit,
- uint32_t indexingThreads)
-{
- uint32_t taskLimit = semiUnboundExecutorTaskLimit / indexingThreads;
- return taskLimit;
-}
-
}
template <typename FunctionType>
@@ -106,12 +97,10 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_bucketSpace(bucketSpace),
_baseDir(baseDir + "/" + _docTypeName.toString()),
// Only one thread per executor, or performDropFeedView() will fail.
- _defaultExecutorTaskLimit(protonCfg.indexing.tasklimit),
- _semiUnboundExecutorTaskLimit(protonCfg.indexing.semiunboundtasklimit),
- _indexingThreads(protonCfg.indexing.threads),
- _writeService(std::max(1, protonCfg.indexing.threads),
+ _writeServiceConfig(ThreadingServiceConfig::make(protonCfg, hwInfo.cpu())),
+ _writeService(_writeServiceConfig.indexingThreads(),
indexing_thread_stack_size,
- _defaultExecutorTaskLimit),
+ _writeServiceConfig.defaultTaskLimit()),
_initializeThreads(initializeThreads),
_initConfigSnapshot(),
_initConfigSerialNum(0u),
@@ -181,7 +170,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
fastos::TimeStamp visibilityDelay = loaded_config->getMaintenanceConfigSP()->getVisibilityDelay();
_visibility.setVisibilityDelay(visibilityDelay);
if (_visibility.getVisibilityDelay() > 0) {
- _writeService.setTaskLimit(semiUnboundTaskLimit(_semiUnboundExecutorTaskLimit, _indexingThreads));
+ _writeService.setTaskLimit(_writeServiceConfig.semiUnboundTaskLimit());
}
}
@@ -425,9 +414,9 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
_visibility.setVisibilityDelay(visibilityDelay);
}
if (_visibility.getVisibilityDelay() > 0) {
- _writeService.setTaskLimit(semiUnboundTaskLimit(_semiUnboundExecutorTaskLimit, _indexingThreads));
+ _writeService.setTaskLimit(_writeServiceConfig.semiUnboundTaskLimit());
} else {
- _writeService.setTaskLimit(_defaultExecutorTaskLimit);
+ _writeService.setTaskLimit(_writeServiceConfig.defaultTaskLimit());
}
if (params.shouldSubDbsChange() || hasVisibilityDelayChanged) {
applySubDBConfig(*configSnapshot, serialNum, params);
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index cd0335b7b75..e23cd78b3ad 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -5,20 +5,23 @@
#include "clusterstatehandler.h"
#include "configstore.h"
#include "ddbstate.h"
+#include "disk_mem_usage_forwarder.h"
#include "documentdbconfig.h"
#include "documentsubdbcollection.h"
+#include "executorthreadingservice.h"
#include "feedhandler.h"
+#include "i_document_db_config_owner.h"
+#include "i_document_subdb_owner.h"
#include "i_feed_handler_owner.h"
#include "i_lid_space_compaction_handler.h"
#include "ifeedview.h"
#include "ireplayconfig.h"
#include "maintenancecontroller.h"
-#include "i_document_db_config_owner.h"
-#include "executorthreadingservice.h"
+#include "threading_service_config.h"
#include "visibilityhandler.h"
-#include "i_document_subdb_owner.h"
-#include "disk_mem_usage_forwarder.h"
+#include <vespa/metrics/updatehook.h>
+#include <vespa/searchcore/proton/attribute/attribute_usage_filter.h>
#include <vespa/searchcore/proton/common/doctypename.h>
#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <vespa/searchcore/proton/metrics/documentdb_job_trackers.h>
@@ -28,8 +31,6 @@
#include <vespa/searchlib/docstore/cachestats.h>
#include <vespa/searchlib/transactionlog/syncproxy.h>
#include <vespa/vespalib/util/varholder.h>
-#include <vespa/searchcore/proton/attribute/attribute_usage_filter.h>
-#include <vespa/metrics/updatehook.h>
#include <mutex>
#include <condition_variable>
@@ -80,9 +81,7 @@ private:
DocTypeName _docTypeName;
document::BucketSpace _bucketSpace;
vespalib::string _baseDir;
- uint32_t _defaultExecutorTaskLimit;
- uint32_t _semiUnboundExecutorTaskLimit;
- uint32_t _indexingThreads;
+ ThreadingServiceConfig _writeServiceConfig;
// Only one thread per executor, or dropFeedView() will fail.
ExecutorThreadingService _writeService;
// threads for initializer tasks during proton startup
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 556743ad267..36f19c385f7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -85,12 +85,10 @@ diskMemUsageSamplerConfig(const ProtonConfig &proton, const HwInfo &hwInfo)
}
size_t
-deriveBackgroundThreads(const ProtonConfig & proton) {
- size_t threads = std::thread::hardware_concurrency();
- if (proton.background.threads != 0) {
- threads = proton.background.threads;
- }
- threads = std::max(threads, size_t(proton.summary.log.numthreads));
+deriveCompactionCompressionThreads(const ProtonConfig &proton,
+ const HwInfo::Cpu &cpuInfo) {
+ size_t scaledCores = (size_t)std::ceil(cpuInfo.cores() * proton.feeding.concurrency);
+ size_t threads = std::max(scaledCores, size_t(proton.summary.log.numthreads));
// We need at least 1 guaranteed free worker in order to ensure progress so #documentsdbs + 1 should suffice,
// but we will not be cheap and give #documentsdbs * 2
@@ -302,7 +300,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
vespalib::string fileConfigId;
_warmupExecutor.reset(new vespalib::ThreadStackExecutor(4, 128*1024));
- const size_t summaryThreads = deriveBackgroundThreads(protonConfig);
+ const size_t summaryThreads = deriveCompactionCompressionThreads(protonConfig, _hwInfo.cpu());
_summaryExecutor.reset(new vespalib::BlockingThreadStackExecutor(summaryThreads, 128*1024, summaryThreads*16));
InitializeThreads initializeThreads;
if (protonConfig.initialize.threads > 0) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
new file mode 100644
index 00000000000..6422df9cbd2
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -0,0 +1,43 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "threading_service_config.h"
+#include <vespa/searchcore/proton/common/hw_info.h>
+#include <cmath>
+
+namespace proton {
+
+using ProtonConfig = ThreadingServiceConfig::ProtonConfig;
+
+ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
+ uint32_t defaultTaskLimit_,
+ uint32_t semiUnboundTaskLimit_)
+ : _indexingThreads(indexingThreads_),
+ _defaultTaskLimit(defaultTaskLimit_),
+ _semiUnboundTaskLimit(semiUnboundTaskLimit_)
+{
+}
+
+namespace {
+
+uint32_t
+calculateIndexingThreads(const ProtonConfig &cfg,
+ const HwInfo::Cpu &cpuInfo)
+{
+ double scaledCores = cpuInfo.cores() * cfg.feeding.concurrency;
+ uint32_t indexingThreads = std::max((uint32_t)std::ceil(scaledCores / 3), (uint32_t)cfg.indexing.threads);
+ return std::max(indexingThreads, 1u);
+}
+
+}
+
+ThreadingServiceConfig
+ThreadingServiceConfig::make(const ProtonConfig &cfg,
+ const HwInfo::Cpu &cpuInfo)
+{
+ uint32_t indexingThreads = calculateIndexingThreads(cfg, cpuInfo);
+ return ThreadingServiceConfig(indexingThreads,
+ cfg.indexing.tasklimit,
+ (cfg.indexing.semiunboundtasklimit / indexingThreads));
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
new file mode 100644
index 00000000000..67ab4171e80
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -0,0 +1,36 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcore/config/config-proton.h>
+#include <vespa/searchcore/proton/common/hw_info.h>
+#include <cstdint>
+
+namespace proton {
+
+/**
+ * Config for the threading service used by a documentdb.
+ */
+class ThreadingServiceConfig {
+public:
+ using ProtonConfig = vespa::config::search::core::ProtonConfig;
+
+private:
+ uint32_t _indexingThreads;
+ uint32_t _defaultTaskLimit;
+ uint32_t _semiUnboundTaskLimit;
+
+private:
+ ThreadingServiceConfig(uint32_t indexingThreads_,
+ uint32_t defaultTaskLimit_,
+ uint32_t semiUnboundTaskLimit_);
+
+public:
+ static ThreadingServiceConfig make(const ProtonConfig &cfg,
+ const HwInfo::Cpu &cpuInfo);
+
+ uint32_t indexingThreads() const { return _indexingThreads; }
+ uint32_t defaultTaskLimit() const { return _defaultTaskLimit; }
+ uint32_t semiUnboundTaskLimit() const { return _semiUnboundTaskLimit; }
+};
+
+}