aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp
blob: 1b6fa49df72e9d094633ecf7965cbdd259dc3156 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <vespa/config-proton.h>
#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
#include <vespa/searchcore/proton/server/shared_threading_service.h>
#include <vespa/searchcore/proton/server/shared_threading_service_config.h>
#include <vespa/searchcore/proton/test/transport_helper.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>

using ProtonConfig = vespa::config::search::core::ProtonConfig;
using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
using namespace proton;
using storage::spi::dummy::DummyBucketExecutor;
using vespalib::ISequencedTaskExecutor;
using vespalib::SequencedTaskExecutor;

ProtonConfig
make_proton_config(double concurrency, uint32_t indexing_threads = 1)
{
    ProtonConfigBuilder builder;
    builder.documentdb.push_back(ProtonConfig::Documentdb());
    builder.documentdb.push_back(ProtonConfig::Documentdb());
    builder.flush.maxconcurrent = 1;

    builder.feeding.concurrency = concurrency;
    builder.indexing.tasklimit = 255;
    builder.indexing.threads = indexing_threads;
    return builder;
}

void
expect_shared_threads(uint32_t exp_threads, uint32_t cpu_cores)
{
    auto cfg = SharedThreadingServiceConfig::make(make_proton_config(0.5), HwInfo::Cpu(cpu_cores));
    EXPECT_EQ(exp_threads, cfg.shared_threads());
    EXPECT_EQ(exp_threads * 16, cfg.shared_task_limit());
}

void
expect_field_writer_threads(uint32_t exp_threads, uint32_t cpu_cores, uint32_t indexing_threads = 1)
{
    auto cfg = SharedThreadingServiceConfig::make(make_proton_config(0.5, indexing_threads), HwInfo::Cpu(cpu_cores));
    EXPECT_EQ(exp_threads, cfg.field_writer_threads());
}

TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores_and_feeding_concurrency)
{
    expect_shared_threads(2, 1);
    expect_shared_threads(2, 4);
    expect_shared_threads(3, 5);
    expect_shared_threads(3, 6);
    expect_shared_threads(4, 8);
    expect_shared_threads(5, 9);
    expect_shared_threads(5, 10);
}

TEST(SharedThreadingServiceConfigTest, field_writer_threads_are_derived_from_cpu_cores_and_feeding_concurrency)
{
    expect_field_writer_threads(3, 1);
    expect_field_writer_threads(3, 4);
    expect_field_writer_threads(3, 6);
    expect_field_writer_threads(4, 7);
    expect_field_writer_threads(4, 8);
    expect_field_writer_threads(5, 9);
}

TEST(SharedThreadingServiceConfigTest, field_writer_threads_can_be_overridden_in_proton_config)
{
    expect_field_writer_threads(4, 1, 4);
}

class SharedThreadingServiceTest : public ::testing::Test {
public:
    Transport transport;
    storage::spi::dummy::DummyBucketExecutor bucket_executor;
    std::unique_ptr<SharedThreadingService> service;
    SharedThreadingServiceTest()
        : transport(),
          bucket_executor(2),
          service()
    { }
    ~SharedThreadingServiceTest() = default;
    void setup(double concurrency, uint32_t cpu_cores) {
        service = std::make_unique<SharedThreadingService>(
                SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores)),
                transport.transport(), bucket_executor);
    }
    SequencedTaskExecutor* field_writer() {
        return dynamic_cast<SequencedTaskExecutor*>(&service->field_writer());
    }
};

void
assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t exp_task_limit)
{
    EXPECT_EQ(exp_executors, exec->getNumExecutors());
    EXPECT_EQ(exp_task_limit, exec->first_executor()->getTaskLimit());
}

TEST_F(SharedThreadingServiceTest, field_writer_can_be_shared_across_all_document_dbs)
{
    setup(0.75, 8);
    EXPECT_TRUE(field_writer());
    EXPECT_EQ(6, field_writer()->getNumExecutors());
    // This is rounded to the nearest power of 2 when using THROUGHPUT feed executor.
    EXPECT_EQ(256, field_writer()->first_executor()->getTaskLimit());
}

GTEST_MAIN_RUN_ALL_TESTS()