diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-05-04 21:46:32 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-05-04 22:24:49 +0000 |
commit | 9d769dca411691680c16e73094ef27958b260959 (patch) | |
tree | ef92f32222f18bb53e73d37554e989be3495b360 /storage/src/tests | |
parent | 8bf5ae859e0664c8fd797243328baf6dc1717f7e (diff) |
Implement async put
Implement async remove.
Diffstat (limited to 'storage/src/tests')
7 files changed, 78 insertions, 41 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index fe966d0bbb2..bf0083311fb 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -201,12 +201,8 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config, uint16_t deviceIndex) { (void) config; - std::unique_ptr<DiskThread> disk; - disk.reset(new PersistenceThread( - node.getComponentRegister(), config.getConfigId(), provider, - filestorHandler, metrics, - deviceIndex)); - return disk; + return std::make_unique<PersistenceThread>(nullptr,node.getComponentRegister(), config.getConfigId(), + provider, filestorHandler, metrics, deviceIndex); } namespace { diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 035da326d48..262906a4baf 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -149,11 +149,6 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd; }; - MessageTracker::UP - createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { - return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd)); - } - std::string doTestSPIException(MergeHandler& handler, PersistenceProviderWrapper& providerWrapper, diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 4ac9dfd7765..cf8bf71708d 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -12,6 +12,8 @@ #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> +#include <thread> using document::DocumentType; using storage::spi::test::makeSpiBucket; @@ -54,9 +56,9 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const { _node.setupDummyPersistence(); _metrics.initDiskMetrics(numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1, 1); - _handler.reset(new FileStorHandler(_messageKeeper, _metrics, + _handler = std::make_unique<FileStorHandler>(_messageKeeper, _metrics, _node.getPersistenceProvider().getPartitionStates().getList(), - _node.getComponentRegister())); + _node.getComponentRegister()); for (uint32_t i = 0; i < numDisks; i++) { _diskEnvs.push_back( std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler, @@ -64,6 +66,13 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const } } +PersistenceTestEnvironment::~PersistenceTestEnvironment() { + _handler->close(); + while (!_handler->closed(0)) { + std::this_thread::sleep_for(1ms); + } +} + PersistenceTestUtils::PersistenceTestUtils() = default; PersistenceTestUtils::~PersistenceTestUtils() = default; @@ -74,15 +83,21 @@ PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) { void PersistenceTestUtils::setupDisks(uint32_t numDisks) { - _env.reset(new PersistenceTestEnvironment(DiskCount(numDisks), "todo-make-unique-persistencetestutils")); + _env = std::make_unique<PersistenceTestEnvironment>(DiskCount(numDisks), "todo-make-unique-persistencetestutils"); + setupExecutor(numDisks); +} + +void +PersistenceTestUtils::setupExecutor(uint32_t numThreads) { + _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE); } std::unique_ptr<PersistenceThread> PersistenceTestUtils::createPersistenceThread(uint32_t disk) { - return std::make_unique<PersistenceThread>(_env->_node.getComponentRegister(), _env->_config.getConfigId(), - getPersistenceProvider(), getEnv()._fileStorHandler, - getEnv()._metrics, disk); + return std::make_unique<PersistenceThread>(_sequenceTaskExecutor.get(), _env->_node.getComponentRegister(), + _env->_config.getConfigId(),getPersistenceProvider(), + getEnv()._fileStorHandler, getEnv()._metrics, disk); } document::Document::SP diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index 3121bef61e5..cdd08d42565 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -10,6 +10,7 @@ #include <vespa/storage/common/storagecomponent.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> +#include <vespa/storage/storageserver/communicationmanager.h> #include <vespa/document/base/testdocman.h> #include <vespa/vespalib/gtest/gtest.h> @@ -24,6 +25,7 @@ struct MessageKeeper : public MessageSender { struct PersistenceTestEnvironment { PersistenceTestEnvironment(DiskCount numDisks, const std::string & rootOfRoot); + ~PersistenceTestEnvironment(); document::TestDocMan _testDocMan; vdstestlib::DirConfig _config; @@ -54,7 +56,22 @@ public: document::Bucket _bucket; }; + struct ReplySender : public MessageSender { + void sendCommand(const std::shared_ptr<api::StorageCommand> &) override { + abort(); + } + + void sendReply(const std::shared_ptr<api::StorageReply> & ptr) override { + queue.enqueue(std::move(ptr)); + } + + Queue queue; + }; + std::unique_ptr<PersistenceTestEnvironment> _env; + std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequenceTaskExecutor; + ReplySender _replySender; + PersistenceTestUtils(); virtual ~PersistenceTestUtils(); @@ -67,8 +84,13 @@ public: uint32_t maxSize = 128); void setupDisks(uint32_t disks); + void setupExecutor(uint32_t numThreads); void TearDown() override { + if (_sequenceTaskExecutor) { + _sequenceTaskExecutor->sync(); + _sequenceTaskExecutor.reset(); + } _env.reset(); } @@ -90,6 +112,21 @@ public: spi::PersistenceProvider& getPersistenceProvider(); + MessageTracker::UP + createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { + return MessageTracker::createForTesting(getEnv(), _replySender, NoBucketLock::make(bucket), std::move(cmd)); + } + + api::ReturnCode + fetchResult(const MessageTracker::UP & tracker) { + if (tracker) { + return tracker->getResult(); + } + std::shared_ptr<api::StorageMessage> msg; + _replySender.queue.getNext(msg, 60000); + return dynamic_cast<api::StorageReply &>(*msg).getResult(); + } + /** Performs a put to the given disk. Returns the document that was inserted. diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index 1ec6a35fb1d..3d7fc70db6a 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -214,7 +214,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) cmd->setMinByteSize(maxSize); cmd->setMinDocCount(maxCount); cmd->setSourceIndex(0); - MessageTracker::UP result = thread->handleSplitBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(docBucket), cmd)); + MessageTracker::UP result = thread->handleSplitBucket(*cmd, createTracker(cmd, docBucket)); api::ReturnCode code(result->getResult()); EXPECT_EQ(error, code); if (!code.success()) { diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index 0d482ebe5b7..5174b733334 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -23,7 +23,7 @@ TEST_F(ProcessAllHandlerTest, remove_location) { document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::RemoveLocationCommand>("id.user == 4", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); + auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n", @@ -47,7 +47,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket); - auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); + auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n" @@ -74,7 +74,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_ty auto cmd = std::make_shared<api::RemoveLocationCommand>("unknowndoctype.headerval % 2 == 0", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception); + ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", dumpBucket(bucketId)); @@ -88,7 +88,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_bogus_selectio auto cmd = std::make_shared<api::RemoveLocationCommand>("id.bogus != badgers", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception); + ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", dumpBucket(bucketId)); @@ -107,7 +107,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "testdoctype1.headerval % 2 == 0"); - MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); @@ -141,7 +141,7 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries) document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true"); - MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); @@ -187,7 +187,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_ document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true"); - MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index 864ab320527..ca441c71816 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -40,18 +40,11 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { : context(spi::LoadType(0, "default"), 0, 0) {} - MessageTracker::UP - createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { - return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd)); - } - void SetUp() override { SingleDiskPersistenceTestUtils::SetUp(); createBucket(BUCKET_ID); - getPersistenceProvider().createBucket( - makeSpiBucket(BUCKET_ID), - context); + getPersistenceProvider().createBucket(makeSpiBucket(BUCKET_ID),context); thread = createPersistenceThread(0); testDoc = createTestDocument(); @@ -59,7 +52,8 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { } void TearDown() override { - thread.reset(nullptr); + thread->flush(); + thread.reset(); SingleDiskPersistenceTestUtils::TearDown(); } @@ -91,7 +85,7 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); } @@ -111,7 +105,7 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -131,7 +125,7 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) { auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); setTestCondition(*remove); - ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -151,7 +145,7 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) { auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); setTestCondition(*remove); - ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY), dumpBucket(BUCKET_ID)); @@ -223,7 +217,7 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) { auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); put->setCondition(documentapi::TestAndSetCondition("bjarne")); - ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); + ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -235,7 +229,7 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) { setTestCondition(*put); thread->handlePut(*put, createTracker(put, BUCKET)); - ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -279,7 +273,7 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta } auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); - thread->handlePut(*put, createTracker(put, BUCKET)); + fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))); } void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value) |