summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-06 15:06:43 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-06 15:06:43 +0000
commitba1dcd889e54af3c16c2122c000cca94ca03d22e (patch)
treeb7e069e930d2812cba79e982f005198ffe7eb727
parentb5882399caf70648f9dba597696d20019e574fea (diff)
Add control for kind_of_watermark and reaction time.
-rw-r--r--searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.h19
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp22
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp7
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp16
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h3
12 files changed, 97 insertions, 36 deletions
diff --git a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
index 14ad82a0aa1..89930a6b1a3 100644
--- a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
@@ -79,7 +79,7 @@ struct ViewPtrs
~ViewPtrs();
};
-ViewPtrs::~ViewPtrs() {}
+ViewPtrs::~ViewPtrs() = default;
struct ViewSet
{
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 9434fe7d7ff..f895bd0c88d 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -132,6 +132,14 @@ indexing.tasklimit int default=1000 restart
## is 40000 then effective task limit is 10000.
indexing.semiunboundtasklimit int default = 40000 restart
+## Kind of watermark for when to activate extra manpower
+## Utilized if optimize is set to either THROUGHPUT or ADAPTIVE
+indexing.kind_of_watermark int default = 0 restart
+
+## Controls minimum reaction time in seconds if using THROUGHPUT
+indexing.reactiontime double default = 0.005 restart
+
+
## How long a freshly loaded index shall be warmed up
## before being used for serving
index.warmup.time double default=0.0 restart
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 1ee1b703ea9..a88fe109326 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -137,11 +137,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
ThreadingServiceConfig::make(protonCfg,
findDocumentDB(protonCfg.documentdb, docTypeName.getName())->feeding.concurrency,
hwInfo.cpu())),
- _writeService(sharedExecutor,
- _writeServiceConfig.indexingThreads(),
- indexing_thread_stack_size,
- _writeServiceConfig.defaultTaskLimit(),
- _writeServiceConfig.optimize()),
+ _writeService(sharedExecutor, _writeServiceConfig, indexing_thread_stack_size),
_initializeThreads(std::move(initializeThreads)),
_initConfigSnapshot(),
_initConfigSerialNum(0u),
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index e3657eefdc8..b29dd955ff3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "executorthreadingservice.h"
+#include "threading_service_config.h"
#include <vespa/searchcore/proton/metrics/executor_threading_service_stats.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/singleexecutor.h>
@@ -28,20 +29,24 @@ createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor
}
+ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, uint32_t num_treads)
+ : ExecutorThreadingService(sharedExecutor, ThreadingServiceConfig::make(num_treads))
+{}
+
ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor & sharedExecutor,
- uint32_t threads, uint32_t stackSize, uint32_t taskLimit,
- OptimizeFor optimize)
+ const ThreadingServiceConfig & cfg, uint32_t stackSize)
: _sharedExecutor(sharedExecutor),
_masterExecutor(1, stackSize),
- _indexExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)),
- _summaryExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)),
+ _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())),
+ _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())),
_masterService(_masterExecutor),
_indexService(*_indexExecutor),
_summaryService(*_summaryExecutor),
- _indexFieldInverter(SequencedTaskExecutor::create(threads, taskLimit)),
- _indexFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)),
- _attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit, optimize))
+ _indexFieldInverter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())),
+ _indexFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())),
+ _attributeFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit(), cfg.optimize(),
+ cfg.kindOfwatermark(), cfg.reactionTime()))
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index dc446a202ec..280e50aea56 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -8,6 +8,7 @@
namespace proton {
class ExecutorThreadingServiceStats;
+class ThreadingServiceConfig;
/**
* Implementation of IThreadingService using 2 underlying thread stack executors
@@ -33,13 +34,11 @@ public:
* Constructor.
*
* @stackSize The size of the stack of the underlying executors.
- * @taskLimit The task limit for the index executor.
+ * @cfg config used to set up all executors.
*/
ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor,
- uint32_t threads = 1,
- uint32_t stackSize = 128 * 1024,
- uint32_t taskLimit = 1000,
- OptimizeFor optimize = OptimizeFor::LATENCY);
+ const ThreadingServiceConfig & cfg, uint32_t stackSize = 128 * 1024);
+ ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, uint32_t num_treads = 1);
~ExecutorThreadingService() override;
/**
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
index e95920ca606..22eb64acdee 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -9,14 +9,19 @@ namespace proton {
using ProtonConfig = ThreadingServiceConfig::ProtonConfig;
using OptimizeFor = vespalib::Executor::OptimizeFor;
+
ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
uint32_t defaultTaskLimit_,
uint32_t semiUnboundTaskLimit_,
- OptimizeFor optimize)
+ OptimizeFor optimize_,
+ uint32_t kindOfWatermark_,
+ vespalib::duration reactionTime_)
: _indexingThreads(indexingThreads_),
_defaultTaskLimit(defaultTaskLimit_),
_semiUnboundTaskLimit(semiUnboundTaskLimit_),
- _optimize(optimize)
+ _optimize(optimize_),
+ _kindOfWatermark(kindOfWatermark_),
+ _reactionTime(reactionTime_)
{
}
@@ -52,7 +57,14 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const
uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing.threads, concurrency, cpuInfo);
return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit,
(cfg.indexing.semiunboundtasklimit / indexingThreads),
- selectOptimization(cfg.indexing.optimize));
+ selectOptimization(cfg.indexing.optimize),
+ cfg.indexing.kindOfWatermark,
+ vespalib::from_s(cfg.indexing.reactiontime));
+}
+
+ThreadingServiceConfig
+ThreadingServiceConfig::make(uint32_t indexingThreads) {
+ return ThreadingServiceConfig(indexingThreads, 100, 1000, OptimizeFor::LATENCY, 0, 10ms);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
index 149215f97dc..d608b49f49e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -3,6 +3,7 @@
#include <vespa/searchcore/proton/common/hw_info.h>
#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/time.h>
#include <cstdint>
namespace vespa::config::search::core::internal { class InternalProtonType; }
@@ -17,21 +18,25 @@ public:
using OptimizeFor = vespalib::Executor::OptimizeFor;
private:
- uint32_t _indexingThreads;
- uint32_t _defaultTaskLimit;
- uint32_t _semiUnboundTaskLimit;
- OptimizeFor _optimize;
+ uint32_t _indexingThreads;
+ uint32_t _defaultTaskLimit;
+ uint32_t _semiUnboundTaskLimit;
+ OptimizeFor _optimize;
+ uint32_t _kindOfWatermark;
+ vespalib::duration _reactionTime; // Minimum reaction time to new tasks
private:
- ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, OptimizeFor optimize);
+ ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, OptimizeFor optimize, uint32_t kindOfWatermark, vespalib::duration reactionTime);
public:
static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo);
-
+ static ThreadingServiceConfig make(uint32_t indexingThreads);
uint32_t indexingThreads() const { return _indexingThreads; }
uint32_t defaultTaskLimit() const { return _defaultTaskLimit; }
uint32_t semiUnboundTaskLimit() const { return _semiUnboundTaskLimit; }
- OptimizeFor optimize() const { return _optimize;}
+ OptimizeFor optimize() const { return _optimize; }
+ uint32_t kindOfwatermark() const { return _kindOfWatermark; }
+ vespalib::duration reactionTime() const { return _reactionTime; }
};
}
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index f5f04738e92..70d0f1c743d 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+#include <vespa/vespalib/util/adaptive_sequenced_executor.h>
+
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/test/insertion_operators.h>
@@ -245,6 +247,26 @@ TEST("require that you distribute well") {
EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize());
}
+TEST("Test creation of different types") {
+ auto iseq = SequencedTaskExecutor::create(1);
+
+ EXPECT_EQUAL(1u, iseq->getNumExecutors());
+ auto * seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
+ ASSERT_TRUE(seq != nullptr);
+
+ iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::LATENCY);
+ seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
+ ASSERT_TRUE(seq != nullptr);
+
+ iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::THROUGHPUT);
+ seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
+ ASSERT_TRUE(seq != nullptr);
+
+ iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::ADAPTIVE, 17);
+ auto aseq = dynamic_cast<AdaptiveSequencedExecutor *>(iseq.get());
+ ASSERT_TRUE(aseq != nullptr);
+}
+
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 6d42895bffc..a0c2f0ac237 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -15,16 +15,17 @@ constexpr uint32_t stackSize = 128 * 1024;
std::unique_ptr<ISequencedTaskExecutor>
-SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize)
+SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime)
{
if (optimize == OptimizeFor::ADAPTIVE) {
- return std::make_unique<AdaptiveSequencedExecutor>(threads, threads, 0, taskLimit);
+ return std::make_unique<AdaptiveSequencedExecutor>(threads, threads, kindOfWatermark, taskLimit);
} else {
auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>();
executors->reserve(threads);
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
- executors->push_back(std::make_unique<SingleExecutor>(taskLimit));
+ uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 10 : kindOfWatermark;
+ executors->push_back(std::make_unique<SingleExecutor>(taskLimit, watermark, reactionTime));
} else {
executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit));
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index 14127970403..b3dd400478a 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -2,6 +2,7 @@
#pragma once
#include "isequencedtaskexecutor.h"
+#include <vespa/vespalib/util/time.h>
namespace vespalib {
@@ -30,9 +31,10 @@ public:
/*
* Note that if you choose Optimize::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside.
+ *
*/
static std::unique_ptr<ISequencedTaskExecutor>
- create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY);
+ create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms);
};
} // namespace search
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 90eb18c23ef..a17037799a3 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -6,6 +6,10 @@
namespace vespalib {
SingleExecutor::SingleExecutor(uint32_t taskLimit)
+ : SingleExecutor(taskLimit, taskLimit/10, 5ms)
+{ }
+
+SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime)
: _taskLimit(vespalib::roundUp2inN(taskLimit)),
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
@@ -19,10 +23,14 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_wakeupConsumerAt(0),
_producerNeedWakeupAt(0),
_wp(0),
+ _watermark(std::min(_taskLimit.load(), watermark)),
+ _reactionTime(reactionTime),
_closed(false)
{
+ assert(taskLimit >= watermark);
_thread.start();
}
+
SingleExecutor::~SingleExecutor() {
shutdown();
sync();
@@ -62,7 +70,7 @@ SingleExecutor::execute(Task::UP task) {
void
SingleExecutor::setTaskLimit(uint32_t taskLimit) {
- _wantedTaskLimit = vespalib::roundUp2inN(taskLimit);
+ _wantedTaskLimit = std::max(vespalib::roundUp2inN(taskLimit), size_t(_watermark));
}
void
@@ -102,10 +110,10 @@ SingleExecutor::run() {
while (!_thread.stopped()) {
drain_tasks();
_producerCondition.notify_all();
- _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed);
+ _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed);
Lock lock(_mutex);
if (numTasks() <= 0) {
- _consumerCondition.wait_for(lock, 10ms);
+ _consumerCondition.wait_for(lock, _reactionTime);
}
_wakeupConsumerAt.store(0, std::memory_order_relaxed);
}
@@ -144,7 +152,7 @@ SingleExecutor::wait_for_room(Lock & lock) {
}
_queueSize.add(numTasks());
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- sleepProducer(lock, 10ms, wp - taskLimit/4);
+ sleepProducer(lock, _reactionTime, wp - _watermark);
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 3d759769ea3..a58128c15aa 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -19,6 +19,7 @@ namespace vespalib {
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
explicit SingleExecutor(uint32_t taskLimit);
+ SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
void setTaskLimit(uint32_t taskLimit) override;
@@ -56,6 +57,8 @@ private:
std::atomic<uint64_t> _wakeupConsumerAt;
std::atomic<uint64_t> _producerNeedWakeupAt;
std::atomic<uint64_t> _wp;
+ const uint32_t _watermark;
+ const duration _reactionTime;
bool _closed;
};