summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2018-07-16 09:13:26 +0000
committerTor Brede Vekterli <vekterli@oath.com>2018-07-16 09:13:26 +0000
commit3834bd9c83fc2b2a81d0509158f0936e698689bc (patch)
treecbe5015a4316cc6d2c431c0546c33ebcb36a43aa /storage
parent6afadc2824b59fec9498672f475b8184b59a5000 (diff)
Add tests of shared/exclusive persistence queue locking
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/common/filestortestfixture.cpp29
-rw-r--r--storage/src/tests/persistence/common/filestortestfixture.h8
-rw-r--r--storage/src/tests/persistence/filestorage/mergeblockingtest.cpp2
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp12
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp179
5 files changed, 156 insertions, 74 deletions
diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp
index c92687f798b..835b8ef1044 100644
--- a/storage/src/tests/persistence/common/filestortestfixture.cpp
+++ b/storage/src/tests/persistence/common/filestortestfixture.cpp
@@ -18,19 +18,17 @@ spi::LoadType FileStorTestFixture::defaultLoadType = spi::LoadType(0, "default")
const uint32_t FileStorTestFixture::MSG_WAIT_TIME;
void
-FileStorTestFixture::setupDisks(uint32_t diskCount)
+FileStorTestFixture::setupPersistenceThreads(uint32_t threads)
{
std::string rootOfRoot = "todo-make-unique-filestorefixture";
- _config.reset(new vdstestlib::DirConfig(getStandardConfig(true, rootOfRoot)));
-
- _config2.reset(new vdstestlib::DirConfig(*_config));
- _config2->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2"));
- _config2->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2"));
- _config2->getConfig("stor-server").set("node_index", "1");
-
- _smallConfig.reset(new vdstestlib::DirConfig(*_config));
- _node.reset(new TestServiceLayerApp(DiskCount(diskCount), NodeIndex(1),
- _config->getConfigId()));
+ _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, rootOfRoot));
+ _config->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2"));
+ _config->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2"));
+ _config->getConfig("stor-server").set("node_index", "1");
+ _config->getConfig("stor-filestor").set("num_threads", std::to_string(threads));
+
+ _node = std::make_unique<TestServiceLayerApp>(
+ DiskCount(1), NodeIndex(1), _config->getConfigId());
_testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1");
}
@@ -38,16 +36,15 @@ FileStorTestFixture::setupDisks(uint32_t diskCount)
void
FileStorTestFixture::setUp()
{
- setupDisks(1);
+ setupPersistenceThreads(1);
_node->setPersistenceProvider(
- spi::PersistenceProvider::UP(
- new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)));
+ std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1));
}
void
FileStorTestFixture::tearDown()
{
- _node.reset(0);
+ _node.reset();
}
void
@@ -91,7 +88,7 @@ FileStorTestFixture::TestFileStorComponents::TestFileStorComponents(
}
api::StorageMessageAddress
-FileStorTestFixture::TestFileStorComponents::makeSelfAddress() const {
+FileStorTestFixture::makeSelfAddress() {
return api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 0);
}
diff --git a/storage/src/tests/persistence/common/filestortestfixture.h b/storage/src/tests/persistence/common/filestortestfixture.h
index c8158d01224..c46f9de24fc 100644
--- a/storage/src/tests/persistence/common/filestortestfixture.h
+++ b/storage/src/tests/persistence/common/filestortestfixture.h
@@ -19,8 +19,6 @@ public:
std::unique_ptr<TestServiceLayerApp> _node;
std::unique_ptr<vdstestlib::DirConfig> _config;
- std::unique_ptr<vdstestlib::DirConfig> _config2;
- std::unique_ptr<vdstestlib::DirConfig> _smallConfig;
const document::DocumentType* _testdoctype1;
static const uint32_t MSG_WAIT_TIME = 60 * 1000;
@@ -30,10 +28,12 @@ public:
void setUp() override;
void tearDown() override;
- void setupDisks(uint32_t diskCount);
+ void setupPersistenceThreads(uint32_t diskCount);
void createBucket(const document::BucketId& bid);
bool bucketExistsInDb(const document::BucketId& bucket) const;
+ static api::StorageMessageAddress makeSelfAddress();
+
api::ReturnCode::Result resultOf(const api::StorageReply& reply) const {
return reply.getResult().getResult();
}
@@ -99,8 +99,6 @@ public:
const char* testName,
const StorageLinkInjector& i = NoOpStorageLinkInjector());
- api::StorageMessageAddress makeSelfAddress() const;
-
void sendDummyGet(const document::BucketId& bid);
void sendPut(const document::BucketId& bid,
uint32_t docIdx,
diff --git a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
index 50999f5883e..d4cec415937 100644
--- a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
@@ -16,7 +16,7 @@ class MergeBlockingTest : public FileStorTestFixture
{
public:
void setupDisks() {
- FileStorTestFixture::setupDisks(1);
+ FileStorTestFixture::setupPersistenceThreads(1);
_node->setPersistenceProvider(
spi::PersistenceProvider::UP(
new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)));
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 64ef48b5719..e12f48bcdea 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -86,9 +86,9 @@ public:
std::unique_ptr<vespalib::Barrier> _queueBarrier;
std::unique_ptr<vespalib::Barrier> _completionBarrier;
- void setupDisks(uint32_t diskCount, uint32_t queueBarrierThreads) {
- FileStorTestFixture::setupDisks(diskCount);
- _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), diskCount));
+ void setupProviderAndBarriers(uint32_t queueBarrierThreads) {
+ FileStorTestFixture::setupPersistenceThreads(1);
+ _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1));
_queueBarrier.reset(new vespalib::Barrier(queueBarrierThreads));
_completionBarrier.reset(new vespalib::Barrier(2));
_blockingProvider = new BlockingMockProvider(*_dummyProvider, *_queueBarrier, *_completionBarrier);
@@ -219,7 +219,7 @@ makeAbortCmd(const Container& buckets)
void
OperationAbortingTest::testAbortMessageClearsRelevantQueuedOperations()
{
- setupDisks(1, 2);
+ setupProviderAndBarriers(2);
TestFileStorComponents c(*this, "testAbortMessageClearsRelevantQueuedOperations");
document::BucketId bucket(16, 1);
createBucket(bucket);
@@ -305,7 +305,7 @@ public:
void
OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket()
{
- setupDisks(1, 3);
+ setupProviderAndBarriers(3);
TestFileStorComponents c(*this, "testWaitForCurrentOperationCompletionForAbortedBucket");
document::BucketId bucket(16, 1);
@@ -386,7 +386,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(const char* testName,
const std::vector<api::StorageMessage::SP>& msgs,
bool shouldCreateBucketInitially)
{
- setupDisks(1, 2);
+ setupProviderAndBarriers(2);
TestFileStorComponents c(*this, testName);
document::BucketId bucket(16, 1);
document::BucketId blockerBucket(16, 2);
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index f31623eed61..746ae770cb3 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -15,86 +15,173 @@ using document::test::makeDocumentBucket;
namespace storage {
-class PersistenceQueueTest : public FileStorTestFixture
-{
+class PersistenceQueueTest : public FileStorTestFixture {
public:
void testFetchNextUnlockedMessageIfBucketLocked();
+ void shared_locked_operations_allow_concurrent_bucket_access();
+ void exclusive_locked_operation_not_started_if_shared_op_active();
+ void shared_locked_operation_not_started_if_exclusive_op_active();
+ void exclusive_locked_operation_not_started_if_exclusive_op_active();
- std::shared_ptr<api::StorageMessage>
- createPut(uint64_t bucket, uint64_t docIdx);
+ std::shared_ptr<api::StorageMessage> createPut(uint64_t bucket, uint64_t docIdx);
+ std::shared_ptr<api::StorageMessage> createGet(uint64_t bucket) const;
void setUp() override;
- void tearDown() override;
CPPUNIT_TEST_SUITE(PersistenceQueueTest);
CPPUNIT_TEST(testFetchNextUnlockedMessageIfBucketLocked);
+ CPPUNIT_TEST(shared_locked_operations_allow_concurrent_bucket_access);
+ CPPUNIT_TEST(exclusive_locked_operation_not_started_if_shared_op_active);
+ CPPUNIT_TEST(shared_locked_operation_not_started_if_exclusive_op_active);
+ CPPUNIT_TEST(exclusive_locked_operation_not_started_if_exclusive_op_active);
CPPUNIT_TEST_SUITE_END();
+
+ struct Fixture {
+ FileStorTestFixture& parent;
+ DummyStorageLink top;
+ std::unique_ptr<DummyStorageLink> dummyManager;
+ ForwardingMessageSender messageSender;
+ documentapi::LoadTypeSet loadTypes;
+ FileStorMetrics metrics;
+ std::unique_ptr<FileStorHandler> filestorHandler;
+ uint32_t stripeId;
+
+ explicit Fixture(FileStorTestFixture& parent);
+ ~Fixture();
+ };
+
+ static constexpr uint16_t _disk = 0;
};
CPPUNIT_TEST_SUITE_REGISTRATION(PersistenceQueueTest);
-void
-PersistenceQueueTest::setUp()
+PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_)
+ : parent(parent_),
+ top(),
+ dummyManager(std::make_unique<DummyStorageLink>()),
+ messageSender(*dummyManager),
+ loadTypes("raw:"),
+ metrics(loadTypes.getMetricLoadTypes())
{
- setupDisks(1);
- _node->setPersistenceProvider(
- spi::PersistenceProvider::UP(
- new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)));
+ top.push_back(std::move(dummyManager));
+ top.open();
+
+ metrics.initDiskMetrics(parent._node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1);
+
+ filestorHandler = std::make_unique<FileStorHandler>(messageSender, metrics, parent._node->getPartitions(),
+ parent._node->getComponentRegister());
+ // getNextMessage will time out if no unlocked buckets are present. Choose a timeout
+ // that is large enough to fail tests with high probability if this is not the case,
+ // and small enough to not slow down testing too much.
+ filestorHandler->setGetNextMessageTimeout(20);
+
+ stripeId = filestorHandler->getNextStripeId(0);
}
-void
-PersistenceQueueTest::tearDown()
-{
- _node.reset(0);
+PersistenceQueueTest::Fixture::~Fixture() = default;
+
+void PersistenceQueueTest::setUp() {
+ setupPersistenceThreads(1);
+ _node->setPersistenceProvider(std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1));
}
-std::shared_ptr<api::StorageMessage>
-PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx)
-{
- std::ostringstream id;
- id << "id:foo:testdoctype1:n=" << bucket << ":" << docIdx;
- document::Document::SP doc(
- _node->getTestDocMan().createDocument("foobar", id.str()));
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234));
- cmd->setAddress(api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 0));
+std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx) {
+ std::shared_ptr<document::Document> doc = _node->getTestDocMan().createDocument(
+ "foobar", vespalib::make_string("id:foo:testdoctype1:n=%zu:%zu", bucket, docIdx));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234);
+ cmd->setAddress(makeSelfAddress());
return cmd;
}
-void
-PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
-{
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
- uint32_t stripeId = filestorHandler.getNextStripeId(0);
+std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createGet(uint64_t bucket) const {
+ auto cmd = std::make_shared<api::GetCommand>(
+ makeDocumentBucket(document::BucketId(16, bucket)),
+ document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%zu:0", bucket)), "[all]");
+ cmd->setAddress(makeSelfAddress());
+ return cmd;
+}
+void PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() {
+ Fixture f(*this);
// Send 2 puts, 2 to the first bucket, 1 to the second. Calling
// getNextMessage 2 times should then return a lock on the first bucket,
// then subsequently on the second, skipping the already locked bucket.
// Puts all have same pri, so order is well defined.
- filestorHandler.schedule(createPut(1234, 0), 0);
- filestorHandler.schedule(createPut(1234, 1), 0);
- filestorHandler.schedule(createPut(5432, 0), 0);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createPut(1234, 1), _disk);
+ f.filestorHandler->schedule(createPut(5432, 0), _disk);
- auto lock0 = filestorHandler.getNextMessage(0, stripeId);
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
CPPUNIT_ASSERT(lock0.first.get());
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234),
dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
- auto lock1 = filestorHandler.getNextMessage(0, stripeId);
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
CPPUNIT_ASSERT(lock1.first.get());
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432),
dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
}
+void PersistenceQueueTest::shared_locked_operations_allow_concurrent_bucket_access() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createGet(1234), _disk);
+ f.filestorHandler->schedule(createGet(1234), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+
+ // Even though we already have a lock on the bucket, Gets allow shared locking and we
+ // should therefore be able to get another lock.
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock1.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock1.first->lockingRequirements());
+}
+
+void PersistenceQueueTest::exclusive_locked_operation_not_started_if_shared_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createGet(1234), _disk);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
+void PersistenceQueueTest::shared_locked_operation_not_started_if_exclusive_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createGet(1234), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
+void PersistenceQueueTest::exclusive_locked_operation_not_started_if_exclusive_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
} // namespace storage