aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/persistence
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/persistence')
-rw-r--r--storage/src/tests/persistence/CMakeLists.txt1
-rw-r--r--storage/src/tests/persistence/active_operations_stats_test.cpp10
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp34
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp34
-rw-r--r--storage/src/tests/persistence/shared_operation_throttler_test.cpp116
5 files changed, 156 insertions, 39 deletions
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index 7b165e11b66..fb8120210c1 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -12,6 +12,7 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST
persistencethread_splittest.cpp
processalltest.cpp
provider_error_wrapper_test.cpp
+ shared_operation_throttler_test.cpp
splitbitdetectortest.cpp
testandsettest.cpp
gtest_runner.cpp
diff --git a/storage/src/tests/persistence/active_operations_stats_test.cpp b/storage/src/tests/persistence/active_operations_stats_test.cpp
index a5dd3d929db..8caa84977ce 100644
--- a/storage/src/tests/persistence/active_operations_stats_test.cpp
+++ b/storage/src/tests/persistence/active_operations_stats_test.cpp
@@ -96,17 +96,17 @@ ActiveOperationsStatsTest::test_active_operations_stats()
auto lock0 = filestorHandler->getNextMessage(stripeId);
auto lock1 = filestorHandler->getNextMessage(stripeId);
auto lock2 = filestorHandler->getNextMessage(stripeId);
- ASSERT_TRUE(lock0.first);
- ASSERT_TRUE(lock1.first);
- ASSERT_FALSE(lock2.first);
+ ASSERT_TRUE(lock0.lock);
+ ASSERT_TRUE(lock1.lock);
+ ASSERT_FALSE(lock2.lock);
auto stats = filestorHandler->get_active_operations_stats(false);
{
SCOPED_TRACE("during");
assert_active_operations_stats(stats, 2, 2, 0);
}
EXPECT_EQ(3, stats.get_total_size());
- lock0.first.reset();
- lock1.first.reset();
+ lock0.lock.reset();
+ lock1.lock.reset();
stats = filestorHandler->get_active_operations_stats(false);
{
SCOPED_TRACE("after");
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index cd496605a6c..939e7ae7b6a 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -516,11 +516,11 @@ TEST_F(FileStorManagerTest, handler_priority) {
filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
class MessagePusherThread : public document::Runnable {
@@ -570,7 +570,7 @@ public:
void run() override {
while (!_done) {
FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId);
- if (msg.second.get()) {
+ if (msg.msg.get()) {
uint32_t originalConfig = _config.load();
_fetchedCount++;
std::this_thread::sleep_for(5ms);
@@ -641,15 +641,15 @@ TEST_F(FileStorManagerTest, handler_pause) {
filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority());
{
ResumeGuard guard = filestorHandler.pause();
(void)guard;
- ASSERT_EQ(filestorHandler.getNextMessage(stripeId).second.get(), nullptr);
+ ASSERT_EQ(filestorHandler.getNextMessage(stripeId).msg.get(), nullptr);
}
- ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
TEST_F(FileStorManagerTest, remap_split) {
@@ -729,8 +729,8 @@ TEST_F(FileStorManagerTest, handler_timeout) {
std::this_thread::sleep_for(51ms);
for (;;) {
auto lock = filestorHandler.getNextMessage(stripeId);
- if (lock.first.get()) {
- ASSERT_EQ(200, lock.second->getPriority());
+ if (lock.lock.get()) {
+ ASSERT_EQ(200, lock.msg->getPriority());
break;
}
}
@@ -2013,7 +2013,7 @@ expect_async_message(StorageMessage::Priority exp_pri,
{
EXPECT_TRUE(result.was_scheduled());
ASSERT_TRUE(result.has_async_message());
- EXPECT_EQ(exp_pri, result.async_message().second->getPriority());
+ EXPECT_EQ(exp_pri, result.async_message().msg->getPriority());
}
void
@@ -2045,8 +2045,8 @@ TEST_F(FileStorHandlerTest, async_message_with_lowest_pri_returned_on_schedule)
auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
expect_async_message(20, result);
}
- EXPECT_EQ(30, get_next_message().second->getPriority());
- EXPECT_EQ(40, get_next_message().second->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
+ EXPECT_EQ(40, get_next_message().msg->getPriority());
}
TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_not_async)
@@ -2057,8 +2057,8 @@ TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_n
auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
expect_empty_async_message(result);
- EXPECT_EQ(20, get_next_message().second->getPriority());
- EXPECT_EQ(30, get_next_message().second->getPriority());
+ EXPECT_EQ(20, get_next_message().msg->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
}
TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped)
@@ -2079,7 +2079,7 @@ TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped)
expect_async_message(40, result);
}
}
- EXPECT_EQ(30, get_next_message().second->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
}
} // storage
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index 2557fa537f5..5f3836727dd 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -91,14 +91,14 @@ TEST_F(PersistenceQueueTest, fetch_next_unlocked_message_if_bucket_locked) {
f.filestorHandler->schedule(createPut(5432, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
+ ASSERT_TRUE(lock0.lock.get());
EXPECT_EQ(document::BucketId(16, 1234),
- dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
+ dynamic_cast<api::PutCommand&>(*lock0.msg).getBucketId());
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock1.first.get());
+ ASSERT_TRUE(lock1.lock.get());
EXPECT_EQ(document::BucketId(16, 5432),
- dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
+ dynamic_cast<api::PutCommand&>(*lock1.msg).getBucketId());
}
TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_access) {
@@ -108,14 +108,14 @@ TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_ac
f.filestorHandler->schedule(createGet(1234));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements());
// Even though we already have a lock on the bucket, Gets allow shared locking and we
// should therefore be able to get another lock.
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock1.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock1.first->lockingRequirements());
+ ASSERT_TRUE(lock1.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock1.lock->lockingRequirements());
}
TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op_active) {
@@ -125,12 +125,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op
f.filestorHandler->schedule(createPut(1234, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op_active) {
@@ -140,12 +140,12 @@ TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op
f.filestorHandler->schedule(createGet(1234));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive_op_active) {
@@ -155,12 +155,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive
f.filestorHandler->schedule(createPut(1234, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
} // namespace storage
diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp
new file mode 100644
index 00000000000..0ad380937c7
--- /dev/null
+++ b/storage/src/tests/persistence/shared_operation_throttler_test.cpp
@@ -0,0 +1,116 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/storage/persistence/shared_operation_throttler.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/barrier.h>
+#include <chrono>
+#include <thread>
+
+using namespace ::testing;
+
+namespace storage {
+
+using ThrottleToken = SharedOperationThrottler::Token;
+
+TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) {
+ // We technically can't test that the unlimited throttler _never_ throttles, but at
+ // least check that it doesn't throttle _twice_, and then induce from this ;)
+ auto throttler = SharedOperationThrottler::make_unlimited_throttler();
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token2.valid());
+ // Window size should be zero (i.e. unlimited) for unlimited throttler
+ EXPECT_EQ(throttler->current_window_size(), 0);
+}
+
+TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->try_acquire_one();
+ EXPECT_FALSE(token2.valid());
+
+ EXPECT_EQ(throttler->current_window_size(), 1);
+}
+
+TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ token.reset();
+ token = throttler->blocking_acquire_one(600s); // Should never block.
+ EXPECT_TRUE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ vespalib::Barrier barrier(2);
+ std::thread t([&] {
+ auto token = throttler->try_acquire_one();
+ assert(token.valid());
+ barrier.await();
+ while (throttler->waiting_threads() != 1) {
+ std::this_thread::sleep_for(100us);
+ }
+ // Implicit token release at thread scope exit
+ });
+ barrier.await();
+ auto token = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ t.join();
+}
+
+TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto window_filling_token = throttler->try_acquire_one();
+ auto before = std::chrono::steady_clock::now();
+ // Will block for at least 1ms. Since no window slot will be available by that time,
+ // an invalid token should be returned.
+ auto token = throttler->blocking_acquire_one(1ms);
+ auto after = std::chrono::steady_clock::now();
+ EXPECT_TRUE((after - before) >= 1ms);
+ EXPECT_FALSE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) {
+ ThrottleToken token;
+ EXPECT_FALSE(token.valid());
+ token.reset(); // no-op
+ EXPECT_FALSE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ {
+ auto token = throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+ }
+ auto token = throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token1 = throttler->try_acquire_one();
+ auto token2 = std::move(token1); // move ctor
+ EXPECT_TRUE(token2.valid());
+ EXPECT_FALSE(token1.valid());
+ ThrottleToken token3;
+ token3 = std::move(token2); // move assignment op
+ EXPECT_TRUE(token3.valid());
+ EXPECT_FALSE(token2.valid());
+
+ // Trying to fetch new token should not succeed due to active token and win size of 1
+ token1 = throttler->try_acquire_one();
+ EXPECT_FALSE(token1.valid());
+ // Resetting the token should free up the slot in the window
+ token3.reset();
+ token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+}
+
+// TODO ideally we'd test that the dynamic throttler has a window size that is actually
+// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so
+// it's not trivial to know how to do this reliably.
+
+}