// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

// Typed GoogleTest suite exercising scheduleAtFixedRate() on the scheduled
// executor implementations produced by make_scheduled_executor().
//
// NOTE(review): the text between angle brackets appears to have been lost in
// this copy of the file (include targets, template parameter lists and
// template arguments are all missing). The code below is kept exactly as
// found; restore the missing pieces from the upstream file before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include

using vespalib::Executor;
using namespace proton;
using Task = Executor::Task;
using vespalib::makeLambdaTask;

namespace {

// Task that counts down the latch it was constructed with each time it runs.
// Only a reference is stored, so the latch must outlive the task.
class TestTask : public Task {
private:
    vespalib::CountDownLatch &_latch;
public:
    TestTask(vespalib::CountDownLatch & latch) : _latch(latch) { }
    void run() override { _latch.countDown(); }
};

}

// Factory for the executor type under test. Declared here and explicitly
// specialized below, once per executor implementation.
// NOTE(review): template parameter list and return type argument are missing.
template
std::unique_ptr make_scheduled_executor(FNET_Transport& transport, vespalib::Executor& executor);

// Specialization for the implementation that is constructed from the
// transport alone; the executor argument is ignored.
template <>
std::unique_ptr
make_scheduled_executor(FNET_Transport& transport, vespalib::Executor&) {
    return std::make_unique(transport);
}

// Specialization for the implementation that is constructed from both the
// transport and the executor.
template <>
std::unique_ptr
make_scheduled_executor(FNET_Transport& transport, vespalib::Executor& executor) {
    return std::make_unique(transport, executor);
}

// Test fixture owning a transport, a ThreadStackExecutor constructed with the
// argument 1, and the scheduled executor under test ("timer").
// NOTE(review): the template parameter naming the executor type is missing.
template
class ScheduledExecutorTest : public testing::Test {
public:
    FNET_Transport transport;
    vespalib::ThreadStackExecutor executor;
    std::unique_ptr timer;
    ScheduledExecutorTest()
        : transport(),
          executor(1)
    {
        // The transport is started before the timer is created on top of it.
        transport.Start();
        timer = make_scheduled_executor(transport, executor);
    }
    ~ScheduledExecutorTest() {
        // Runs before the members (timer, executor, transport) are destroyed.
        // presumably the 'true' argument requests a blocking shutdown - TODO confirm
        transport.ShutDown(true);
    }
};

// List of executor types the typed tests are instantiated for.
// NOTE(review): the type list itself is missing here.
using ScheduledTypes = ::testing::Types;

TYPED_TEST_SUITE(ScheduledExecutorTest, ScheduledTypes);

// Schedules two repeating tasks and expects each latch to be released
// (3 and 2 count-downs respectively) within the 60s timeout.
// presumably the two duration arguments are (initial delay, interval) - TODO confirm
TYPED_TEST(ScheduledExecutorTest, test_scheduling) {
    vespalib::CountDownLatch latch1(3);
    vespalib::CountDownLatch latch2(2);
    auto handleA = this->timer->scheduleAtFixedRate(std::make_unique(latch1), 100ms, 200ms);
    auto handleB = this->timer->scheduleAtFixedRate(std::make_unique(latch2), 500ms, 500ms);
    EXPECT_TRUE(latch1.await(60s));
    EXPECT_TRUE(latch2.await(60s));
}

// Resets the handle of a scheduled task right away and expects the latch not
// to be released within 3s; then schedules a new task on the same latch and
// expects it to be released.
TYPED_TEST(ScheduledExecutorTest, test_drop_handle) {
    vespalib::CountDownLatch latch1(2);
    auto handleA = this->timer->scheduleAtFixedRate(std::make_unique(latch1), 2s, 3s);
    handleA.reset();
    // NOTE(review): if the arguments are (delay=2s, interval=3s), at most one
    // run could land inside this 3s window even without cancellation, and the
    // latch needs two count-downs - so this check may not detect a single
    // stray run of the dropped task. Confirm intent.
    EXPECT_TRUE(!latch1.await(3s));
    auto handleB = this->timer->scheduleAtFixedRate(std::make_unique(latch1), 200ms, 300ms);
    EXPECT_TRUE(latch1.await(60s));
}

// Checks that a task blocked inside run() is not started a second time:
// the counter must still be 1 after 2s even though the interval is 1ms.
// Once the gate is opened the task is expected to keep running repeatedly.
TYPED_TEST(ScheduledExecutorTest, test_only_one_instance_running) {
    vespalib::TimeBomb time_bomb(60s);
    vespalib::Gate latch;
    std::atomic counter = 0;
    // The task increments the counter, then blocks until the gate is opened.
    auto handleA = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() { counter++; latch.await();}), 0ms, 1ms);
    std::this_thread::sleep_for(2s);
    EXPECT_EQ(1, counter);
    latch.countDown();
    // Poll until the task has run more than 10 times in total.
    // presumably the TimeBomb above bounds this loop if that never happens - TODO confirm
    while (counter <= 10) { std::this_thread::sleep_for(1ms); }
    EXPECT_GT(counter, 10);
}

// Checks resetting one task's handle from inside another task while the first
// task is blocked in run(): no reset may have completed while task A is still
// blocked, and task A must not run again after its handle has been reset.
TYPED_TEST(ScheduledExecutorTest, test_sync_delete) {
    vespalib::TimeBomb time_bomb(60s);
    vespalib::Gate latch;
    std::atomic counter = 0;
    std::atomic reset_counter = 0;
    // Guards handleA, which is touched both by task B and by this thread.
    std::mutex handleLock;
    // Task A: increments the counter, then blocks until the gate is opened.
    auto handleA = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() { counter++; latch.await();}), 0ms, 1ms);
    // Task B: resets handleA under the lock and counts completed resets.
    auto handleB = this->timer->scheduleAtFixedRate(makeLambdaTask([&]() {
        std::lock_guard guard(handleLock);
        handleA.reset();
        reset_counter++;
    }), 0ms, 1ms);
    // Wait until task A has entered run() at least once.
    while (counter < 1) { std::this_thread::sleep_for(1ms); }
    EXPECT_EQ(1, counter);
    EXPECT_EQ(0, reset_counter);
    latch.countDown();
    // Wait until task B has completed more than 10 resets.
    while (reset_counter <= 10) { std::this_thread::sleep_for(1ms); }
    EXPECT_EQ(1, counter);
    // Take the lock so handleA is not inspected while task B is resetting it.
    std::lock_guard guard(handleLock);
    EXPECT_EQ(nullptr, handleA.get());
    EXPECT_FALSE(nullptr == handleB.get());
}

GTEST_MAIN_RUN_ALL_TESTS()