aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
blob: ff75a59c41b8f9ef669916c58476ba5ed3dea380 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "threading_service_config.h"
#include <vespa/searchcore/config/config-proton.h>
#include <cmath>

namespace proton {

using ProtonConfig = ThreadingServiceConfig::ProtonConfig;
using OptimizeFor = vespalib::Executor::OptimizeFor;


/**
 * Constructs a threading service config by storing each argument, unmodified,
 * in the member with the matching name.
 *
 * @param indexingThreads_     value stored in _indexingThreads
 * @param master_task_limit_   value stored in _master_task_limit
 * @param defaultTaskLimit_    value stored in _defaultTaskLimit
 * @param optimize_            value stored in _optimize
 * @param kindOfWatermark_     value stored in _kindOfWatermark
 * @param reactionTime_        value stored in _reactionTime
 * @param shared_field_writer_ value stored in _shared_field_writer
 */
ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
                                               uint32_t master_task_limit_,
                                               uint32_t defaultTaskLimit_,
                                               OptimizeFor optimize_,
                                               uint32_t kindOfWatermark_,
                                               vespalib::duration reactionTime_,
                                               SharedFieldWriterExecutor shared_field_writer_)
    : _indexingThreads(indexingThreads_),
      _master_task_limit(master_task_limit_),
      _defaultTaskLimit(defaultTaskLimit_),
      _optimize(optimize_),
      _kindOfWatermark(kindOfWatermark_),
      _reactionTime(reactionTime_),
      _shared_field_writer(shared_field_writer_)
{
    // Nothing to do: all state is set up by the member initializer list.
}

namespace {

/**
 * Calculates how many indexing threads to use.
 *
 * The number of cores reported by cpuInfo is scaled by the concurrency factor.
 * One thread is used per three scaled cores (rounded up), unless the configured
 * indexing.threads value is higher. The result is never lower than 1.
 *
 * @param indexing    indexing config; its optimize and threads fields are read
 * @param concurrency factor multiplied with the core count
 * @param cpuInfo     source of the core count
 * @return the number of indexing threads, always >= 1
 */
uint32_t
calculateIndexingThreads(const ProtonConfig::Indexing & indexing, double concurrency, const HwInfo::Cpu &cpuInfo)
{
    double scaledCores = cpuInfo.cores() * concurrency;
    if (indexing.optimize != ProtonConfig::Indexing::Optimize::ADAPTIVE) {
        // We are capping at 12 threads to reduce cost of waking up threads
        // to achieve a better throughput.
        // TODO: Fix this in a simpler/better way.
        scaledCores = std::min(12.0, scaledCores);
    }

    const int32_t threadsFromCores = static_cast<int32_t>(std::ceil(scaledCores / 3));
    const int32_t wantedThreads = std::max(threadsFromCores, indexing.threads);
    // Apply the lower bound while the value is still signed. Converting a
    // negative value to uint32_t first would wrap around to a huge thread count
    // and defeat the clamp.
    const int32_t clampedThreads = std::max(wantedThreads, int32_t{1});
    return static_cast<uint32_t>(clampedThreads);
}

/**
 * Translates the indexing optimize setting from config into the executor's
 * OptimizeFor value. Any value that is not one of the listed enumerators
 * yields OptimizeFor::LATENCY.
 */
OptimizeFor
selectOptimization(ProtonConfig::Indexing::Optimize optimize) {
    using CfgOptimize = ProtonConfig::Indexing::Optimize;
    OptimizeFor selected = OptimizeFor::LATENCY;
    switch (optimize) {
        case CfgOptimize::LATENCY:
            // Already the fallback value.
            break;
        case CfgOptimize::THROUGHPUT:
            selected = OptimizeFor::THROUGHPUT;
            break;
        case CfgOptimize::ADAPTIVE:
            selected = OptimizeFor::ADAPTIVE;
            break;
    }
    return selected;
}

}

/**
 * Builds a threading service config from proton config.
 *
 * The indexing thread count comes from calculateIndexingThreads(); the other
 * values are read from cfg.indexing and cfg.feeding, with the indexing
 * reaction time converted through vespalib::from_s().
 */
ThreadingServiceConfig
ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo)
{
    const auto & indexing = cfg.indexing;
    const auto & feeding = cfg.feeding;
    return ThreadingServiceConfig(calculateIndexingThreads(indexing, concurrency, cpuInfo),
                                  feeding.masterTaskLimit,
                                  indexing.tasklimit,
                                  selectOptimization(indexing.optimize),
                                  indexing.kindOfWatermark,
                                  vespalib::from_s(indexing.reactiontime),
                                  feeding.sharedFieldWriterExecutor);
}

/**
 * Builds a threading service config from an explicit indexing thread count and
 * shared field writer setting. All remaining values are fixed: master task
 * limit 0, default task limit 100, OptimizeFor::LATENCY, watermark 0 and a
 * reaction time of 10ms.
 */
ThreadingServiceConfig
ThreadingServiceConfig::make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_) {
    constexpr uint32_t master_task_limit = 0;
    constexpr uint32_t default_task_limit = 100;
    constexpr uint32_t kind_of_watermark = 0;
    return ThreadingServiceConfig(indexingThreads,
                                  master_task_limit,
                                  default_task_limit,
                                  OptimizeFor::LATENCY,
                                  kind_of_watermark,
                                  10ms,
                                  shared_field_writer_);
}

/**
 * Takes over the task limits from another config.
 *
 * Only the master task limit and the default task limit are copied; every
 * other member of this object keeps its current value.
 */
void
ThreadingServiceConfig::update(const ThreadingServiceConfig& cfg)
{
    _defaultTaskLimit = cfg._defaultTaskLimit;
    _master_task_limit = cfg._master_task_limit;
}

/**
 * Member-wise equality: two configs are equal only when all seven members
 * compare equal.
 */
bool
ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const
{
    // Bail out on the first differing member, checking in declaration order
    // of the constructor arguments.
    if (_indexingThreads != rhs._indexingThreads) {
        return false;
    }
    if (_master_task_limit != rhs._master_task_limit) {
        return false;
    }
    if (_defaultTaskLimit != rhs._defaultTaskLimit) {
        return false;
    }
    if (_optimize != rhs._optimize) {
        return false;
    }
    if (_kindOfWatermark != rhs._kindOfWatermark) {
        return false;
    }
    if (_reactionTime != rhs._reactionTime) {
        return false;
    }
    return _shared_field_writer == rhs._shared_field_writer;
}

}