summaryrefslogtreecommitdiffstats
path: root/storage/src/tests
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2020-10-26 09:57:17 +0000
committerGeir Storli <geirst@verizonmedia.com>2020-10-26 09:57:17 +0000
commitf41815b4118b3e98dce00a5a94be94334fe6e29c (patch)
tree821038e740a38a7917f65cd50d5b8ebf7f948e22 /storage/src/tests
parentae99f6bd8f0c28ae24fd814c9ee164de86b649b8 (diff)
Add support for async message handling when scheduling storage messages in FileStorManager.
When turned on, the calling thread (e.g. FNET network thread when using Storage API RPC) gets the next async message to handle (if any) as part of scheduling a storage message. This async message is then handled by the calling thread immediately, instead of going via a persistence thread.
Diffstat (limited to 'storage/src/tests')
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp159
1 files changed, 142 insertions, 17 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 9142a03ab85..9de47b4a8a9 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -60,9 +60,17 @@ spi::LoadType defaultLoadType(0, "default");
struct TestFileStorComponents;
+document::Bucket
+make_bucket_for_doc(const document::DocumentId& docid)
+{
+ document::BucketIdFactory factory;
+ document::BucketId bucket_id(16, factory.getBucketId(docid).getRawId());
+ return makeDocumentBucket(bucket_id);
+}
+
}
-struct FileStorManagerTest : Test{
+struct FileStorTestBase : Test {
enum {LONG_WAITTIME=60};
unique_ptr<TestServiceLayerApp> _node;
std::unique_ptr<vdstestlib::DirConfig> config;
@@ -71,13 +79,13 @@ struct FileStorManagerTest : Test{
const uint32_t _waitTime;
const document::DocumentType* _testdoctype1;
- FileStorManagerTest() : _node(), _waitTime(LONG_WAITTIME) {}
+ FileStorTestBase() : _node(), _waitTime(LONG_WAITTIME) {}
+ ~FileStorTestBase();
void SetUp() override;
void TearDown() override;
- void createBucket(document::BucketId bid, uint16_t disk)
- {
+ void createBucket(document::BucketId bid, uint16_t disk) {
spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
assert(disk == 0u);
_node->getPersistenceProvider().createBucket(makeSpiBucket(bid), context);
@@ -88,11 +96,29 @@ struct FileStorManagerTest : Test{
entry.write();
}
- document::Document::UP createDocument(const std::string& content, const std::string& id)
- {
+ document::Document::UP createDocument(const std::string& content, const std::string& id) {
return _node->getTestDocMan().createDocument(content, id);
}
+ std::shared_ptr<api::PutCommand> make_put_command(StorageMessage::Priority pri = 20,
+ const std::string& docid = "id:foo:testdoctype1::bar",
+ Timestamp timestamp = 100) {
+ Document::SP doc(createDocument("my content", docid).release());
+ auto bucket = make_bucket_for_doc(doc->getId());
+ auto cmd = std::make_shared<api::PutCommand>(bucket, std::move(doc), timestamp);
+ cmd->setPriority(pri);
+ return cmd;
+ }
+
+ std::shared_ptr<api::GetCommand> make_get_command(StorageMessage::Priority pri,
+ const std::string& docid = "id:foo:testdoctype1::bar") {
+ document::DocumentId did(docid);
+ auto bucket = make_bucket_for_doc(did);
+ auto cmd = std::make_shared<api::GetCommand>(bucket, did, document::AllFields::NAME);
+ cmd->setPriority(pri);
+ return cmd;
+ }
+
bool ownsBucket(uint16_t distributorIndex,
const document::BucketId& bucket) const
{
@@ -163,10 +189,12 @@ struct FileStorManagerTest : Test{
const Metric& metric);
auto& thread_metrics_of(FileStorManager& manager) {
- return manager._metrics->disk->threads[0];
+ return manager.get_metrics().disk->threads[0];
}
};
+FileStorTestBase::~FileStorTestBase() = default;
+
std::string findFile(const std::string& path, const std::string& file) {
FastOS_DirectoryScan dirScan(path.c_str());
while (dirScan.ReadNext()) {
@@ -207,7 +235,7 @@ struct TestFileStorComponents {
DummyStorageLink top;
FileStorManager* manager;
- explicit TestFileStorComponents(FileStorManagerTest& test,
+ explicit TestFileStorComponents(FileStorTestBase& test,
bool use_small_config = false)
: manager(new FileStorManager((use_small_config ? test.smallConfig : test.config)->getConfigId(),
test._node->getPersistenceProvider(),
@@ -227,7 +255,7 @@ struct FileStorHandlerComponents {
FileStorMetrics metrics;
std::unique_ptr<FileStorHandler> filestorHandler;
- FileStorHandlerComponents(FileStorManagerTest& test, uint32_t threadsPerDisk = 1)
+ FileStorHandlerComponents(FileStorTestBase& test, uint32_t threadsPerDisk = 1)
: top(),
dummyManager(new DummyStorageLink),
messageSender(*dummyManager),
@@ -253,7 +281,7 @@ struct PersistenceHandlerComponents : public FileStorHandlerComponents {
BucketOwnershipNotifier bucketOwnershipNotifier;
std::unique_ptr<PersistenceHandler> persistenceHandler;
- PersistenceHandlerComponents(FileStorManagerTest& test)
+ PersistenceHandlerComponents(FileStorTestBase& test)
: FileStorHandlerComponents(test),
component(test._node->getComponentRegister(), "test"),
bucketOwnershipNotifier(component, messageSender),
@@ -277,17 +305,21 @@ PersistenceHandlerComponents::~PersistenceHandlerComponents() = default;
}
void
-FileStorManagerTest::SetUp()
+FileStorTestBase::SetUp()
{
setupDisks();
}
void
-FileStorManagerTest::TearDown()
+FileStorTestBase::TearDown()
{
_node.reset(0);
}
+struct FileStorManagerTest : public FileStorTestBase {
+
+};
+
TEST_F(FileStorManagerTest, header_only_put) {
TestFileStorComponents c(*this);
auto& top = c.top;
@@ -947,10 +979,10 @@ TEST_F(FileStorManagerTest, split_single_group) {
}
void
-FileStorManagerTest::putDoc(DummyStorageLink& top,
- FileStorHandler& filestorHandler,
- const document::BucketId& target,
- uint32_t docNum)
+FileStorTestBase::putDoc(DummyStorageLink& top,
+ FileStorHandler& filestorHandler,
+ const document::BucketId& target,
+ uint32_t docNum)
{
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3);
spi::Context context(defaultLoadType, spi::Priority(0),
@@ -1838,7 +1870,7 @@ TEST_F(FileStorManagerTest, create_bucket_sets_active_flag_in_database_and_reply
}
template <typename Metric>
-void FileStorManagerTest::assert_request_size_set(TestFileStorComponents& c, std::shared_ptr<api::StorageMessage> cmd, const Metric& metric) {
+void FileStorTestBase::assert_request_size_set(TestFileStorComponents& c, std::shared_ptr<api::StorageMessage> cmd, const Metric& metric) {
api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3);
cmd->setApproxByteSize(54321);
cmd->setAddress(address);
@@ -1965,4 +1997,97 @@ TEST_F(FileStorManagerTest, bucket_db_is_populated_from_provider_when_initialize
EXPECT_EQ(reported_state->getState(), lib::State::UP);
}
+struct FileStorHandlerTest : public FileStorTestBase {
+ std::unique_ptr<FileStorHandlerComponents> c;
+ FileStorHandler* handler;
+ FileStorHandlerTest()
+ : FileStorTestBase(),
+ c(),
+ handler()
+ {}
+ void SetUp() override {
+ FileStorTestBase::SetUp();
+ c = std::make_unique<FileStorHandlerComponents>(*this);
+ handler = c->filestorHandler.get();
+ }
+ FileStorHandler::LockedMessage get_next_message() {
+ return handler->getNextMessage(0);
+ }
+};
+
+void
+expect_async_message(StorageMessage::Priority exp_pri,
+ const FileStorHandler::ScheduleAsyncResult& result)
+{
+ EXPECT_TRUE(result.was_scheduled());
+ ASSERT_TRUE(result.has_async_message());
+ EXPECT_EQ(exp_pri, result.async_message().second->getPriority());
+}
+
+void
+expect_empty_async_message(const FileStorHandler::ScheduleAsyncResult& result)
+{
+ EXPECT_TRUE(result.was_scheduled());
+ EXPECT_FALSE(result.has_async_message());
+}
+
+TEST_F(FileStorHandlerTest, message_not_scheduled_if_handler_is_closed)
+{
+ handler->setDiskState(FileStorHandler::DiskState::CLOSED);
+ auto result = handler->schedule_and_get_next_async_message(make_put_command());
+ EXPECT_FALSE(result.was_scheduled());
+}
+
+TEST_F(FileStorHandlerTest, no_async_message_returned_if_handler_is_paused)
+{
+ auto guard = handler->pause();
+ auto result = handler->schedule_and_get_next_async_message(make_put_command());
+ expect_empty_async_message(result);
+}
+
+TEST_F(FileStorHandlerTest, async_message_with_lowest_pri_returned_on_schedule)
+{
+ handler->schedule(make_put_command(20));
+ handler->schedule(make_put_command(40));
+ {
+ auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
+ expect_async_message(20, result);
+ }
+ EXPECT_EQ(30, get_next_message().second->getPriority());
+ EXPECT_EQ(40, get_next_message().second->getPriority());
+}
+
+TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_not_async)
+{
+ // GET is not an async message.
+ handler->schedule(make_get_command(20));
+
+ auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
+ expect_empty_async_message(result);
+
+ EXPECT_EQ(20, get_next_message().second->getPriority());
+ EXPECT_EQ(30, get_next_message().second->getPriority());
+}
+
+TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped)
+{
+ std::string docid_a = "id:foo:testdoctype1::a";
+ std::string docid_b = "id:foo:testdoctype1::b";
+ handler->schedule(make_put_command(20, docid_a));
+ {
+ auto locked_msg = get_next_message();
+ {
+ // Bucket for docid_a is locked and put command for same bucket is inhibited.
+ auto result = handler->schedule_and_get_next_async_message(make_put_command(30, docid_a));
+ expect_empty_async_message(result);
+ }
+ {
+ // Put command for another bucket is ok.
+ auto result = handler->schedule_and_get_next_async_message(make_put_command(40, docid_b));
+ expect_async_message(40, result);
+ }
+ }
+ EXPECT_EQ(30, get_next_message().second->getPriority());
+}
+
} // storage