diff options
Diffstat (limited to 'storage/src/tests/persistence')
5 files changed, 156 insertions, 39 deletions
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt index 7b165e11b66..fb8120210c1 100644 --- a/storage/src/tests/persistence/CMakeLists.txt +++ b/storage/src/tests/persistence/CMakeLists.txt @@ -12,6 +12,7 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST persistencethread_splittest.cpp processalltest.cpp provider_error_wrapper_test.cpp + shared_operation_throttler_test.cpp splitbitdetectortest.cpp testandsettest.cpp gtest_runner.cpp diff --git a/storage/src/tests/persistence/active_operations_stats_test.cpp b/storage/src/tests/persistence/active_operations_stats_test.cpp index a5dd3d929db..8caa84977ce 100644 --- a/storage/src/tests/persistence/active_operations_stats_test.cpp +++ b/storage/src/tests/persistence/active_operations_stats_test.cpp @@ -96,17 +96,17 @@ ActiveOperationsStatsTest::test_active_operations_stats() auto lock0 = filestorHandler->getNextMessage(stripeId); auto lock1 = filestorHandler->getNextMessage(stripeId); auto lock2 = filestorHandler->getNextMessage(stripeId); - ASSERT_TRUE(lock0.first); - ASSERT_TRUE(lock1.first); - ASSERT_FALSE(lock2.first); + ASSERT_TRUE(lock0.lock); + ASSERT_TRUE(lock1.lock); + ASSERT_FALSE(lock2.lock); auto stats = filestorHandler->get_active_operations_stats(false); { SCOPED_TRACE("during"); assert_active_operations_stats(stats, 2, 2, 0); } EXPECT_EQ(3, stats.get_total_size()); - lock0.first.reset(); - lock1.first.reset(); + lock0.lock.reset(); + lock1.lock.reset(); stats = filestorHandler->get_active_operations_stats(false); { SCOPED_TRACE("after"); diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index cd496605a6c..939e7ae7b6a 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -516,11 +516,11 @@ TEST_F(FileStorManagerTest, handler_priority) { filestorHandler.schedule(cmd); } - ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority()); } class MessagePusherThread : public document::Runnable { @@ -570,7 +570,7 @@ public: void run() override { while (!_done) { FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId); - if (msg.second.get()) { + if (msg.msg.get()) { uint32_t originalConfig = _config.load(); _fetchedCount++; std::this_thread::sleep_for(5ms); @@ -641,15 +641,15 @@ TEST_F(FileStorManagerTest, handler_pause) { filestorHandler.schedule(cmd); } - ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority()); { ResumeGuard guard = filestorHandler.pause(); (void)guard; - ASSERT_EQ(filestorHandler.getNextMessage(stripeId).second.get(), nullptr); + ASSERT_EQ(filestorHandler.getNextMessage(stripeId).msg.get(), nullptr); } - ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority()); } TEST_F(FileStorManagerTest, remap_split) { @@ -729,8 +729,8 @@ TEST_F(FileStorManagerTest, handler_timeout) { std::this_thread::sleep_for(51ms); for (;;) { auto lock = filestorHandler.getNextMessage(stripeId); - if (lock.first.get()) { - ASSERT_EQ(200, lock.second->getPriority()); + if (lock.lock.get()) { + ASSERT_EQ(200, lock.msg->getPriority()); break; } } @@ -2013,7 +2013,7 @@ expect_async_message(StorageMessage::Priority exp_pri, { EXPECT_TRUE(result.was_scheduled()); ASSERT_TRUE(result.has_async_message()); - EXPECT_EQ(exp_pri, result.async_message().second->getPriority()); + EXPECT_EQ(exp_pri, result.async_message().msg->getPriority()); } void @@ -2045,8 +2045,8 @@ TEST_F(FileStorHandlerTest, async_message_with_lowest_pri_returned_on_schedule) auto result = handler->schedule_and_get_next_async_message(make_put_command(30)); expect_async_message(20, result); } - EXPECT_EQ(30, get_next_message().second->getPriority()); - EXPECT_EQ(40, get_next_message().second->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); + EXPECT_EQ(40, get_next_message().msg->getPriority()); } TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_not_async) @@ -2057,8 +2057,8 @@ TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_n auto result = handler->schedule_and_get_next_async_message(make_put_command(30)); expect_empty_async_message(result); - EXPECT_EQ(20, get_next_message().second->getPriority()); - EXPECT_EQ(30, get_next_message().second->getPriority()); + EXPECT_EQ(20, get_next_message().msg->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); } TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped) @@ -2079,7 +2079,7 @@ TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped) expect_async_message(40, result); } } - EXPECT_EQ(30, get_next_message().second->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); } } // storage diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index 2557fa537f5..5f3836727dd 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -91,14 +91,14 @@ TEST_F(PersistenceQueueTest, fetch_next_unlocked_message_if_bucket_locked) { f.filestorHandler->schedule(createPut(5432, 0)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); + ASSERT_TRUE(lock0.lock.get()); EXPECT_EQ(document::BucketId(16, 1234), - dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); + dynamic_cast<api::PutCommand&>(*lock0.msg).getBucketId()); auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock1.first.get()); + ASSERT_TRUE(lock1.lock.get()); EXPECT_EQ(document::BucketId(16, 5432), - dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); + dynamic_cast<api::PutCommand&>(*lock1.msg).getBucketId()); } TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_access) { @@ -108,14 +108,14 @@ TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_ac f.filestorHandler->schedule(createGet(1234)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements()); // Even though we already have a lock on the bucket, Gets allow shared locking and we // should therefore be able to get another lock. auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock1.first.get()); - EXPECT_EQ(api::LockingRequirements::Shared, lock1.first->lockingRequirements()); + ASSERT_TRUE(lock1.lock.get()); + EXPECT_EQ(api::LockingRequirements::Shared, lock1.lock->lockingRequirements()); } TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op_active) { @@ -125,12 +125,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op f.filestorHandler->schedule(createPut(1234, 0)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements()); // Expected to time out auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_FALSE(lock1.first.get()); + ASSERT_FALSE(lock1.lock.get()); } TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op_active) { @@ -140,12 +140,12 @@ TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op f.filestorHandler->schedule(createGet(1234)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements()); // Expected to time out auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_FALSE(lock1.first.get()); + ASSERT_FALSE(lock1.lock.get()); } TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive_op_active) { @@ -155,12 +155,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive f.filestorHandler->schedule(createPut(1234, 0)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements()); // Expected to time out auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_FALSE(lock1.first.get()); + ASSERT_FALSE(lock1.lock.get()); } } // namespace storage diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp new file mode 100644 index 00000000000..0ad380937c7 --- /dev/null +++ b/storage/src/tests/persistence/shared_operation_throttler_test.cpp @@ -0,0 +1,116 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/storage/persistence/shared_operation_throttler.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/barrier.h> +#include <chrono> +#include <thread> + +using namespace ::testing; + +namespace storage { + +using ThrottleToken = SharedOperationThrottler::Token; + +TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) { + // We technically can't test that the unlimited throttler _never_ throttles, but at + // least check that it doesn't throttle _twice_, and then induce from this ;) + auto throttler = SharedOperationThrottler::make_unlimited_throttler(); + auto token1 = throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); + auto token2 = throttler->blocking_acquire_one(); + EXPECT_TRUE(token2.valid()); + // Window size should be zero (i.e. unlimited) for unlimited throttler + EXPECT_EQ(throttler->current_window_size(), 0); +} + +TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto token1 = throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); + auto token2 = throttler->try_acquire_one(); + EXPECT_FALSE(token2.valid()); + + EXPECT_EQ(throttler->current_window_size(), 1); +} + +TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto token = throttler->blocking_acquire_one(); + EXPECT_TRUE(token.valid()); + token.reset(); + token = throttler->blocking_acquire_one(600s); // Should never block. + EXPECT_TRUE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + vespalib::Barrier barrier(2); + std::thread t([&] { + auto token = throttler->try_acquire_one(); + assert(token.valid()); + barrier.await(); + while (throttler->waiting_threads() != 1) { + std::this_thread::sleep_for(100us); + } + // Implicit token release at thread scope exit + }); + barrier.await(); + auto token = throttler->blocking_acquire_one(); + EXPECT_TRUE(token.valid()); + t.join(); +} + +TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto window_filling_token = throttler->try_acquire_one(); + auto before = std::chrono::steady_clock::now(); + // Will block for at least 1ms. Since no window slot will be available by that time, + // an invalid token should be returned. + auto token = throttler->blocking_acquire_one(1ms); + auto after = std::chrono::steady_clock::now(); + EXPECT_TRUE((after - before) >= 1ms); + EXPECT_FALSE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) { + ThrottleToken token; + EXPECT_FALSE(token.valid()); + token.reset(); // no-op + EXPECT_FALSE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + { + auto token = throttler->try_acquire_one(); + EXPECT_TRUE(token.valid()); + } + auto token = throttler->try_acquire_one(); + EXPECT_TRUE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto token1 = throttler->try_acquire_one(); + auto token2 = std::move(token1); // move ctor + EXPECT_TRUE(token2.valid()); + EXPECT_FALSE(token1.valid()); + ThrottleToken token3; + token3 = std::move(token2); // move assignment op + EXPECT_TRUE(token3.valid()); + EXPECT_FALSE(token2.valid()); + + // Trying to fetch new token should not succeed due to active token and win size of 1 + token1 = throttler->try_acquire_one(); + EXPECT_FALSE(token1.valid()); + // Resetting the token should free up the slot in the window + token3.reset(); + token1 = throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); +} + +// TODO ideally we'd test that the dynamic throttler has a window size that is actually +// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so +// it's not trivial to know how to do this reliably. + +} |