aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
blob: 0f1c6d3a083c85ab83607f08156fa64859169243 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/util/shared_operation_throttler.h>
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/vespalib/util/barrier.h>
#include <thread>

using vespalib::steady_clock;

namespace vespalib {

using ThrottleToken = SharedOperationThrottler::Token;

struct DynamicThrottleFixture {
    std::unique_ptr<SharedOperationThrottler> _throttler;

    DynamicThrottleFixture() {
        SharedOperationThrottler::DynamicThrottleParams params;
        params.window_size_increment = 1;
        params.min_window_size = 1;
        _throttler = SharedOperationThrottler::make_dynamic_throttler(params);
    }
};

TEST("unlimited throttler does not throttle") {
    // We technically can't test that the unlimited throttler _never_ throttles, but at
    // least check that it doesn't throttle _twice_, and then induce from this ;)
    auto throttler = SharedOperationThrottler::make_unlimited_throttler();
    auto token1 = throttler->try_acquire_one();
    EXPECT_TRUE(token1.valid());
    auto token2 = throttler->blocking_acquire_one();
    EXPECT_TRUE(token2.valid());
    // Window size should be zero (i.e. unlimited) for unlimited throttler
    EXPECT_EQUAL(throttler->current_window_size(), 0u);
    // But we still track the active token count
    EXPECT_EQUAL(throttler->current_active_token_count(), 2u);
}

TEST_F("dynamic throttler respects initial window size", DynamicThrottleFixture()) {
    auto token1 = f1._throttler->try_acquire_one();
    EXPECT_TRUE(token1.valid());
    auto token2 = f1._throttler->try_acquire_one();
    EXPECT_FALSE(token2.valid());

    EXPECT_EQUAL(f1._throttler->current_window_size(), 1u);
    EXPECT_EQUAL(f1._throttler->current_active_token_count(), 1u);
}

TEST_F("blocking acquire returns immediately if slot available", DynamicThrottleFixture()) {
    auto token = f1._throttler->blocking_acquire_one();
    EXPECT_TRUE(token.valid());
    token.reset();
    token = f1._throttler->blocking_acquire_one(steady_clock::now() + 600s); // Should never block.
    EXPECT_TRUE(token.valid());
}

TEST_F("blocking call woken up if throttle slot available", DynamicThrottleFixture()) {
    vespalib::Barrier barrier(2);
    std::thread t([&] {
        auto token = f1._throttler->try_acquire_one();
        assert(token.valid());
        barrier.await();
        while (f1._throttler->waiting_threads() != 1) {
            std::this_thread::sleep_for(100us);
        }
        // Implicit token release at thread scope exit
    });
    barrier.await();
    auto token = f1._throttler->blocking_acquire_one();
    EXPECT_TRUE(token.valid());
    t.join();
}

TEST_F("time-bounded blocking acquire waits for timeout", DynamicThrottleFixture()) {
    auto window_filling_token = f1._throttler->try_acquire_one();
    auto before = steady_clock::now();
    // Will block for at least 1ms. Since no window slot will be available by that time,
    // an invalid token should be returned.
    auto token = f1._throttler->blocking_acquire_one(before + 1ms);
    auto after = steady_clock::now();
    EXPECT_TRUE((after - before) >= 1ms);
    EXPECT_FALSE(token.valid());
}

TEST("default constructed token is invalid") {
    ThrottleToken token;
    EXPECT_FALSE(token.valid());
    token.reset(); // no-op
    EXPECT_FALSE(token.valid());
}

TEST_F("token destruction frees up throttle window slot", DynamicThrottleFixture()) {
    {
        auto token = f1._throttler->try_acquire_one();
        EXPECT_TRUE(token.valid());
        EXPECT_EQUAL(f1._throttler->current_active_token_count(), 1u);
    }
    EXPECT_EQUAL(f1._throttler->current_active_token_count(), 0u);

    auto token = f1._throttler->try_acquire_one();
    EXPECT_TRUE(token.valid());
    EXPECT_EQUAL(f1._throttler->current_active_token_count(), 1u);
}

TEST_F("token can be moved and reset", DynamicThrottleFixture()) {
    auto token1 = f1._throttler->try_acquire_one();
    auto token2 = std::move(token1); // move ctor
    EXPECT_TRUE(token2.valid());
    EXPECT_FALSE(token1.valid());
    ThrottleToken token3;
    token3 = std::move(token2); // move assignment op
    EXPECT_TRUE(token3.valid());
    EXPECT_FALSE(token2.valid());

    // Trying to fetch new token should not succeed due to active token and win size of 1
    token1 = f1._throttler->try_acquire_one();
    EXPECT_FALSE(token1.valid());
    // Resetting the token should free up the slot in the window
    token3.reset();
    token1 = f1._throttler->try_acquire_one();
    EXPECT_TRUE(token1.valid());
}

// Note on test semantics: these tests are adapted from a subset of the MessageBus
// throttling tests. Some tests have been simplified due to no longer having access
// to the low-level DynamicThrottlePolicy API.

struct WindowFixture {
    uint64_t _milli_time;
    std::unique_ptr<SharedOperationThrottler> _throttler;

    WindowFixture(uint32_t window_size_increment = 5,
                  uint32_t min_window_size = 20,
                  uint32_t max_window_size = INT_MAX)
        : _milli_time(0),
          _throttler()
    {
        SharedOperationThrottler::DynamicThrottleParams params;
        params.resize_rate = 1;
        params.window_size_increment = window_size_increment;
        params.min_window_size = min_window_size;
        params.max_window_size = max_window_size;
        params.window_size_decrement_factor = 2;
        params.window_size_backoff = 0.9;
        _throttler = SharedOperationThrottler::make_dynamic_throttler(params, [&]() noexcept {
            return steady_time(std::chrono::milliseconds(_milli_time));
        });
    }

    std::vector<SharedOperationThrottler::Token> fill_entire_throttle_window() {
        std::vector<SharedOperationThrottler::Token> tokens;
        while (true) {
            auto token = _throttler->try_acquire_one();
            if (!token.valid()) {
                break;
            }
            tokens.emplace_back(std::move(token));
        }
        return tokens;
    }

    uint32_t attempt_converge_on_stable_window_size(uint32_t max_pending) {
        for (uint32_t i = 0; i < 999; ++i) {
            auto tokens = fill_entire_throttle_window();
            uint32_t num_pending = static_cast<uint32_t>(tokens.size());

            uint64_t trip_time = (num_pending < max_pending) ? 1000 : 1000 + (num_pending - max_pending) * 1000;
            _milli_time += trip_time;
            // Throttle window slots implicitly freed up as tokens are destructed.
        }
        uint32_t ret = _throttler->current_window_size();
        fprintf(stderr, "attempt_converge_on_stable_window_size() = %u\n", ret);
        return ret;
    }
};

TEST_F("window size changes dynamically based on throughput", WindowFixture()) {
    uint32_t window_size = f1.attempt_converge_on_stable_window_size(100);
    ASSERT_TRUE(window_size >= 90 && window_size <= 105);

    window_size = f1.attempt_converge_on_stable_window_size(200);
    ASSERT_TRUE(window_size >= 180 && window_size <= 205);

    window_size = f1.attempt_converge_on_stable_window_size(50);
    ASSERT_TRUE(window_size >= 45 && window_size <= 55);

    window_size = f1.attempt_converge_on_stable_window_size(500);
    ASSERT_TRUE(window_size >= 450 && window_size <= 505);

    window_size = f1.attempt_converge_on_stable_window_size(100);
    ASSERT_TRUE(window_size >= 90 && window_size <= 115);
}

TEST_F("window size is reset after idle time period", WindowFixture(5, 1)) {
    double window_size = f1.attempt_converge_on_stable_window_size(100);
    ASSERT_TRUE(window_size >= 90 && window_size <= 110);

    f1._milli_time += 30001; // Not yet past 60s idle time
    auto tokens = f1.fill_entire_throttle_window();
    ASSERT_TRUE(tokens.size() >= 90 && tokens.size() <= 110);
    tokens.clear();

    f1._milli_time += 60001; // Idle time passed
    tokens = f1.fill_entire_throttle_window();
    EXPECT_EQUAL(tokens.size(), 1u); // Reduced to minimum window size
}

TEST_F("minimum window size is respected", WindowFixture(5, 150, INT_MAX)) {
    double window_size = f1.attempt_converge_on_stable_window_size(200);
    ASSERT_TRUE(window_size >= 150 && window_size <= 210);
}

TEST_F("maximum window size is respected", WindowFixture(5, 1, 50)) {
    double window_size = f1.attempt_converge_on_stable_window_size(100);
    ASSERT_TRUE(window_size >= 40 && window_size <= 50);
}

}

TEST_MAIN() {
    TEST_RUN_ALL();
}