summaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/persistence/common/filestortestfixture.cpp29
-rw-r--r--storage/src/tests/persistence/common/filestortestfixture.h8
-rw-r--r--storage/src/tests/persistence/filestorage/mergeblockingtest.cpp2
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp12
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp196
-rw-r--r--storage/src/tests/visiting/visitortest.cpp41
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h5
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp138
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h47
-rw-r--r--storage/src/vespa/storage/persistence/messages.h6
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp7
-rw-r--r--storage/src/vespa/storage/visiting/stor-visitor.def1
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.cpp7
14 files changed, 343 insertions, 160 deletions
diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp
index c92687f798b..835b8ef1044 100644
--- a/storage/src/tests/persistence/common/filestortestfixture.cpp
+++ b/storage/src/tests/persistence/common/filestortestfixture.cpp
@@ -18,19 +18,17 @@ spi::LoadType FileStorTestFixture::defaultLoadType = spi::LoadType(0, "default")
const uint32_t FileStorTestFixture::MSG_WAIT_TIME;
void
-FileStorTestFixture::setupDisks(uint32_t diskCount)
+FileStorTestFixture::setupPersistenceThreads(uint32_t threads)
{
std::string rootOfRoot = "todo-make-unique-filestorefixture";
- _config.reset(new vdstestlib::DirConfig(getStandardConfig(true, rootOfRoot)));
-
- _config2.reset(new vdstestlib::DirConfig(*_config));
- _config2->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2"));
- _config2->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2"));
- _config2->getConfig("stor-server").set("node_index", "1");
-
- _smallConfig.reset(new vdstestlib::DirConfig(*_config));
- _node.reset(new TestServiceLayerApp(DiskCount(diskCount), NodeIndex(1),
- _config->getConfigId()));
+ _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, rootOfRoot));
+ _config->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2"));
+ _config->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2"));
+ _config->getConfig("stor-server").set("node_index", "1");
+ _config->getConfig("stor-filestor").set("num_threads", std::to_string(threads));
+
+ _node = std::make_unique<TestServiceLayerApp>(
+ DiskCount(1), NodeIndex(1), _config->getConfigId());
_testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1");
}
@@ -38,16 +36,15 @@ FileStorTestFixture::setupDisks(uint32_t diskCount)
void
FileStorTestFixture::setUp()
{
- setupDisks(1);
+ setupPersistenceThreads(1);
_node->setPersistenceProvider(
- spi::PersistenceProvider::UP(
- new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)));
+ std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1));
}
void
FileStorTestFixture::tearDown()
{
- _node.reset(0);
+ _node.reset();
}
void
@@ -91,7 +88,7 @@ FileStorTestFixture::TestFileStorComponents::TestFileStorComponents(
}
api::StorageMessageAddress
-FileStorTestFixture::TestFileStorComponents::makeSelfAddress() const {
+FileStorTestFixture::makeSelfAddress() {
return api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 0);
}
diff --git a/storage/src/tests/persistence/common/filestortestfixture.h b/storage/src/tests/persistence/common/filestortestfixture.h
index c8158d01224..c46f9de24fc 100644
--- a/storage/src/tests/persistence/common/filestortestfixture.h
+++ b/storage/src/tests/persistence/common/filestortestfixture.h
@@ -19,8 +19,6 @@ public:
std::unique_ptr<TestServiceLayerApp> _node;
std::unique_ptr<vdstestlib::DirConfig> _config;
- std::unique_ptr<vdstestlib::DirConfig> _config2;
- std::unique_ptr<vdstestlib::DirConfig> _smallConfig;
const document::DocumentType* _testdoctype1;
static const uint32_t MSG_WAIT_TIME = 60 * 1000;
@@ -30,10 +28,12 @@ public:
void setUp() override;
void tearDown() override;
- void setupDisks(uint32_t diskCount);
+ void setupPersistenceThreads(uint32_t diskCount);
void createBucket(const document::BucketId& bid);
bool bucketExistsInDb(const document::BucketId& bucket) const;
+ static api::StorageMessageAddress makeSelfAddress();
+
api::ReturnCode::Result resultOf(const api::StorageReply& reply) const {
return reply.getResult().getResult();
}
@@ -99,8 +99,6 @@ public:
const char* testName,
const StorageLinkInjector& i = NoOpStorageLinkInjector());
- api::StorageMessageAddress makeSelfAddress() const;
-
void sendDummyGet(const document::BucketId& bid);
void sendPut(const document::BucketId& bid,
uint32_t docIdx,
diff --git a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
index 50999f5883e..d4cec415937 100644
--- a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp
@@ -16,7 +16,7 @@ class MergeBlockingTest : public FileStorTestFixture
{
public:
void setupDisks() {
- FileStorTestFixture::setupDisks(1);
+ FileStorTestFixture::setupPersistenceThreads(1);
_node->setPersistenceProvider(
spi::PersistenceProvider::UP(
new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)));
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 64ef48b5719..e12f48bcdea 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -86,9 +86,9 @@ public:
std::unique_ptr<vespalib::Barrier> _queueBarrier;
std::unique_ptr<vespalib::Barrier> _completionBarrier;
- void setupDisks(uint32_t diskCount, uint32_t queueBarrierThreads) {
- FileStorTestFixture::setupDisks(diskCount);
- _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), diskCount));
+ void setupProviderAndBarriers(uint32_t queueBarrierThreads) {
+ FileStorTestFixture::setupPersistenceThreads(1);
+ _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1));
_queueBarrier.reset(new vespalib::Barrier(queueBarrierThreads));
_completionBarrier.reset(new vespalib::Barrier(2));
_blockingProvider = new BlockingMockProvider(*_dummyProvider, *_queueBarrier, *_completionBarrier);
@@ -219,7 +219,7 @@ makeAbortCmd(const Container& buckets)
void
OperationAbortingTest::testAbortMessageClearsRelevantQueuedOperations()
{
- setupDisks(1, 2);
+ setupProviderAndBarriers(2);
TestFileStorComponents c(*this, "testAbortMessageClearsRelevantQueuedOperations");
document::BucketId bucket(16, 1);
createBucket(bucket);
@@ -305,7 +305,7 @@ public:
void
OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket()
{
- setupDisks(1, 3);
+ setupProviderAndBarriers(3);
TestFileStorComponents c(*this, "testWaitForCurrentOperationCompletionForAbortedBucket");
document::BucketId bucket(16, 1);
@@ -386,7 +386,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(const char* testName,
const std::vector<api::StorageMessage::SP>& msgs,
bool shouldCreateBucketInitially)
{
- setupDisks(1, 2);
+ setupProviderAndBarriers(2);
TestFileStorComponents c(*this, testName);
document::BucketId bucket(16, 1);
document::BucketId blockerBucket(16, 2);
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index f31623eed61..e96ad013923 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -15,86 +15,190 @@ using document::test::makeDocumentBucket;
namespace storage {
-class PersistenceQueueTest : public FileStorTestFixture
-{
+class PersistenceQueueTest : public FileStorTestFixture {
public:
void testFetchNextUnlockedMessageIfBucketLocked();
+ void shared_locked_operations_allow_concurrent_bucket_access();
+ void exclusive_locked_operation_not_started_if_shared_op_active();
+ void shared_locked_operation_not_started_if_exclusive_op_active();
+ void exclusive_locked_operation_not_started_if_exclusive_op_active();
+ void operation_batching_not_allowed_across_different_lock_modes();
- std::shared_ptr<api::StorageMessage>
- createPut(uint64_t bucket, uint64_t docIdx);
+ std::shared_ptr<api::StorageMessage> createPut(uint64_t bucket, uint64_t docIdx);
+ std::shared_ptr<api::StorageMessage> createGet(uint64_t bucket) const;
void setUp() override;
- void tearDown() override;
CPPUNIT_TEST_SUITE(PersistenceQueueTest);
CPPUNIT_TEST(testFetchNextUnlockedMessageIfBucketLocked);
+ CPPUNIT_TEST(shared_locked_operations_allow_concurrent_bucket_access);
+ CPPUNIT_TEST(exclusive_locked_operation_not_started_if_shared_op_active);
+ CPPUNIT_TEST(shared_locked_operation_not_started_if_exclusive_op_active);
+ CPPUNIT_TEST(exclusive_locked_operation_not_started_if_exclusive_op_active);
+ CPPUNIT_TEST(operation_batching_not_allowed_across_different_lock_modes);
CPPUNIT_TEST_SUITE_END();
+
+ struct Fixture {
+ FileStorTestFixture& parent;
+ DummyStorageLink top;
+ std::unique_ptr<DummyStorageLink> dummyManager;
+ ForwardingMessageSender messageSender;
+ documentapi::LoadTypeSet loadTypes;
+ FileStorMetrics metrics;
+ std::unique_ptr<FileStorHandler> filestorHandler;
+ uint32_t stripeId;
+
+ explicit Fixture(FileStorTestFixture& parent);
+ ~Fixture();
+ };
+
+ static constexpr uint16_t _disk = 0;
};
CPPUNIT_TEST_SUITE_REGISTRATION(PersistenceQueueTest);
-void
-PersistenceQueueTest::setUp()
+PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_)
+ : parent(parent_),
+ top(),
+ dummyManager(std::make_unique<DummyStorageLink>()),
+ messageSender(*dummyManager),
+ loadTypes("raw:"),
+ metrics(loadTypes.getMetricLoadTypes())
{
- setupDisks(1);
- _node->setPersistenceProvider(
- spi::PersistenceProvider::UP(
- new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)));
+ top.push_back(std::move(dummyManager));
+ top.open();
+
+ metrics.initDiskMetrics(parent._node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1);
+
+ filestorHandler = std::make_unique<FileStorHandler>(messageSender, metrics, parent._node->getPartitions(),
+ parent._node->getComponentRegister());
+ // getNextMessage will time out if no unlocked buckets are present. Choose a timeout
+ // that is large enough to fail tests with high probability if this is not the case,
+ // and small enough to not slow down testing too much.
+ filestorHandler->setGetNextMessageTimeout(20);
+
+ stripeId = filestorHandler->getNextStripeId(0);
}
-void
-PersistenceQueueTest::tearDown()
-{
- _node.reset(0);
+PersistenceQueueTest::Fixture::~Fixture() = default;
+
+void PersistenceQueueTest::setUp() {
+ setupPersistenceThreads(1);
+ _node->setPersistenceProvider(std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1));
}
-std::shared_ptr<api::StorageMessage>
-PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx)
-{
- std::ostringstream id;
- id << "id:foo:testdoctype1:n=" << bucket << ":" << docIdx;
- document::Document::SP doc(
- _node->getTestDocMan().createDocument("foobar", id.str()));
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234));
- cmd->setAddress(api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 0));
+std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx) {
+ std::shared_ptr<document::Document> doc = _node->getTestDocMan().createDocument(
+ "foobar", vespalib::make_string("id:foo:testdoctype1:n=%zu:%zu", bucket, docIdx));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234);
+ cmd->setAddress(makeSelfAddress());
return cmd;
}
-void
-PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
-{
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
- uint32_t stripeId = filestorHandler.getNextStripeId(0);
+std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createGet(uint64_t bucket) const {
+ auto cmd = std::make_shared<api::GetCommand>(
+ makeDocumentBucket(document::BucketId(16, bucket)),
+ document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%zu:0", bucket)), "[all]");
+ cmd->setAddress(makeSelfAddress());
+ return cmd;
+}
+void PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() {
+ Fixture f(*this);
// Send 2 puts, 2 to the first bucket, 1 to the second. Calling
// getNextMessage 2 times should then return a lock on the first bucket,
// then subsequently on the second, skipping the already locked bucket.
// Puts all have same pri, so order is well defined.
- filestorHandler.schedule(createPut(1234, 0), 0);
- filestorHandler.schedule(createPut(1234, 1), 0);
- filestorHandler.schedule(createPut(5432, 0), 0);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createPut(1234, 1), _disk);
+ f.filestorHandler->schedule(createPut(5432, 0), _disk);
- auto lock0 = filestorHandler.getNextMessage(0, stripeId);
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
CPPUNIT_ASSERT(lock0.first.get());
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234),
dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
- auto lock1 = filestorHandler.getNextMessage(0, stripeId);
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
CPPUNIT_ASSERT(lock1.first.get());
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432),
dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
}
+void PersistenceQueueTest::shared_locked_operations_allow_concurrent_bucket_access() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createGet(1234), _disk);
+ f.filestorHandler->schedule(createGet(1234), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+
+ // Even though we already have a lock on the bucket, Gets allow shared locking and we
+ // should therefore be able to get another lock.
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock1.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock1.first->lockingRequirements());
+}
+
+void PersistenceQueueTest::exclusive_locked_operation_not_started_if_shared_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createGet(1234), _disk);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
+void PersistenceQueueTest::shared_locked_operation_not_started_if_exclusive_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createGet(1234), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
+void PersistenceQueueTest::exclusive_locked_operation_not_started_if_exclusive_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
+void PersistenceQueueTest::operation_batching_not_allowed_across_different_lock_modes() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createGet(1234), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first);
+ CPPUNIT_ASSERT(lock0.second);
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+
+ f.filestorHandler->getNextMessage(_disk, f.stripeId, lock0);
+ CPPUNIT_ASSERT(!lock0.second);
+}
+
} // namespace storage
diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp
index 27281d9b95f..4fc577226ca 100644
--- a/storage/src/tests/visiting/visitortest.cpp
+++ b/storage/src/tests/visiting/visitortest.cpp
@@ -62,7 +62,7 @@ private:
CPPUNIT_TEST(testNormalUsage);
CPPUNIT_TEST(testFailedCreateIterator);
CPPUNIT_TEST(testFailedGetIter);
- CPPUNIT_TEST(testMultipleFailedGetIter);
+ CPPUNIT_TEST(iterators_per_bucket_config_is_ignored_and_hardcoded_to_1);
CPPUNIT_TEST(testDocumentAPIClientError);
CPPUNIT_TEST(testNoDocumentAPIResendingForFailedVisitor);
CPPUNIT_TEST(testIteratorCreatedForFailedVisitor);
@@ -90,7 +90,7 @@ public:
void testNormalUsage();
void testFailedCreateIterator();
void testFailedGetIter();
- void testMultipleFailedGetIter();
+ void iterators_per_bucket_config_is_ignored_and_hardcoded_to_1();
void testDocumentAPIClientError();
void testNoDocumentAPIResendingForFailedVisitor();
void testIteratorCreatedForFailedVisitor();
@@ -592,36 +592,31 @@ VisitorTest::testFailedGetIter()
CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
}
-void
-VisitorTest::testMultipleFailedGetIter()
-{
- initializeTest(TestParams().iteratorsPerBucket(2));
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- makeCreateVisitor());
+void VisitorTest::iterators_per_bucket_config_is_ignored_and_hardcoded_to_1() {
+ initializeTest(TestParams().iteratorsPerBucket(20));
+ auto cmd = makeCreateVisitor();
_top->sendDown(cmd);
sendCreateIteratorReply();
- std::vector<GetIterCommand::SP> getIterCmds(
- fetchMultipleCommands<GetIterCommand>(*_bottom, 2));
-
- sendGetIterReply(*getIterCmds[0],
- api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND));
-
- // Wait for an "appropriate" amount of time so that wrongful logic
- // will send a DestroyIteratorCommand before all pending GetIters
- // have been replied to.
- std::this_thread::sleep_for(100ms);
+ auto getIterCmd = fetchSingleCommand<GetIterCommand>(*_bottom);
+ CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234),
+ getIterCmd->getIteratorId());
+ sendGetIterReply(*getIterCmd);
CPPUNIT_ASSERT_EQUAL(size_t(0), _bottom->getNumCommands());
- sendGetIterReply(*getIterCmds[1],
- api::ReturnCode(api::ReturnCode::BUCKET_DELETED));
+ std::vector<document::Document::SP> docs;
+ std::vector<document::DocumentId> docIds;
+ std::vector<std::string> infoMessages;
+ getMessagesAndReply(_documents.size(), getSession(0), docs, docIds, infoMessages);
+ CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), docIds.size());
- DestroyIteratorCommand::SP destroyIterCmd(
- fetchSingleCommand<DestroyIteratorCommand>(*_bottom));
+ auto destroyIterCmd = fetchSingleCommand<DestroyIteratorCommand>(*_bottom);
- verifyCreateVisitorReply(api::ReturnCode::BUCKET_DELETED, 0, 0);
+ verifyCreateVisitorReply(api::ReturnCode::OK);
CPPUNIT_ASSERT(waitUntilNoActiveVisitors());
+ CPPUNIT_ASSERT_EQUAL(0L, getFailedVisitorDestinationReplyCount());
}
void
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 74baecbf026..0da0fd5ce66 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -71,9 +71,9 @@ FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage&
}
FileStorHandler::BucketLockInterface::SP
-FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk)
+FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq)
{
- return _impl->lock(bucket, disk);
+ return _impl->lock(bucket, disk, lockReq);
}
void
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index b74765b17d2..02c959df2f0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -58,8 +58,9 @@ public:
typedef std::shared_ptr<BucketLockInterface> SP;
virtual const document::Bucket &getBucket() const = 0;
+ virtual api::LockingRequirements lockingRequirements() const noexcept = 0;
- virtual ~BucketLockInterface() {};
+ virtual ~BucketLockInterface() = default;
};
typedef std::pair<BucketLockInterface::SP, api::StorageMessage::SP> LockedMessage;
@@ -139,7 +140,7 @@ public:
*
*
*/
- BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk);
+ BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk, api::LockingRequirements lockReq);
/**
* Called by FileStorThread::onBucketDiskMove() after moving file, in case
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index a01881b6fbe..f9571228ef9 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -370,16 +370,16 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint32_t stripeId)
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
-FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket)
-{
+FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRequirements lockReq) {
vespalib::MonitorGuard guard(_lock);
- while (isLocked(guard, bucket)) {
- LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str());
+ while (isLocked(guard, bucket, lockReq)) {
+ LOG(spam, "Contending for filestor lock for %s with %s access",
+ bucket.getBucketId().toString().c_str(), api::to_string(lockReq));
guard.wait(100);
}
- auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0);
+ auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0, lockReq);
guard.broadcast();
return locker;
@@ -388,9 +388,9 @@ FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket)
namespace {
struct MultiLockGuard {
std::map<uint16_t, vespalib::Monitor*> monitors;
- std::vector<std::shared_ptr<vespalib::MonitorGuard> > guards;
+ std::vector<std::shared_ptr<vespalib::MonitorGuard>> guards;
- MultiLockGuard() {}
+ MultiLockGuard() = default;
void addLock(vespalib::Monitor& monitor, uint16_t index) {
monitors[index] = &monitor;
@@ -931,7 +931,7 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk)
PriorityIdx& idx(bmi::get<1>(_queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
- while (iter != end && isLocked(guard, iter->_bucket)) {
+ while (iter != end && isLocked(guard, iter->_bucket, iter->_command->lockingRequirements())) {
iter++;
}
if (iter != end) {
@@ -959,6 +959,13 @@ FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck)
}
api::StorageMessage & m(*range.first->_command);
+ // For now, don't allow batching of operations across lock requirement modes.
+ // We might relax this requirement later once we're 100% sure it can't trigger
+ // any unfortunate edge cases.
+ if (lck.first->lockingRequirements() != m.lockingRequirements()) {
+ lck.second.reset();
+ return lck;
+ }
uint64_t waitTime(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()]));
@@ -992,7 +999,8 @@ FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, Priority
if (!messageTimedOutInQueue(*msg, waitTime)) {
auto locker = std::make_unique<BucketLock>(guard, *this, bucket, msg->getPriority(),
- msg->getType().getId(), msg->getMsgId());
+ msg->getType().getId(), msg->getMsgId(),
+ msg->lockingRequirements());
guard.unlock();
return FileStorHandler::LockedMessage(std::move(locker), std::move(msg));
} else {
@@ -1090,10 +1098,65 @@ FileStorHandlerImpl::Stripe::flush()
lockGuard.wait(100);
}
}
+
+void FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
+ api::LockingRequirements reqOfReleasedLock,
+ api::StorageMessage::Id lockMsgId) {
+ vespalib::MonitorGuard guard(_lock);
+ auto iter = _lockedBuckets.find(bucket);
+ assert(iter != _lockedBuckets.end());
+ auto& entry = iter->second;
+
+ if (reqOfReleasedLock == api::LockingRequirements::Exclusive) {
+ assert(entry._exclusiveLock);
+ assert(entry._exclusiveLock->msgId == lockMsgId);
+ entry._exclusiveLock.reset();
+ } else {
+ assert(!entry._exclusiveLock);
+ auto shared_iter = entry._sharedLocks.find(lockMsgId);
+ assert(shared_iter != entry._sharedLocks.end());
+ entry._sharedLocks.erase(shared_iter);
+ }
+
+ if (!entry._exclusiveLock && entry._sharedLocks.empty()) {
+ _lockedBuckets.erase(iter); // No more locks held
+ }
+ guard.broadcast();
+}
+
+void FileStorHandlerImpl::Stripe::lock(const vespalib::MonitorGuard &, const document::Bucket & bucket,
+ api::LockingRequirements lockReq, const LockEntry & lockEntry) {
+ auto& entry = _lockedBuckets[bucket];
+ assert(!entry._exclusiveLock);
+ if (lockReq == api::LockingRequirements::Exclusive) {
+ assert(entry._sharedLocks.empty());
+ entry._exclusiveLock = lockEntry;
+ } else {
+ // TODO use a hash set with a custom comparator/hasher instead...?
+ auto inserted = entry._sharedLocks.insert(std::make_pair(lockEntry.msgId, lockEntry));
+ (void) inserted;
+ assert(inserted.second);
+ }
+}
+
bool
-FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket) const noexcept
+FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket,
+ api::LockingRequirements lockReq) const noexcept
{
- return (bucket.getBucketId().getRawId() != 0) && (_lockedBuckets.find(bucket) != _lockedBuckets.end());
+ if (bucket.getBucketId().getRawId() == 0) {
+ return false;
+ }
+ auto iter = _lockedBuckets.find(bucket);
+ if (iter == _lockedBuckets.end()) {
+ return false;
+ }
+ if (iter->second._exclusiveLock) {
+ return true;
+ }
+ // Shared locks can be taken alongside other shared locks, but exclusive locks
+ // require that no shared locks are currently present.
+ return ((lockReq == api::LockingRequirements::Exclusive)
+ && !iter->second._sharedLocks.empty());
}
uint32_t
@@ -1114,33 +1177,26 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const
FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Stripe& stripe,
const document::Bucket &bucket, uint8_t priority,
- api::MessageType::Id msgType, api::StorageMessage::Id msgId)
+ api::MessageType::Id msgType, api::StorageMessage::Id msgId,
+ api::LockingRequirements lockReq)
: _stripe(stripe),
- _bucket(bucket)
+ _bucket(bucket),
+ _uniqueMsgId(msgId),
+ _lockReq(lockReq)
{
- (void) guard;
if (_bucket.getBucketId().getRawId() != 0) {
- // Lock the bucket and wait until it is not the current operation for
- // the disk itself.
- _stripe.lock(guard, _bucket, Stripe::LockEntry(priority, msgType, msgId));
- LOG(debug, "Locked bucket %s with priority %u",
- bucket.getBucketId().toString().c_str(), priority);
-
- LOG_BUCKET_OPERATION_SET_LOCK_STATE(
- _bucket.getBucketId(), "acquired filestor lock", false,
- debug::BucketOperationLogger::State::BUCKET_LOCKED);
+ _stripe.lock(guard, _bucket, lockReq, Stripe::LockEntry(priority, msgType, msgId));
+ LOG(debug, "Locked bucket %s for message %zu with priority %u in mode %s",
+ bucket.getBucketId().toString().c_str(), msgId, priority, api::to_string(lockReq));
}
}
-FileStorHandlerImpl::BucketLock::~BucketLock()
-{
+FileStorHandlerImpl::BucketLock::~BucketLock() {
if (_bucket.getBucketId().getRawId() != 0) {
- _stripe.release(_bucket);
- LOG(debug, "Unlocked bucket %s", _bucket.getBucketId().toString().c_str());
- LOG_BUCKET_OPERATION_SET_LOCK_STATE(
- _bucket.getBucketId(), "released filestor lock", true,
- debug::BucketOperationLogger::State::BUCKET_UNLOCKED);
+ _stripe.release(_bucket, _lockReq, _uniqueMsgId);
+ LOG(debug, "Unlocked bucket %s for message %zu in mode %s",
+ _bucket.getBucketId().toString().c_str(), _uniqueMsgId, api::to_string(_lockReq));
}
}
@@ -1182,14 +1238,31 @@ FileStorHandlerImpl::Stripe::dumpQueueHtml(std::ostream & os) const
}
}
+namespace {
+
+void dump_lock_entry(const document::BucketId& bucketId, const FileStorHandlerImpl::Stripe::LockEntry& entry,
+ api::LockingRequirements lock_mode, uint32_t now_ts, std::ostream& os) {
+ os << api::MessageType::get(entry.msgType).getName() << ":" << entry.msgId << " ("
+ << bucketId << ", " << api::to_string(lock_mode)
+ << " lock) Running for " << (now_ts - entry.timestamp) << " secs<br/>\n";
+}
+
+}
+
void
FileStorHandlerImpl::Stripe::dumpActiveHtml(std::ostream & os) const
{
uint32_t now = time(nullptr);
vespalib::MonitorGuard guard(_lock);
for (const auto & e : _lockedBuckets) {
- os << api::MessageType::get(e.second.msgType).getName() << ":" << e.second.msgId << " (" << e.first.getBucketId()
- << ") Running for " << (now - e.second.timestamp) << " secs<br/>\n";
+ if (e.second._exclusiveLock) {
+ dump_lock_entry(e.first.getBucketId(), *e.second._exclusiveLock,
+ api::LockingRequirements::Exclusive, now, os);
+ }
+ for (const auto& shared : e.second._sharedLocks) {
+ dump_lock_entry(e.first.getBucketId(), shared.second,
+ api::LockingRequirements::Shared, now, os);
+ }
}
}
@@ -1238,7 +1311,6 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
}
for (auto & entry : _mergeStates) {
out << "<b>" << entry.first.toString() << "</b><br>\n";
- // << "<p>" << it->second << "</p>\n"; // Gets very spammy with the complete state here..
}
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 45ac5ded47f..f9dcca4315b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -30,6 +30,7 @@
#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <atomic>
+#include <optional>
namespace storage {
@@ -82,13 +83,19 @@ public:
api::MessageType::Id msgType;
api::StorageMessage::Id msgId;
-
LockEntry() : timestamp(0), priority(0), msgType(), msgId(0) { }
LockEntry(uint8_t priority_, api::MessageType::Id msgType_, api::StorageMessage::Id msgId_)
: timestamp(time(nullptr)), priority(priority_), msgType(msgType_), msgId(msgId_)
{ }
};
+
+ struct MultiLockEntry {
+ std::optional<LockEntry> _exclusiveLock;
+ using SharedLocks = vespalib::hash_map<api::StorageMessage::Id, LockEntry>;
+ SharedLocks _sharedLocks;
+ };
+
Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender);
~Stripe();
void flush();
@@ -105,19 +112,16 @@ public:
vespalib::MonitorGuard guard(_lock);
return _queue.size();
}
- void release(const document::Bucket & bucket){
- vespalib::MonitorGuard guard(_lock);
- _lockedBuckets.erase(bucket);
- guard.broadcast();
- }
+ void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock,
+ api::StorageMessage::Id lockMsgId);
- bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&) const noexcept;
+ bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&,
+ api::LockingRequirements lockReq) const noexcept;
- void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, const LockEntry & lockEntry) {
- _lockedBuckets.insert(std::make_pair(bucket, lockEntry));
- }
+ void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket,
+ api::LockingRequirements lockReq, const LockEntry & lockEntry);
- std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket);
+ std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk);
@@ -131,9 +135,11 @@ public:
void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; }
private:
bool hasActive(vespalib::MonitorGuard & monitor, const AbortBucketOperationsCommand& cmd) const;
+ // Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts
+ // with its locking requirements.
FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx,
PriorityIdx::iterator iter);
- typedef vespalib::hash_map<document::Bucket, LockEntry, document::Bucket::hash> LockedBuckets;
+ using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>;
const FileStorHandlerImpl &_owner;
MessageSender &_messageSender;
FileStorStripeMetrics *_metrics;
@@ -178,8 +184,8 @@ public:
return _stripes[stripeId].getNextMessage(lck);
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
- lock(const document::Bucket & bucket) {
- return stripe(bucket).lock(bucket);
+ lock(const document::Bucket & bucket, api::LockingRequirements lockReq) {
+ return stripe(bucket).lock(bucket, lockReq);
}
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) {
stripe(bucket).failOperations(bucket, code);
@@ -194,7 +200,7 @@ public:
// Disperse bucket bits by multiplying with the 64-bit FNV-1 prime.
// This avoids an inherent affinity between the LSB of a bucket's bits
// and the stripe an operation ends up on.
- return bucket.getBucketId().getRawId() * 1099511628211ULL;
+ return bucket.getBucketId().getId() * 1099511628211ULL;
}
Stripe & stripe(const document::Bucket & bucket) {
return _stripes[dispersed_bucket_bits(bucket) % _stripes.size()];
@@ -208,15 +214,20 @@ public:
class BucketLock : public FileStorHandler::BucketLockInterface {
public:
+ // TODO refactor, too many params
BucketLock(const vespalib::MonitorGuard & guard, Stripe& disk, const document::Bucket &bucket,
- uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id);
+ uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id,
+ api::LockingRequirements lockReq);
~BucketLock();
const document::Bucket &getBucket() const override { return _bucket; }
+ api::LockingRequirements lockingRequirements() const noexcept override { return _lockReq; }
private:
Stripe & _stripe;
document::Bucket _bucket;
+ api::StorageMessage::Id _uniqueMsgId;
+ api::LockingRequirements _lockReq;
};
FileStorHandlerImpl(uint32_t numStripes, MessageSender&, FileStorMetrics&,
@@ -253,8 +264,8 @@ public:
uint32_t getNextStripeId(uint32_t disk);
std::shared_ptr<FileStorHandler::BucketLockInterface>
- lock(const document::Bucket & bucket, uint16_t disk) {
- return _diskInfo[disk].lock(bucket);
+ lock(const document::Bucket & bucket, uint16_t disk, api::LockingRequirements lockReq) {
+ return _diskInfo[disk].lock(bucket, lockReq);
}
void addMergeStatus(const document::Bucket&, MergeStatus::SP);
diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h
index ba7f5979569..d0572e7dbf8 100644
--- a/storage/src/vespa/storage/persistence/messages.h
+++ b/storage/src/vespa/storage/persistence/messages.h
@@ -38,6 +38,9 @@ public:
void setMaxByteSize(uint32_t maxByteSize) { _maxByteSize = maxByteSize; }
uint32_t getMaxByteSize() const { return _maxByteSize; }
+ api::LockingRequirements lockingRequirements() const noexcept override {
+ return api::LockingRequirements::Shared;
+ }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
private:
@@ -105,6 +108,9 @@ public:
spi::ReadConsistency getReadConsistency() const noexcept {
return _readConsistency;
}
+ api::LockingRequirements lockingRequirements() const noexcept override {
+ return api::LockingRequirements::Shared;
+ }
std::unique_ptr<api::StorageReply> makeReply() override;
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index c2dcb8e2a29..888dc93dd82 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -122,9 +122,14 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket,
result.disk = getPreferredAvailableDisk(bucket);
while (true) {
+ // This function is only called in a context where we require exclusive
+ // locking (split/join). Refactor if this no longer the case.
std::shared_ptr<FileStorHandler::BucketLockInterface> lock(
- _fileStorHandler.lock(bucket, result.disk));
+ _fileStorHandler.lock(bucket, result.disk, api::LockingRequirements::Exclusive));
+ // TODO disks are no longer used in practice, can we safely discard this?
+ // Might need it for synchronization purposes if something has taken the
+ // disk lock _and_ the bucket lock...?
StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "join-lockAndGetDisk-1", flags));
if (entry.exist() && entry->disk != result.disk) {
diff --git a/storage/src/vespa/storage/visiting/stor-visitor.def b/storage/src/vespa/storage/visiting/stor-visitor.def
index 1e80f2993a5..6f16bcb60a2 100644
--- a/storage/src/vespa/storage/visiting/stor-visitor.def
+++ b/storage/src/vespa/storage/visiting/stor-visitor.def
@@ -24,6 +24,7 @@ defaultparalleliterators int default=8
## will be 16 requests to persistence layer, but only 8 will be able to execute
## at the same time, since only one operation can be executed at the same time
## for one bucket)
+## DEPRECATED: ignored by backend, 1 is always used.
iterators_per_bucket int default=1
## Default number of maximum client replies pending.
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index a8f31514eb1..b12a1eb6e7f 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -637,7 +637,6 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd)
_ignoreNonExistingVisitorTimeLimit
= config.ignorenonexistingvisitortimelimit;
_defaultParallelIterators = config.defaultparalleliterators;
- _iteratorsPerBucket = config.iteratorsPerBucket;
_defaultPendingMessages = config.defaultpendingmessages;
_defaultDocBlockSize = config.defaultdocblocksize;
_visitorMemoryUsageLimit = config.visitorMemoryUsageLimit;
@@ -647,12 +646,6 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd)
LOG(config, "Cannot use value of defaultParallelIterators < 1");
_defaultParallelIterators = 1;
}
- if (_iteratorsPerBucket < 1 && _iteratorsPerBucket > 10) {
- if (_iteratorsPerBucket < 1) _iteratorsPerBucket = 1;
- else _iteratorsPerBucket = 10;
- LOG(config, "Invalid value of iterators per bucket %u using %u",
- config.iteratorsPerBucket, _iteratorsPerBucket);
- }
if (_defaultPendingMessages < 1) {
LOG(config, "Cannot use value of defaultPendingMessages < 1");
_defaultPendingMessages = 1;