diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-01-06 11:49:57 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-01-10 14:35:01 +0000 |
commit | 3f15206f090ce36c08282b8d64d6ff9c15c8fd69 (patch) | |
tree | edc117087a4fac59dc6f507e5c88f305845839ca /storage/src/tests/persistence/filestorage/filestormanagertest.cpp | |
parent | 6fc8a76d2062e6176510804be59faffe2e20662d (diff) |
Support dynamic throttling of async persistence operations
Adds an operation throttler that is intended to provide global throttling
of async operations across all persistence stripe threads. A throttler
wraps a logical max pending window size of in-flight operations. Depending
on the throttler implementation, the window size may expand and shrink
dynamically. Exactly how and when this happens is unspecified.
Commit adds two throttler implementations:
* An unlimited throttler that is no-op and never blocks.
* A throttler built around the mbus `DynamicThrottlePolicy` and defers
all window decisions to it.
Current config default is to use the unlimited throttler. Config changes
require a process restart.
Offers both polling and (timed, non-timed) blocking calls for acquiring
a throttle token. If the returned token is valid, the caller may proceed
to invoke the asynchronous operation.
The window slot taken up by a valid throttle token is implicitly freed up
when the token is destroyed.
Diffstat (limited to 'storage/src/tests/persistence/filestorage/filestormanagertest.cpp')
-rw-r--r-- | storage/src/tests/persistence/filestorage/filestormanagertest.cpp | 34 |
1 files changed, 17 insertions, 17 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index cd496605a6c..939e7ae7b6a 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -516,11 +516,11 @@ TEST_F(FileStorManagerTest, handler_priority) { filestorHandler.schedule(cmd); } - ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority()); } class MessagePusherThread : public document::Runnable { @@ -570,7 +570,7 @@ public: void run() override { while (!_done) { FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId); - if (msg.second.get()) { + if (msg.msg.get()) { uint32_t originalConfig = _config.load(); _fetchedCount++; std::this_thread::sleep_for(5ms); @@ -641,15 +641,15 @@ TEST_F(FileStorManagerTest, handler_pause) { filestorHandler.schedule(cmd); } - ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority()); { ResumeGuard guard = filestorHandler.pause(); (void)guard; - ASSERT_EQ(filestorHandler.getNextMessage(stripeId).second.get(), nullptr); + ASSERT_EQ(filestorHandler.getNextMessage(stripeId).msg.get(), nullptr); } - ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority()); } TEST_F(FileStorManagerTest, remap_split) { @@ -729,8 +729,8 @@ TEST_F(FileStorManagerTest, handler_timeout) { std::this_thread::sleep_for(51ms); for (;;) { auto lock = filestorHandler.getNextMessage(stripeId); - if (lock.first.get()) { - ASSERT_EQ(200, lock.second->getPriority()); + if (lock.lock.get()) { + ASSERT_EQ(200, lock.msg->getPriority()); break; } } @@ -2013,7 +2013,7 @@ expect_async_message(StorageMessage::Priority exp_pri, { EXPECT_TRUE(result.was_scheduled()); ASSERT_TRUE(result.has_async_message()); - EXPECT_EQ(exp_pri, result.async_message().second->getPriority()); + EXPECT_EQ(exp_pri, result.async_message().msg->getPriority()); } void @@ -2045,8 +2045,8 @@ TEST_F(FileStorHandlerTest, async_message_with_lowest_pri_returned_on_schedule) auto result = handler->schedule_and_get_next_async_message(make_put_command(30)); expect_async_message(20, result); } - EXPECT_EQ(30, get_next_message().second->getPriority()); - EXPECT_EQ(40, get_next_message().second->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); + EXPECT_EQ(40, get_next_message().msg->getPriority()); } TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_not_async) @@ -2057,8 +2057,8 @@ TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_n auto result = handler->schedule_and_get_next_async_message(make_put_command(30)); expect_empty_async_message(result); - EXPECT_EQ(20, get_next_message().second->getPriority()); - EXPECT_EQ(30, get_next_message().second->getPriority()); + EXPECT_EQ(20, get_next_message().msg->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); } TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped) @@ -2079,7 +2079,7 @@ TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped) expect_async_message(40, result); } } - EXPECT_EQ(30, get_next_message().second->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); } } // storage |