diff options
12 files changed, 97 insertions, 36 deletions
diff --git a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp index 14ad82a0aa1..89930a6b1a3 100644 --- a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp +++ b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp @@ -79,7 +79,7 @@ struct ViewPtrs ~ViewPtrs(); }; -ViewPtrs::~ViewPtrs() {} +ViewPtrs::~ViewPtrs() = default; struct ViewSet { diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 9434fe7d7ff..f895bd0c88d 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -132,6 +132,14 @@ indexing.tasklimit int default=1000 restart ## is 40000 then effective task limit is 10000. indexing.semiunboundtasklimit int default = 40000 restart +## Kind of watermark for when to activate extra manpower +## Utilized if optimize is set to either THROUGHPUT or ADAPTIVE +indexing.kind_of_watermark int default = 0 restart + +## Controls minimum reaction time in seconds if using THROUGHPUT +indexing.reactiontime double default = 0.005 restart + + ## How long a freshly loaded index shall be warmed up ## before being used for serving index.warmup.time double default=0.0 restart diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 1ee1b703ea9..a88fe109326 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -137,11 +137,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, ThreadingServiceConfig::make(protonCfg, findDocumentDB(protonCfg.documentdb, docTypeName.getName())->feeding.concurrency, hwInfo.cpu())), - _writeService(sharedExecutor, - _writeServiceConfig.indexingThreads(), - indexing_thread_stack_size, - _writeServiceConfig.defaultTaskLimit(), - _writeServiceConfig.optimize()), + _writeService(sharedExecutor, _writeServiceConfig, indexing_thread_stack_size), _initializeThreads(std::move(initializeThreads)), _initConfigSnapshot(), _initConfigSerialNum(0u), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index e3657eefdc8..b29dd955ff3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "executorthreadingservice.h" +#include "threading_service_config.h" #include <vespa/searchcore/proton/metrics/executor_threading_service_stats.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/singleexecutor.h> @@ -28,20 +29,24 @@ createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor } +ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, uint32_t num_treads) + : ExecutorThreadingService(sharedExecutor, ThreadingServiceConfig::make(num_treads)) +{} + ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor & sharedExecutor, - uint32_t threads, uint32_t stackSize, uint32_t taskLimit, - OptimizeFor optimize) + const ThreadingServiceConfig & cfg, uint32_t stackSize) : _sharedExecutor(sharedExecutor), _masterExecutor(1, stackSize), - _indexExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)), - _summaryExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)), + _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())), + _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())), _masterService(_masterExecutor), _indexService(*_indexExecutor), _summaryService(*_summaryExecutor), - _indexFieldInverter(SequencedTaskExecutor::create(threads, taskLimit)), - _indexFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)), - _attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit, optimize)) + _indexFieldInverter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())), + _indexFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())), + _attributeFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit(), cfg.optimize(), + cfg.kindOfwatermark(), cfg.reactionTime())) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index dc446a202ec..280e50aea56 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -8,6 +8,7 @@ namespace proton { class ExecutorThreadingServiceStats; +class ThreadingServiceConfig; /** * Implementation of IThreadingService using 2 underlying thread stack executors @@ -33,13 +34,11 @@ public: * Constructor. * * @stackSize The size of the stack of the underlying executors. - * @taskLimit The task limit for the index executor. + * @cfg config used to set up all executors. */ ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, - uint32_t threads = 1, - uint32_t stackSize = 128 * 1024, - uint32_t taskLimit = 1000, - OptimizeFor optimize = OptimizeFor::LATENCY); + const ThreadingServiceConfig & cfg, uint32_t stackSize = 128 * 1024); + ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, uint32_t num_treads = 1); ~ExecutorThreadingService() override; /** diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index e95920ca606..22eb64acdee 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -9,14 +9,19 @@ namespace proton { using ProtonConfig = ThreadingServiceConfig::ProtonConfig; using OptimizeFor = vespalib::Executor::OptimizeFor; + ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, - OptimizeFor optimize) + OptimizeFor optimize_, + uint32_t kindOfWatermark_, + vespalib::duration reactionTime_) : _indexingThreads(indexingThreads_), _defaultTaskLimit(defaultTaskLimit_), _semiUnboundTaskLimit(semiUnboundTaskLimit_), - _optimize(optimize) + _optimize(optimize_), + _kindOfWatermark(kindOfWatermark_), + _reactionTime(reactionTime_) { } @@ -52,7 +57,14 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing.threads, concurrency, cpuInfo); return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit, (cfg.indexing.semiunboundtasklimit / indexingThreads), - selectOptimization(cfg.indexing.optimize)); + selectOptimization(cfg.indexing.optimize), + cfg.indexing.kindOfWatermark, + vespalib::from_s(cfg.indexing.reactiontime)); +} + +ThreadingServiceConfig +ThreadingServiceConfig::make(uint32_t indexingThreads) { + return ThreadingServiceConfig(indexingThreads, 100, 1000, OptimizeFor::LATENCY, 0, 10ms); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index 149215f97dc..d608b49f49e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -3,6 +3,7 @@ #include <vespa/searchcore/proton/common/hw_info.h> #include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/time.h> #include <cstdint> namespace vespa::config::search::core::internal { class InternalProtonType; } @@ -17,21 +18,25 @@ public: using OptimizeFor = vespalib::Executor::OptimizeFor; private: - uint32_t _indexingThreads; - uint32_t _defaultTaskLimit; - uint32_t _semiUnboundTaskLimit; - OptimizeFor _optimize; + uint32_t _indexingThreads; + uint32_t _defaultTaskLimit; + uint32_t _semiUnboundTaskLimit; + OptimizeFor _optimize; + uint32_t _kindOfWatermark; + vespalib::duration _reactionTime; // Minimum reaction time to new tasks private: - ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, OptimizeFor optimize); + ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, OptimizeFor optimize, uint32_t kindOfWatermark, vespalib::duration reactionTime); public: static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); - + static ThreadingServiceConfig make(uint32_t indexingThreads); uint32_t indexingThreads() const { return _indexingThreads; } uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } uint32_t semiUnboundTaskLimit() const { return _semiUnboundTaskLimit; } - OptimizeFor optimize() const { return _optimize;} + OptimizeFor optimize() const { return _optimize; } + uint32_t kindOfwatermark() const { return _kindOfWatermark; } + vespalib::duration reactionTime() const { return _reactionTime; } }; } diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index f5f04738e92..70d0f1c743d 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/util/sequencedtaskexecutor.h> +#include <vespa/vespalib/util/adaptive_sequenced_executor.h> + #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/test/insertion_operators.h> @@ -245,6 +247,26 @@ TEST("require that you distribute well") { EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize()); } +TEST("Test creation of different types") { + auto iseq = SequencedTaskExecutor::create(1); + + EXPECT_EQUAL(1u, iseq->getNumExecutors()); + auto * seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); + ASSERT_TRUE(seq != nullptr); + + iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::LATENCY); + seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); + ASSERT_TRUE(seq != nullptr); + + iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::THROUGHPUT); + seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); + ASSERT_TRUE(seq != nullptr); + + iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::ADAPTIVE, 17); + auto aseq = dynamic_cast<AdaptiveSequencedExecutor *>(iseq.get()); + ASSERT_TRUE(aseq != nullptr); +} + } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 6d42895bffc..a0c2f0ac237 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -15,16 +15,17 @@ constexpr uint32_t stackSize = 128 * 1024; std::unique_ptr<ISequencedTaskExecutor> -SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize) +SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime) { if (optimize == OptimizeFor::ADAPTIVE) { - return std::make_unique<AdaptiveSequencedExecutor>(threads, threads, 0, taskLimit); + return std::make_unique<AdaptiveSequencedExecutor>(threads, threads, kindOfWatermark, taskLimit); } else { auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>(); executors->reserve(threads); for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { - executors->push_back(std::make_unique<SingleExecutor>(taskLimit)); + uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 10 : kindOfWatermark; + executors->push_back(std::make_unique<SingleExecutor>(taskLimit, watermark, reactionTime)); } else { executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 14127970403..b3dd400478a 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -2,6 +2,7 @@ #pragma once #include "isequencedtaskexecutor.h" +#include <vespa/vespalib/util/time.h> namespace vespalib { @@ -30,9 +31,10 @@ public: /* * Note that if you choose Optimize::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside. + * */ static std::unique_ptr<ISequencedTaskExecutor> - create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY); + create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms); }; } // namespace search diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 90eb18c23ef..a17037799a3 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -6,6 +6,10 @@ namespace vespalib { SingleExecutor::SingleExecutor(uint32_t taskLimit) + : SingleExecutor(taskLimit, taskLimit/10, 5ms) +{ } + +SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime) : _taskLimit(vespalib::roundUp2inN(taskLimit)), _wantedTaskLimit(_taskLimit.load()), _rp(0), @@ -19,10 +23,14 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _wakeupConsumerAt(0), _producerNeedWakeupAt(0), _wp(0), + _watermark(std::min(_taskLimit.load(), watermark)), + _reactionTime(reactionTime), _closed(false) { + assert(taskLimit >= watermark); _thread.start(); } + SingleExecutor::~SingleExecutor() { shutdown(); sync(); @@ -62,7 +70,7 @@ SingleExecutor::execute(Task::UP task) { void SingleExecutor::setTaskLimit(uint32_t taskLimit) { - _wantedTaskLimit = vespalib::roundUp2inN(taskLimit); + _wantedTaskLimit = std::max(vespalib::roundUp2inN(taskLimit), size_t(_watermark)); } void @@ -102,10 +110,10 @@ SingleExecutor::run() { while (!_thread.stopped()) { drain_tasks(); _producerCondition.notify_all(); - _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed); + _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed); Lock lock(_mutex); if (numTasks() <= 0) { - _consumerCondition.wait_for(lock, 10ms); + _consumerCondition.wait_for(lock, _reactionTime); } _wakeupConsumerAt.store(0, std::memory_order_relaxed); } @@ -144,7 +152,7 @@ SingleExecutor::wait_for_room(Lock & lock) { } _queueSize.add(numTasks()); while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(lock, 10ms, wp - taskLimit/4); + sleepProducer(lock, _reactionTime, wp - _watermark); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 3d759769ea3..a58128c15aa 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -19,6 +19,7 @@ namespace vespalib { class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: explicit SingleExecutor(uint32_t taskLimit); + SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; @@ -56,6 +57,8 @@ private: std::atomic<uint64_t> _wakeupConsumerAt; std::atomic<uint64_t> _producerNeedWakeupAt; std::atomic<uint64_t> _wp; + const uint32_t _watermark; + const duration _reactionTime; bool _closed; }; |