1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/fnet/transport.h>
#include <vespa/searchcore/proton/common/scheduled_forward_executor.h>
#include <vespa/searchcore/proton/common/scheduledexecutor.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/count_down_latch.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/testkit/time_bomb.h>
#include <thread>
using vespalib::Executor;
using namespace proton;
using Task = Executor::Task;
using vespalib::makeLambdaTask;
namespace {

/**
 * Executor task used by the tests below: every call to run() counts the
 * referenced latch down once.
 *
 * The latch is held by reference, so it must outlive the task.
 */
class TestTask : public Task {
private:
    vespalib::CountDownLatch &_latch;
public:
    // explicit: a latch must never convert silently into a task.
    explicit TestTask(vespalib::CountDownLatch & latch) : _latch(latch) { }
    void run() override { _latch.countDown(); }
};

}
// Factory for the scheduled executor type under test. Only declared here; the
// explicit specializations that follow supply a definition per executor type.
template <typename T>
std::unique_ptr<T> make_scheduled_executor(FNET_Transport& transport, vespalib::Executor& executor);
// ScheduledExecutor is built from the transport alone; the executor argument
// is accepted only to match the generic factory signature and is not used.
template <>
std::unique_ptr<ScheduledExecutor>
make_scheduled_executor<ScheduledExecutor>(FNET_Transport& transport, vespalib::Executor&)
{
    auto scheduled = std::make_unique<ScheduledExecutor>(transport);
    return scheduled;
}
// ScheduledForwardExecutor is built from both the transport and the executor.
template <>
std::unique_ptr<ScheduledForwardExecutor>
make_scheduled_executor<ScheduledForwardExecutor>(FNET_Transport& transport, vespalib::Executor& executor)
{
    auto forwarding = std::make_unique<ScheduledForwardExecutor>(transport, executor);
    return forwarding;
}
/**
 * Typed test fixture, instantiated once per scheduled executor implementation.
 *
 * Owns a started FNET_Transport, a ThreadStackExecutor constructed with the
 * argument 1, and the scheduled executor under test (`timer`).
 *
 * @tparam ScheduledT scheduled executor type created through
 *                    make_scheduled_executor<ScheduledT>().
 */
template <typename ScheduledT>
class ScheduledExecutorTest : public testing::Test {
public:
    FNET_Transport transport;
    vespalib::ThreadStackExecutor executor;
    std::unique_ptr<ScheduledT> timer;

    ScheduledExecutorTest()
        : transport(),
          executor(1)
    {
        // The timer is created in the body, not the initializer list, so that
        // the transport has already been started when it is constructed.
        transport.Start();
        timer = make_scheduled_executor<ScheduledT>(transport, executor);
    }

    // Shuts the transport down first; the members (timer, executor, transport)
    // are destroyed afterwards in reverse declaration order.
    // NOTE(review): the `true` argument presumably requests a blocking
    // shutdown -- confirm against FNET_Transport::ShutDown.
    ~ScheduledExecutorTest() override {
        transport.ShutDown(true);
    }
};
// Executor implementations the typed suite is instantiated for; every
// TYPED_TEST of ScheduledExecutorTest runs once per listed type.
using ScheduledTypes = ::testing::Types<ScheduledExecutor, ScheduledForwardExecutor>;
TYPED_TEST_SUITE(ScheduledExecutorTest, ScheduledTypes);
// Two tasks scheduled with different timing arguments must each run often
// enough to bring their own latch (3 and 2 counts respectively) down to zero.
TYPED_TEST(ScheduledExecutorTest, test_scheduling) {
    vespalib::CountDownLatch fast_latch(3);
    vespalib::CountDownLatch slow_latch(2);
    auto& scheduler = *this->timer;

    // The handles are kept alive until the end of the test so the tasks stay scheduled.
    auto fast_handle = scheduler.scheduleAtFixedRate(std::make_unique<TestTask>(fast_latch),
                                                     100ms, 200ms);
    auto slow_handle = scheduler.scheduleAtFixedRate(std::make_unique<TestTask>(slow_latch),
                                                     500ms, 500ms);

    const bool fast_done = fast_latch.await(60s);
    EXPECT_TRUE(fast_done);
    const bool slow_done = slow_latch.await(60s);
    EXPECT_TRUE(slow_done);
}
// A task whose handle has been reset must not complete the latch, while a
// task scheduled afterwards (handle kept) on the same latch must.
TYPED_TEST(ScheduledExecutorTest, test_drop_handle) {
    vespalib::CountDownLatch latch(2);
    auto& scheduler = *this->timer;

    auto dropped = scheduler.scheduleAtFixedRate(std::make_unique<TestTask>(latch), 2s, 3s);
    dropped.reset();
    // NOTE(review): if the arguments are (initial delay = 2s, period = 3s), a
    // task that was NOT cancelled would count down only once during this 3s
    // wait, so the two-count latch would still time out -- consider checking
    // the remaining count directly. Confirm against the executor interface.
    const bool completed_after_drop = latch.await(3s);
    EXPECT_FALSE(completed_after_drop);

    auto kept = scheduler.scheduleAtFixedRate(std::make_unique<TestTask>(latch), 200ms, 300ms);
    const bool completed_with_live_handle = latch.await(60s);
    EXPECT_TRUE(completed_with_live_handle);
}
// While one run of a task is blocked on the gate, no second run of the same
// task may start; once the gate opens, the task keeps being re-run.
TYPED_TEST(ScheduledExecutorTest, test_only_one_instance_running) {
    vespalib::TimeBomb time_bomb(120s);  // bounds the polling loop below
    vespalib::Gate gate;
    std::atomic<uint64_t> runs{0};

    auto handle = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() {
        ++runs;
        gate.await();
    }), 0ms, 1ms);

    // Give the scheduler ample time to (wrongly) start further runs.
    std::this_thread::sleep_for(2s);
    EXPECT_EQ(1u, runs.load());

    gate.countDown();
    while (runs.load() <= 10) {
        std::this_thread::sleep_for(1ms);
    }
    EXPECT_GT(runs.load(), 10u);
}
// One scheduled task resets the handle of another scheduled task whose first
// run is blocked on a gate. The resetting task must not complete while that
// run is in flight, and after the gate opens the blocked task must never run
// again.
// NOTE(review): the "no reset while blocked" observation presumably holds
// because reset() waits for the in-flight run and/or because task runs are
// serialized -- confirm against the executor implementations.
TYPED_TEST(ScheduledExecutorTest, test_sync_delete) {
    // Declaration order is significant: locals are destroyed in reverse order,
    // so the final lock guard is released before the handles are destroyed.
    vespalib::TimeBomb time_bomb(120s);  // bounds the polling loops below
    vespalib::Gate gate;
    std::atomic<uint64_t> blocked_runs{0};
    std::atomic<uint64_t> resets_done{0};
    std::mutex handle_mutex;

    auto blocked_handle = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() {
        ++blocked_runs;
        gate.await();
    }), 0ms, 1ms);
    auto resetter_handle = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() {
        std::lock_guard guard(handle_mutex);
        blocked_handle.reset();
        ++resets_done;
    }), 0ms, 1ms);

    // Wait until the first task is inside its (blocked) run.
    while (blocked_runs.load() < 1) {
        std::this_thread::sleep_for(1ms);
    }
    EXPECT_EQ(1u, blocked_runs.load());
    EXPECT_EQ(0u, resets_done.load());

    gate.countDown();
    while (resets_done.load() <= 10) {
        std::this_thread::sleep_for(1ms);
    }
    EXPECT_EQ(1u, blocked_runs.load());

    // The resetter task is still being scheduled, so take its mutex before
    // inspecting the handle it writes to.
    std::lock_guard guard(handle_mutex);
    EXPECT_TRUE(blocked_handle.get() == nullptr);
    EXPECT_TRUE(resetter_handle.get() != nullptr);
}
// Test program entry point. NOTE(review): this macro presumably comes from the
// included vespalib gtest wrapper header and expands to a main() that runs all
// registered tests -- confirm.
GTEST_MAIN_RUN_ALL_TESTS()
|