aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-01-06 11:49:57 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-01-10 14:35:01 +0000
commit3f15206f090ce36c08282b8d64d6ff9c15c8fd69 (patch)
treeedc117087a4fac59dc6f507e5c88f305845839ca /storage/src/tests/persistence/filestorage/filestormanagertest.cpp
parent6fc8a76d2062e6176510804be59faffe2e20662d (diff)
Support dynamic throttling of async persistence operations
Adds an operation throttler that is intended to provide global throttling of async operations across all persistence stripe threads. A throttler wraps a logical max pending window size of in-flight operations. Depending on the throttler implementation, the window size may expand and shrink dynamically. Exactly how and when this happens is unspecified. Commit adds two throttler implementations: * An unlimited throttler that is no-op and never blocks. * A throttler built around the mbus `DynamicThrottlePolicy` and defers all window decisions to it. Current config default is to use the unlimited throttler. Config changes require a process restart. Offers both polling and (timed, non-timed) blocking calls for acquiring a throttle token. If the returned token is valid, the caller may proceed to invoke the asynchronous operation. The window slot taken up by a valid throttle token is implicitly freed up when the token is destroyed.
Diffstat (limited to 'storage/src/tests/persistence/filestorage/filestormanagertest.cpp')
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp34
1 files changed, 17 insertions, 17 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index cd496605a6c..939e7ae7b6a 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -516,11 +516,11 @@ TEST_F(FileStorManagerTest, handler_priority) {
filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
class MessagePusherThread : public document::Runnable {
@@ -570,7 +570,7 @@ public:
void run() override {
while (!_done) {
FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId);
- if (msg.second.get()) {
+ if (msg.msg.get()) {
uint32_t originalConfig = _config.load();
_fetchedCount++;
std::this_thread::sleep_for(5ms);
@@ -641,15 +641,15 @@ TEST_F(FileStorManagerTest, handler_pause) {
filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority());
{
ResumeGuard guard = filestorHandler.pause();
(void)guard;
- ASSERT_EQ(filestorHandler.getNextMessage(stripeId).second.get(), nullptr);
+ ASSERT_EQ(filestorHandler.getNextMessage(stripeId).msg.get(), nullptr);
}
- ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
TEST_F(FileStorManagerTest, remap_split) {
@@ -729,8 +729,8 @@ TEST_F(FileStorManagerTest, handler_timeout) {
std::this_thread::sleep_for(51ms);
for (;;) {
auto lock = filestorHandler.getNextMessage(stripeId);
- if (lock.first.get()) {
- ASSERT_EQ(200, lock.second->getPriority());
+ if (lock.lock.get()) {
+ ASSERT_EQ(200, lock.msg->getPriority());
break;
}
}
@@ -2013,7 +2013,7 @@ expect_async_message(StorageMessage::Priority exp_pri,
{
EXPECT_TRUE(result.was_scheduled());
ASSERT_TRUE(result.has_async_message());
- EXPECT_EQ(exp_pri, result.async_message().second->getPriority());
+ EXPECT_EQ(exp_pri, result.async_message().msg->getPriority());
}
void
@@ -2045,8 +2045,8 @@ TEST_F(FileStorHandlerTest, async_message_with_lowest_pri_returned_on_schedule)
auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
expect_async_message(20, result);
}
- EXPECT_EQ(30, get_next_message().second->getPriority());
- EXPECT_EQ(40, get_next_message().second->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
+ EXPECT_EQ(40, get_next_message().msg->getPriority());
}
TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_not_async)
@@ -2057,8 +2057,8 @@ TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_n
auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
expect_empty_async_message(result);
- EXPECT_EQ(20, get_next_message().second->getPriority());
- EXPECT_EQ(30, get_next_message().second->getPriority());
+ EXPECT_EQ(20, get_next_message().msg->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
}
TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped)
@@ -2079,7 +2079,7 @@ TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped)
expect_async_message(40, result);
}
}
- EXPECT_EQ(30, get_next_message().second->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
}
} // storage