1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "threading_service_config.h"
#include <vespa/searchcore/config/config-proton.h>
#include <cmath>
namespace proton {
using ProtonConfig = ThreadingServiceConfig::ProtonConfig;
using OptimizeFor = vespalib::Executor::OptimizeFor;
/**
 * Member-wise constructor.
 *
 * Every argument is stored as-is in the member carrying the same name
 * (minus the trailing underscore); no validation or derivation of values
 * takes place here.
 */
ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
                                               uint32_t defaultTaskLimit_,
                                               OptimizeFor optimize_,
                                               uint32_t kindOfWatermark_,
                                               vespalib::duration reactionTime_,
                                               SharedFieldWriterExecutor shared_field_writer_)
    : _indexingThreads(indexingThreads_),
      _defaultTaskLimit(defaultTaskLimit_),
      _optimize(optimize_),
      _kindOfWatermark(kindOfWatermark_),
      _reactionTime(reactionTime_),
      _shared_field_writer(shared_field_writer_)
{ }
namespace {
/**
 * Derives the number of indexing threads.
 *
 * The core count reported by cpuInfo is scaled by the concurrency factor,
 * and one thread is budgeted per three scaled cores (rounded up). The
 * explicitly configured indexing.threads wins when it is larger, and the
 * returned value is never less than 1.
 */
uint32_t
calculateIndexingThreads(const ProtonConfig::Indexing & indexing, double concurrency, const HwInfo::Cpu &cpuInfo)
{
    const bool adaptive = (indexing.optimize == ProtonConfig::Indexing::Optimize::ADAPTIVE);
    const double availableCores = cpuInfo.cores() * concurrency;
    // Unless the adaptive strategy is configured, the scaled core count is
    // capped at 12 to reduce the cost of waking up threads and thereby
    // achieve better throughput.
    // TODO: Fix this in a simpler/better way.
    const double scaledCores = adaptive ? availableCores : std::min(12.0, availableCores);
    const auto derivedThreads = static_cast<int32_t>(std::ceil(scaledCores / 3));
    const uint32_t indexingThreads = std::max(derivedThreads, indexing.threads);
    return std::max(indexingThreads, 1u);
}
/**
 * Translates the configured indexing optimization strategy into the
 * executor's OptimizeFor value. LATENCY is the result for the LATENCY
 * setting and also for any value not covered by the switch.
 */
OptimizeFor
selectOptimization(ProtonConfig::Indexing::Optimize optimize) {
    using Configured = ProtonConfig::Indexing::Optimize;
    OptimizeFor selected = OptimizeFor::LATENCY;
    switch (optimize) {
    case Configured::LATENCY:
        break;
    case Configured::THROUGHPUT:
        selected = OptimizeFor::THROUGHPUT;
        break;
    case Configured::ADAPTIVE:
        selected = OptimizeFor::ADAPTIVE;
        break;
    }
    return selected;
}
}
/**
 * Builds a threading config from the proton config.
 *
 * The indexing thread count is derived by calculateIndexingThreads() from
 * cfg.indexing, the concurrency factor and the cpu info. Task limit,
 * optimization strategy, watermark kind and reaction time are read from
 * cfg.indexing; the shared field writer setting comes from cfg.feeding.
 */
ThreadingServiceConfig
ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo)
{
    const auto &indexing = cfg.indexing;
    const uint32_t indexingThreads = calculateIndexingThreads(indexing, concurrency, cpuInfo);
    const auto optimizeFor = selectOptimization(indexing.optimize);
    // reactiontime is given in seconds and converted to a duration here.
    const auto reactionTime = vespalib::from_s(indexing.reactiontime);
    return ThreadingServiceConfig(indexingThreads,
                                  indexing.tasklimit,
                                  optimizeFor,
                                  indexing.kindOfWatermark,
                                  reactionTime,
                                  cfg.feeding.sharedFieldWriterExecutor);
}
/**
 * Builds a threading config from an explicit indexing thread count and
 * shared field writer setting. The remaining values are fixed: task limit
 * 100, LATENCY optimization, watermark kind 0 and a reaction time of 10ms.
 */
ThreadingServiceConfig
ThreadingServiceConfig::make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_) {
    const uint32_t fixedTaskLimit = 100;
    const uint32_t fixedKindOfWatermark = 0;
    const auto fixedReactionTime = 10ms;
    return ThreadingServiceConfig(indexingThreads,
                                  fixedTaskLimit,
                                  OptimizeFor::LATENCY,
                                  fixedKindOfWatermark,
                                  fixedReactionTime,
                                  shared_field_writer_);
}
void
ThreadingServiceConfig::update(const ThreadingServiceConfig& cfg)
{
_defaultTaskLimit = cfg._defaultTaskLimit;
}
/**
 * Two configs are equal when all six members match: indexing threads,
 * default task limit, optimization strategy, watermark kind, reaction time
 * and shared field writer setting. Members are compared in that order and
 * the comparison stops at the first mismatch.
 */
bool
ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const
{
    if (_indexingThreads != rhs._indexingThreads) {
        return false;
    }
    if (_defaultTaskLimit != rhs._defaultTaskLimit) {
        return false;
    }
    if (_optimize != rhs._optimize) {
        return false;
    }
    if (_kindOfWatermark != rhs._kindOfWatermark) {
        return false;
    }
    if (_reactionTime != rhs._reactionTime) {
        return false;
    }
    return _shared_field_writer == rhs._shared_field_writer;
}
}
|