1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/searchcore/config/config-proton.h>
#include <vespa/searchcore/proton/server/shared_threading_service.h>
#include <vespa/searchcore/proton/server/shared_threading_service_config.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/gtest/gtest.h>
using namespace proton;
using vespalib::ISequencedTaskExecutor;
using vespalib::SequencedTaskExecutor;
using ProtonConfig = vespa::config::search::core::ProtonConfig;
using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
ProtonConfig
make_proton_config(double concurrency)
{
ProtonConfigBuilder builder;
// This setup requires a minimum of 4 shared threads.
builder.documentdb.push_back(ProtonConfig::Documentdb());
builder.documentdb.push_back(ProtonConfig::Documentdb());
builder.flush.maxconcurrent = 1;
builder.feeding.concurrency = concurrency;
builder.feeding.sharedFieldWriterExecutor = ProtonConfig::Feeding::SharedFieldWriterExecutor::DOCUMENT_DB;
builder.indexing.tasklimit = 255;
return builder;
}
void
expect_shared_threads(uint32_t exp_threads, uint32_t cpu_cores)
{
auto cfg = SharedThreadingServiceConfig::make(make_proton_config(0.5), HwInfo::Cpu(cpu_cores));
EXPECT_EQ(exp_threads, cfg.shared_threads());
EXPECT_EQ(exp_threads * 16, cfg.shared_task_limit());
}
TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores_and_feeding_concurrency)
{
expect_shared_threads(4, 1);
expect_shared_threads(4, 6);
expect_shared_threads(4, 8);
expect_shared_threads(5, 9);
expect_shared_threads(5, 10);
}
class SharedThreadingServiceTest : public ::testing::Test {
public:
std::unique_ptr<SharedThreadingService> service;
SharedThreadingServiceTest()
: service()
{
}
void setup(double concurrency, uint32_t cpu_cores) {
service = std::make_unique<SharedThreadingService>(
SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores)));
}
SequencedTaskExecutor* field_writer() {
return dynamic_cast<SequencedTaskExecutor*>(service->field_writer());
}
};
void
assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t exp_task_limit)
{
EXPECT_EQ(exp_executors, exec->getNumExecutors());
EXPECT_EQ(exp_task_limit, exec->first_executor()->getTaskLimit());
}
TEST_F(SharedThreadingServiceTest, field_writer_can_be_shared_across_all_document_dbs)
{
setup(0.75, 8);
EXPECT_TRUE(field_writer());
EXPECT_EQ(6, field_writer()->getNumExecutors());
// This is rounded to the nearest power of 2 when using THROUGHPUT feed executor.
EXPECT_EQ(256, field_writer()->first_executor()->getTaskLimit());
}
GTEST_MAIN_RUN_ALL_TESTS()
|