diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-18 19:21:33 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-19 11:49:07 +0000 |
commit | 7dff0a9879800c9da8643d1f5c7d4f39fc910467 (patch) | |
tree | 1d4ab8c864c73b1d13f2fdf8e9373eb5b56b086f /storage/src/tests/persistence | |
parent | 779aeaf46753f97c8cc7221774a1c7c91a797c80 (diff) |
Split the persistence thread and the message handler.
- Let FileStorManager own and control the Component and PersistenceHandler
separately from the Persistence thread.
- Let FileStorManager allocate and control stripe assignment.
Diffstat (limited to 'storage/src/tests/persistence')
6 files changed, 69 insertions, 84 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 761888e9f9b..3f60189204b 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -194,17 +194,11 @@ bool fileExistsWithin(const std::string& path, const std::string& file) { } std::unique_ptr<DiskThread> -createThread(vdstestlib::DirConfig& config, - TestServiceLayerApp& node, - spi::PersistenceProvider& provider, +createThread(PersistenceHandler & persistenceHandler, FileStorHandler& filestorHandler, - BucketOwnershipNotifier & notifier, - FileStorThreadMetrics& metrics) + framework::Component & component) { - (void) config; - vespa::config::content::StorFilestorConfig cfg; - return std::make_unique<PersistenceThread>(node.executor(), node.getComponentRegister(), cfg, - provider, filestorHandler, notifier, metrics); + return std::make_unique<PersistenceThread>(persistenceHandler, filestorHandler, 0, component); } namespace { @@ -402,8 +396,7 @@ TEST_F(FileStorManagerTest, handler_priority) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); - uint32_t stripeId = filestorHandler.getNextStripeId(); - ASSERT_EQ(0u, stripeId); + uint32_t stripeId = 0; std::string content("Here is some content which is in all documents"); std::ostringstream uri; @@ -469,7 +462,7 @@ public: std::atomic<bool> _threadDone; explicit MessageFetchingThread(FileStorHandler& handler) - : _threadId(handler.getNextStripeId()), _handler(handler), _config(0), _fetchedCount(0), _done(false), + : _threadId(0), _handler(handler), _config(0), _fetchedCount(0), _done(false), _failed(false), _threadDone(false) {} @@ -556,7 +549,7 @@ TEST_F(FileStorManagerTest, handler_pause) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); - uint32_t stripeId = filestorHandler.getNextStripeId(); + uint32_t stripeId = 0; std::string content("Here is some content which is in all documents"); std::ostringstream uri; @@ -660,7 +653,7 @@ TEST_F(FileStorManagerTest, handler_timeout) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); - uint32_t stripeId = filestorHandler.getNextStripeId(); + uint32_t stripeId = 0; std::string content("Here is some content which is in all documents"); std::ostringstream uri; @@ -721,12 +714,14 @@ TEST_F(FileStorManagerTest, priority) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); - std::unique_ptr<DiskThread> thread2(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[1])); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); + + PersistenceHandler persistenceHandler2(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[1]); + std::unique_ptr<DiskThread> thread2(createThread(persistenceHandler2, filestorHandler, component)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; @@ -803,9 +798,10 @@ TEST_F(FileStorManagerTest, split1) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -814,8 +810,7 @@ TEST_F(FileStorManagerTest, split1) { uri << "id:footype:testdoctype1:n=" << (i % 3 == 0 ? 0x10001 : 0x0100001) << ":mydoc-" << i; - Document::SP doc(createDocument( - content, uri.str()).release()); + Document::SP doc(createDocument(content, uri.str()).release()); documents.push_back(doc); } document::BucketIdFactory factory; @@ -824,11 +819,9 @@ TEST_F(FileStorManagerTest, split1) { { // Populate bucket with the given data for (uint32_t i=0; i<documents.size(); ++i) { - document::BucketId bucket(16, factory.getBucketId( - documents[i]->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId()); - _node->getPersistenceProvider().createBucket( - makeSpiBucket(bucket), context); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context); auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i); auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); @@ -946,15 +939,16 @@ TEST_F(FileStorManagerTest, split_single_group) { ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); for (uint32_t j=0; j<1; ++j) { // Test this twice, once where all the data ends up in file with // splitbit set, and once where all the data ends up in file with // splitbit unset bool state = (j == 0); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP> documents; for (uint32_t i=0; i<20; ++i) { @@ -1059,9 +1053,10 @@ TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); document::BucketId source(16, 0x10001); @@ -1126,9 +1121,10 @@ TEST_F(FileStorManagerTest, notify_on_split_source_ownership_changed) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); document::BucketId source(getFirstBucketNotOwnedByDistributor(0)); createBucket(source, 0); @@ -1158,8 +1154,7 @@ TEST_F(FileStorManagerTest, join) { // Setup a filestorthread to test DummyStorageLink top; DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); + top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); top.open(); ForwardingMessageSender messageSender(*dummyManager); @@ -1169,9 +1164,10 @@ TEST_F(FileStorManagerTest, join) { FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); ServiceLayerComponent component(_node->getComponentRegister(), "test"); BucketOwnershipNotifier bucketOwnershipNotifier(component, messageSender); - std::unique_ptr<DiskThread> thread(createThread( - *config, *_node, _node->getPersistenceProvider(), - filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0])); + vespa::config::content::StorFilestorConfig cfg; + PersistenceHandler persistenceHandler(_node->executor(), component, cfg, _node->getPersistenceProvider(), + filestorHandler, bucketOwnershipNotifier, *metrics.disks[0]->threads[0]); + std::unique_ptr<DiskThread> thread(createThread(persistenceHandler, filestorHandler, component)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -1259,8 +1255,7 @@ createIterator(DummyStorageLink& link, { spi::Bucket bucket(makeSpiBucket(bucketId)); - spi::Selection selection = - spi::Selection(spi::DocumentSelection(docSel)); + spi::Selection selection = spi::Selection(spi::DocumentSelection(docSel)); selection.setFromTimestamp(spi::Timestamp(fromTime.getTime())); selection.setToTimestamp(spi::Timestamp(toTime.getTime())); auto createIterCmd = std::make_shared<CreateIteratorCommand>( diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index 4737809a926..a19d060474b 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -42,7 +42,8 @@ PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_) dummyManager(std::make_unique<DummyStorageLink>()), messageSender(*dummyManager), loadTypes("raw:"), - metrics(loadTypes.getMetricLoadTypes()) + metrics(loadTypes.getMetricLoadTypes()), + stripeId(0) { top.push_back(std::move(dummyManager)); top.open(); @@ -55,8 +56,6 @@ PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_) // that is large enough to fail tests with high probability if this is not the case, // and small enough to not slow down testing too much. filestorHandler->setGetNextMessageTimeout(20ms); - - stripeId = filestorHandler->getNextStripeId(); } PersistenceQueueTest::Fixture::~Fixture() = default; diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index e2d61b9db2c..80ba4c19384 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -70,9 +70,14 @@ PersistenceTestEnvironment::~PersistenceTestEnvironment() { PersistenceTestUtils::PersistenceTestUtils() : _env(std::make_unique<PersistenceTestEnvironment>("todo-make-unique-persistencetestutils")), _replySender(), - _bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler) + _bucketOwnershipNotifier(getEnv()._component, getEnv()._fileStorHandler), + _persistenceHandler() { setupExecutor(1); + vespa::config::content::StorFilestorConfig cfg; + _persistenceHandler = std::make_unique<PersistenceHandler>(*_sequenceTaskExecutor, _env->_component, cfg, + getPersistenceProvider(), getEnv()._fileStorHandler, + _bucketOwnershipNotifier, getEnv()._metrics); } PersistenceTestUtils::~PersistenceTestUtils() = default; @@ -86,15 +91,6 @@ PersistenceTestUtils::setupExecutor(uint32_t numThreads) { _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE); } -std::unique_ptr<PersistenceThread> -PersistenceTestUtils::createPersistenceThread() -{ - vespa::config::content::StorFilestorConfig cfg; - return std::make_unique<PersistenceThread>(*_sequenceTaskExecutor, _env->_node.getComponentRegister(), - cfg, getPersistenceProvider(), - getEnv()._fileStorHandler, _bucketOwnershipNotifier, getEnv()._metrics); -} - document::Document::SP PersistenceTestUtils::schedulePut( uint32_t location, diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index 29a18db413b..332a30393b1 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -73,7 +73,7 @@ public: std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequenceTaskExecutor; ReplySender _replySender; BucketOwnershipNotifier _bucketOwnershipNotifier; - + std::unique_ptr<PersistenceHandler> _persistenceHandler; PersistenceTestUtils(); ~PersistenceTestUtils() override; @@ -225,11 +225,6 @@ public: void createTestBucket(const document::Bucket&); /** - * Create a new persistence thread. - */ - std::unique_ptr<PersistenceThread> createPersistenceThread(); - - /** * In-place modify doc so that it has no more body fields. */ void clearBody(document::Document& doc); diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index e266d367eab..9ac0c7bbfc8 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -204,7 +204,6 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) spi.put(bucket, spi::Timestamp(1000 + i), std::move(doc), context); } - std::unique_ptr<PersistenceThread> thread(createPersistenceThread()); getNode().getStateUpdater().setClusterState( std::make_shared<lib::ClusterState>("distributor:1 storage:1")); document::Bucket docBucket = makeDocumentBucket(document::BucketId(currentSplitLevel, 1)); @@ -214,7 +213,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) cmd->setMinByteSize(maxSize); cmd->setMinDocCount(maxCount); cmd->setSourceIndex(0); - MessageTracker::UP result = thread->splitjoinHandler().handleSplitBucket(*cmd, createTracker(cmd, docBucket)); + MessageTracker::UP result = _persistenceHandler->splitjoinHandler().handleSplitBucket(*cmd, createTracker(cmd, docBucket)); api::ReturnCode code(result->getResult()); EXPECT_EQ(error, code); if (!code.success()) { diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index 3d0dd183232..228ea29ab42 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -32,13 +32,16 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { const document::StringFieldValue NEW_CONTENT{"Freshly pressed and squeezed content"}; const document::Bucket BUCKET = makeDocumentBucket(BUCKET_ID); - unique_ptr<PersistenceThread> thread; + unique_ptr<PersistenceHandler> persistenceHandler; + const AsyncHandler * asyncHandler; shared_ptr<document::Document> testDoc; document::DocumentId testDocId; spi::Context context; TestAndSetTest() - : context(spi::LoadType(0, "default"), 0, 0) + : persistenceHandler(), + asyncHandler(nullptr), + context(spi::LoadType(0, "default"), 0, 0) {} void SetUp() override { @@ -47,14 +50,12 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { createBucket(BUCKET_ID); getPersistenceProvider().createBucket(makeSpiBucket(BUCKET_ID),context); - thread = createPersistenceThread(); testDoc = createTestDocument(); testDocId = testDoc->getId(); + asyncHandler = &_persistenceHandler->asyncHandler(); } void TearDown() override { - thread->flush(); - thread.reset(); SingleDiskPersistenceTestUtils::TearDown(); } @@ -86,7 +87,7 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(asyncHandler->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); } @@ -106,7 +107,7 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(asyncHandler->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -126,7 +127,7 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) { auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); setTestCondition(*remove); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(asyncHandler->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -146,7 +147,7 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) { auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); setTestCondition(*remove); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(asyncHandler->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY), dumpBucket(BUCKET_ID)); @@ -172,7 +173,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) { putTestDocument(false, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -185,7 +186,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) { putTestDocument(true, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -197,7 +198,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(false, updateTimestamp); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -206,7 +207,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(true, updateTimestamp); - ASSERT_EQ(fetchResult(thread->asyncHandler().handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(asyncHandler->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID)); assertTestDocumentFoundAndMatchesContent(NEW_CONTENT); } @@ -218,7 +219,7 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) { auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); put->setCondition(documentapi::TestAndSetCondition("bjarne")); - ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); + ASSERT_EQ(fetchResult(asyncHandler->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -228,9 +229,9 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) { api::Timestamp timestamp = 0; auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); setTestCondition(*put); - thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET)); + asyncHandler->handlePut(*put, createTracker(put, BUCKET)); - ASSERT_EQ(fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))).getResult(), + ASSERT_EQ(fetchResult(asyncHandler->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -255,7 +256,7 @@ document::Document::SP TestAndSetTest::retrieveTestDocument() { auto get = std::make_shared<api::GetCommand>(BUCKET, testDocId, document::AllFields::NAME); - auto tracker = thread->simpleMessageHandler().handleGet(*get, createTracker(get, BUCKET)); + auto tracker = _persistenceHandler->simpleMessageHandler().handleGet(*get, createTracker(get, BUCKET)); assert(tracker->getResult() == api::ReturnCode::Result::OK); auto & reply = static_cast<api::GetReply &>(tracker->getReply()); @@ -275,7 +276,7 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta } auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); - fetchResult(thread->asyncHandler().handlePut(*put, createTracker(put, BUCKET))); + fetchResult(asyncHandler->handlePut(*put, createTracker(put, BUCKET))); } void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value) |