summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-06 15:06:43 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-06 15:06:43 +0000
commitba1dcd889e54af3c16c2122c000cca94ca03d22e (patch)
treeb7e069e930d2812cba79e982f005198ffe7eb727 /searchcore
parentb5882399caf70648f9dba597696d20019e574fea (diff)
Add control for kind_of_watermark and reaction time.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.h19
7 files changed, 53 insertions, 28 deletions
diff --git a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
index 14ad82a0aa1..89930a6b1a3 100644
--- a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
@@ -79,7 +79,7 @@ struct ViewPtrs
~ViewPtrs();
};
-ViewPtrs::~ViewPtrs() {}
+ViewPtrs::~ViewPtrs() = default;
struct ViewSet
{
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 9434fe7d7ff..f895bd0c88d 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -132,6 +132,14 @@ indexing.tasklimit int default=1000 restart
## is 40000 then effective task limit is 10000.
indexing.semiunboundtasklimit int default = 40000 restart
+## Kind of watermark for when to activate extra manpower
+## Utilized if optimize is set to either THROUGHPUT or ADAPTIVE
+indexing.kind_of_watermark int default = 0 restart
+
+## Controls minimum reaction time in seconds if using THROUGHPUT
+indexing.reactiontime double default = 0.005 restart
+
+
## How long a freshly loaded index shall be warmed up
## before being used for serving
index.warmup.time double default=0.0 restart
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 1ee1b703ea9..a88fe109326 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -137,11 +137,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
ThreadingServiceConfig::make(protonCfg,
findDocumentDB(protonCfg.documentdb, docTypeName.getName())->feeding.concurrency,
hwInfo.cpu())),
- _writeService(sharedExecutor,
- _writeServiceConfig.indexingThreads(),
- indexing_thread_stack_size,
- _writeServiceConfig.defaultTaskLimit(),
- _writeServiceConfig.optimize()),
+ _writeService(sharedExecutor, _writeServiceConfig, indexing_thread_stack_size),
_initializeThreads(std::move(initializeThreads)),
_initConfigSnapshot(),
_initConfigSerialNum(0u),
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index e3657eefdc8..b29dd955ff3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "executorthreadingservice.h"
+#include "threading_service_config.h"
#include <vespa/searchcore/proton/metrics/executor_threading_service_stats.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/singleexecutor.h>
@@ -28,20 +29,24 @@ createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor
}
+ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, uint32_t num_treads)
+ : ExecutorThreadingService(sharedExecutor, ThreadingServiceConfig::make(num_treads))
+{}
+
ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor & sharedExecutor,
- uint32_t threads, uint32_t stackSize, uint32_t taskLimit,
- OptimizeFor optimize)
+ const ThreadingServiceConfig & cfg, uint32_t stackSize)
: _sharedExecutor(sharedExecutor),
_masterExecutor(1, stackSize),
- _indexExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)),
- _summaryExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)),
+ _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())),
+ _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())),
_masterService(_masterExecutor),
_indexService(*_indexExecutor),
_summaryService(*_summaryExecutor),
- _indexFieldInverter(SequencedTaskExecutor::create(threads, taskLimit)),
- _indexFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)),
- _attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit, optimize))
+ _indexFieldInverter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())),
+ _indexFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())),
+ _attributeFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit(), cfg.optimize(),
+ cfg.kindOfwatermark(), cfg.reactionTime()))
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
index dc446a202ec..280e50aea56 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h
@@ -8,6 +8,7 @@
namespace proton {
class ExecutorThreadingServiceStats;
+class ThreadingServiceConfig;
/**
* Implementation of IThreadingService using 2 underlying thread stack executors
@@ -33,13 +34,11 @@ public:
* Constructor.
*
* @stackSize The size of the stack of the underlying executors.
- * @taskLimit The task limit for the index executor.
+ * @cfg config used to set up all executors.
*/
ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor,
- uint32_t threads = 1,
- uint32_t stackSize = 128 * 1024,
- uint32_t taskLimit = 1000,
- OptimizeFor optimize = OptimizeFor::LATENCY);
+ const ThreadingServiceConfig & cfg, uint32_t stackSize = 128 * 1024);
+ ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, uint32_t num_treads = 1);
~ExecutorThreadingService() override;
/**
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
index e95920ca606..22eb64acdee 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -9,14 +9,19 @@ namespace proton {
using ProtonConfig = ThreadingServiceConfig::ProtonConfig;
using OptimizeFor = vespalib::Executor::OptimizeFor;
+
ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
uint32_t defaultTaskLimit_,
uint32_t semiUnboundTaskLimit_,
- OptimizeFor optimize)
+ OptimizeFor optimize_,
+ uint32_t kindOfWatermark_,
+ vespalib::duration reactionTime_)
: _indexingThreads(indexingThreads_),
_defaultTaskLimit(defaultTaskLimit_),
_semiUnboundTaskLimit(semiUnboundTaskLimit_),
- _optimize(optimize)
+ _optimize(optimize_),
+ _kindOfWatermark(kindOfWatermark_),
+ _reactionTime(reactionTime_)
{
}
@@ -52,7 +57,14 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const
uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing.threads, concurrency, cpuInfo);
return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit,
(cfg.indexing.semiunboundtasklimit / indexingThreads),
- selectOptimization(cfg.indexing.optimize));
+ selectOptimization(cfg.indexing.optimize),
+ cfg.indexing.kindOfWatermark,
+ vespalib::from_s(cfg.indexing.reactiontime));
+}
+
+ThreadingServiceConfig
+ThreadingServiceConfig::make(uint32_t indexingThreads) {
+ return ThreadingServiceConfig(indexingThreads, 100, 1000, OptimizeFor::LATENCY, 0, 10ms);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
index 149215f97dc..d608b49f49e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -3,6 +3,7 @@
#include <vespa/searchcore/proton/common/hw_info.h>
#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/time.h>
#include <cstdint>
namespace vespa::config::search::core::internal { class InternalProtonType; }
@@ -17,21 +18,25 @@ public:
using OptimizeFor = vespalib::Executor::OptimizeFor;
private:
- uint32_t _indexingThreads;
- uint32_t _defaultTaskLimit;
- uint32_t _semiUnboundTaskLimit;
- OptimizeFor _optimize;
+ uint32_t _indexingThreads;
+ uint32_t _defaultTaskLimit;
+ uint32_t _semiUnboundTaskLimit;
+ OptimizeFor _optimize;
+ uint32_t _kindOfWatermark;
+ vespalib::duration _reactionTime; // Minimum reaction time to new tasks
private:
- ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, OptimizeFor optimize);
+ ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, OptimizeFor optimize, uint32_t kindOfWatermark, vespalib::duration reactionTime);
public:
static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo);
-
+ static ThreadingServiceConfig make(uint32_t indexingThreads);
uint32_t indexingThreads() const { return _indexingThreads; }
uint32_t defaultTaskLimit() const { return _defaultTaskLimit; }
uint32_t semiUnboundTaskLimit() const { return _semiUnboundTaskLimit; }
- OptimizeFor optimize() const { return _optimize;}
+ OptimizeFor optimize() const { return _optimize; }
+ uint32_t kindOfwatermark() const { return _kindOfWatermark; }
+ vespalib::duration reactionTime() const { return _reactionTime; }
};
}