diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-28 08:24:57 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-29 14:14:03 +0000 |
commit | eb7b71781ca079b5577a13b300beafee388bc1ce (patch) | |
tree | 8a5194ed759a8fc8433fef14118e67ac6bfc2632 /storage | |
parent | 9499865f8a43aa097841606795a2bea8d0273ef9 (diff) |
- Add async interface to put
- Use MessageTracker for keeping context.
- implement putAsync, but still use it synchronously.
Diffstat (limited to 'storage')
22 files changed, 403 insertions, 441 deletions
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index fb80c25bfb7..862d1fb758a 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -91,14 +91,12 @@ PersistenceProviderWrapper::getBucketInfo(const spi::Bucket& bucket) const } spi::Result -PersistenceProviderWrapper::put(const spi::Bucket& bucket, - spi::Timestamp timestamp, - const document::Document::SP& doc, - spi::Context& context) +PersistenceProviderWrapper::put(const spi::Bucket& bucket, spi::Timestamp timestamp, + document::Document::SP doc, spi::Context& context) { LOG_SPI("put(" << bucket << ", " << timestamp << ", " << doc->getId() << ")"); CHECK_ERROR(spi::Result, FAIL_PUT); - return _spi.put(bucket, timestamp, doc, context); + return _spi.put(bucket, timestamp, std::move(doc), context); } spi::RemoveResult diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index 9bd3653e8a1..25365e64bfc 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -88,7 +88,7 @@ public: spi::PartitionStateListResult getPartitionStates() const override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace, spi::PartitionId) const override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; - spi::Result put(const spi::Bucket&, spi::Timestamp, const spi::DocumentSP&, spi::Context&) override; + spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override; diff --git a/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp b/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp index 0dd3285e5f3..c10681405e7 100644 --- a/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp +++ b/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp @@ -27,13 +27,11 @@ TEST_F(DiskMoveOperationHandlerTest, simple) { doPutOnDisk(3, 4, spi::Timestamp(1000 + i)); } - DiskMoveOperationHandler diskMoveHandler( - getEnv(3), - getPersistenceProvider()); - BucketDiskMoveCommand move(makeDocumentBucket(document::BucketId(16, 4)), 3, 4); - + DiskMoveOperationHandler diskMoveHandler(getEnv(3),getPersistenceProvider()); + document::Bucket bucket = makeDocumentBucket(document::BucketId(16, 4)); + auto move = std::make_shared<BucketDiskMoveCommand>(bucket, 3, 4); spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - diskMoveHandler.handleBucketDiskMove(move, context); + diskMoveHandler.handleBucketDiskMove(*move, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), move)); EXPECT_EQ("BucketId(0x4000000000000004): 10,4", getBucketStatus(document::BucketId(16,4))); diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index ba344971c3b..e9f878bfe1e 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -46,13 +46,8 @@ public: _deleteBucketInvocations(0) {} - spi::Result put(const spi::Bucket& bucket, spi::Timestamp timestamp, - const document::Document::SP& doc, spi::Context& context) override + spi::Result put(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::Context&) override { - (void) bucket; - (void) timestamp; - (void) doc; - (void) context; _queueBarrier.await(); // message abort stage with active opertion in disk queue std::this_thread::sleep_for(75ms); diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index dffd4ef1768..035da326d48 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -149,6 +149,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd; }; + MessageTracker::UP + createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { + return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd)); + } + std::string doTestSPIException(MergeHandler& handler, PersistenceProviderWrapper& providerWrapper, @@ -203,9 +208,9 @@ TEST_F(MergeHandlerTest, merge_bucket_command) { MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(debug, "Handle a merge bucket command"); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - cmd.setSourceIndex(1234); - MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + cmd->setSourceIndex(1234); + MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); LOG(debug, "Check state"); ASSERT_EQ(1, messageKeeper()._msgs.size()); @@ -217,7 +222,7 @@ TEST_F(MergeHandlerTest, merge_bucket_command) { EXPECT_EQ(1, cmd2.getAddress()->getIndex()); EXPECT_EQ(1234, cmd2.getSourceIndex()); - tracker->generateReply(cmd); + tracker->generateReply(*cmd); EXPECT_FALSE(tracker->hasReply()); } @@ -228,8 +233,8 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain) MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(debug, "Verifying that get bucket diff is sent on"); - api::GetBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp); - MessageTracker::UP tracker1 = handler.handleGetBucketDiff(cmd, *_context); + auto cmd = std::make_shared<api::GetBucketDiffCommand>(_bucket, _nodes, _maxTimestamp); + MessageTracker::UP tracker1 = handler.handleGetBucketDiff(*cmd, createTracker(cmd, _bucket)); api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP(); if (midChain) { @@ -277,8 +282,8 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(debug, "Verifying that apply bucket diff is sent on"); - api::ApplyBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp); - MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(cmd, *_context); + auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, _maxTimestamp); + MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket)); api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP(); if (midChain) { @@ -324,9 +329,9 @@ TEST_F(MergeHandlerTest, master_message_flow) { MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(debug, "Handle a merge bucket command"); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); LOG(debug, "Check state"); ASSERT_EQ(1, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType()); @@ -425,8 +430,8 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize); LOG(debug, "Handle a merge bucket command"); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>(); auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd); @@ -505,9 +510,8 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize); applyBucketDiffCmd->getDiff() = applyDiff; - MergeHandler handler( - getPersistenceProvider(), getEnv(), maxChunkSize); - handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); + MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize); + handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); auto fwdDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>(); // Should not fill up more than chunk size allows for @@ -520,8 +524,8 @@ TEST_F(MergeHandlerTest, max_timestamp) { MergeHandler handler(getPersistenceProvider(), getEnv()); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); auto getCmd = fetchSingleMessage<api::GetBucketDiffCommand>(); @@ -534,8 +538,7 @@ MergeHandlerTest::fillDummyApplyDiff( std::vector<api::ApplyBucketDiffCommand::Entry>& diff) { document::TestDocMan docMan; - document::Document::SP doc( - docMan.createRandomDocumentAtLocation(_location)); + document::Document::SP doc(docMan.createRandomDocumentAtLocation(_location)); std::vector<char> headerBlob; { vespalib::nbostream stream; @@ -638,7 +641,8 @@ TEST_F(MergeHandlerTest, spi_flush_guard) { providerWrapper.clearOperationLog(); try { - handler.handleApplyBucketDiff(*createDummyApplyDiff(6000), *_context); + auto cmd = createDummyApplyDiff(6000); + handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket)); FAIL() << "No exception thrown on failing in-place remove"; } catch (const std::runtime_error& e) { EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos); @@ -648,15 +652,15 @@ TEST_F(MergeHandlerTest, spi_flush_guard) { TEST_F(MergeHandlerTest, bucket_not_found_in_db) { MergeHandler handler(getPersistenceProvider(), getEnv()); // Send merge for unknown bucket - api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); - MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); + MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); EXPECT_TRUE(tracker->getResult().isBucketDisappearance()); } TEST_F(MergeHandlerTest, merge_progress_safe_guard) { MergeHandler handler(getPersistenceProvider(), getEnv()); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>(); auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd); @@ -682,8 +686,8 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { _nodes.emplace_back(0, false); _nodes.emplace_back(1, false); _nodes.emplace_back(2, false); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>(); auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd); @@ -722,7 +726,7 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024); applyBucketDiffCmd->getDiff() = applyDiff; - auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); + auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); ASSERT_TRUE(applyBucketDiffReply.get()); @@ -809,10 +813,10 @@ void MergeHandlerTest::HandleMergeBucketInvoker::invoke( MergeHandlerTest& test, MergeHandler& handler, - spi::Context& context) + spi::Context&) { - api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); - handler.handleMergeBucket(cmd, context); + auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp); + handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); } TEST_F(MergeHandlerTest, merge_bucket_spi_failures) { @@ -841,17 +845,16 @@ void MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( MergeHandlerTest& test, MergeHandler& handler, - spi::Context& context) + spi::Context& ) { - api::GetBucketDiffCommand cmd(test._bucket, test._nodes, test._maxTimestamp); - handler.handleGetBucketDiff(cmd, context); + auto cmd = std::make_shared<api::GetBucketDiffCommand>(test._bucket, test._nodes, test._maxTimestamp); + handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler(providerWrapper, getEnv()); - providerWrapper.setResult( - spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); + providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); setUpChain(MIDDLE); ExpectedExceptionSpec exceptions[] = { @@ -874,12 +877,11 @@ void MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke( MergeHandlerTest& test, MergeHandler& handler, - spi::Context& context) + spi::Context&) { ++_counter; - std::shared_ptr<api::ApplyBucketDiffCommand> cmd( - test.createDummyApplyDiff(100000 * _counter)); - handler.handleApplyBucketDiff(*cmd, context); + auto cmd = test.createDummyApplyDiff(100000 * _counter); + handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { @@ -904,8 +906,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { EXPECT_EQ("", doTestSPIException(handler, providerWrapper, invoker, *it)); // Casual, in-place testing of bug 6752085. // This will fail if we give NaN to the metric in question. - EXPECT_TRUE(std::isfinite(getEnv()._metrics - .mergeAverageDataReceivedNeeded.getLast())); + EXPECT_TRUE(std::isfinite(getEnv()._metrics.mergeAverageDataReceivedNeeded.getLast())); } } @@ -913,10 +914,10 @@ void MergeHandlerTest::HandleGetBucketDiffReplyInvoker::beforeInvoke( MergeHandlerTest& test, MergeHandler& handler, - spi::Context& context) + spi::Context&) { - api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); - handler.handleMergeBucket(cmd, context); + auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp); + handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); _diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>(); } @@ -974,13 +975,13 @@ void MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke( MergeHandlerTest& test, MergeHandler& handler, - spi::Context& context) + spi::Context&) { ++_counter; _stub.clear(); if (getChainPos() == FRONT) { - api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); - handler.handleMergeBucket(cmd, context); + auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp); + handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); auto diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>(); auto dummyDiff = test.createDummyGetBucketDiff(100000 * _counter, 0x4); diffCmd->getDiff() = dummyDiff->getDiff(); @@ -995,7 +996,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke( // Pretend last node in chain has data and that it will be fetched when // chain is unwinded. auto cmd = test.createDummyApplyDiff(100000 * _counter, 0x4, false); - handler.handleApplyBucketDiff(*cmd, context); + handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); _applyCmd = test.fetchSingleMessage<api::ApplyBucketDiffCommand>(); } } @@ -1147,13 +1148,13 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024); applyBucketDiffCmd->getDiff() = applyDiff; - auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); + auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); ASSERT_TRUE(applyBucketDiffReply.get()); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>(); diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 25c0a36a7f5..4ac9dfd7765 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -283,7 +283,7 @@ PersistenceTestUtils::doPut(const document::Document::SP& doc, spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); getPersistenceProvider().createBucket(b, context); - getPersistenceProvider().put(b, time, doc, context); + getPersistenceProvider().put(b, time, std::move(doc), context); } spi::UpdateResult diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index e418765ecac..3121bef61e5 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -37,6 +37,23 @@ struct PersistenceTestEnvironment { class PersistenceTestUtils : public testing::Test { public: + class NoBucketLock : public FileStorHandler::BucketLockInterface + { + public: + NoBucketLock(document::Bucket bucket) : _bucket(bucket) { } + const document::Bucket &getBucket() const override { + return _bucket; + } + api::LockingRequirements lockingRequirements() const noexcept override { + return api::LockingRequirements::Shared; + } + static std::shared_ptr<NoBucketLock> make(document::Bucket bucket) { + return std::make_shared<NoBucketLock>(bucket); + } + private: + document::Bucket _bucket; + }; + std::unique_ptr<PersistenceTestEnvironment> _env; PersistenceTestUtils(); diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index 98a2be6880d..1ec6a35fb1d 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -201,19 +201,20 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) } document::Document::SP doc(testDocMan.createRandomDocumentAtLocation( docloc, seed, docSize, docSize)); - spi.put(bucket, spi::Timestamp(1000 + i), doc, context); + spi.put(bucket, spi::Timestamp(1000 + i), std::move(doc), context); } std::unique_ptr<PersistenceThread> thread(createPersistenceThread(0)); getNode().getStateUpdater().setClusterState( std::make_shared<lib::ClusterState>("distributor:1 storage:1")); - api::SplitBucketCommand cmd(makeDocumentBucket(document::BucketId(currentSplitLevel, 1))); - cmd.setMaxSplitBits(maxBits); - cmd.setMinSplitBits(minBits); - cmd.setMinByteSize(maxSize); - cmd.setMinDocCount(maxCount); - cmd.setSourceIndex(0); - MessageTracker::UP result(thread->handleSplitBucket(cmd, context)); + document::Bucket docBucket = makeDocumentBucket(document::BucketId(currentSplitLevel, 1)); + auto cmd = std::make_shared<api::SplitBucketCommand>(docBucket); + cmd->setMaxSplitBits(maxBits); + cmd->setMinSplitBits(minBits); + cmd->setMinByteSize(maxSize); + cmd->setMinDocCount(maxCount); + cmd->setSourceIndex(0); + MessageTracker::UP result = thread->handleSplitBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(docBucket), cmd)); api::ReturnCode code(result->getResult()); EXPECT_EQ(error, code); if (!code.success()) { @@ -222,8 +223,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) auto& reply = dynamic_cast<api::SplitBucketReply&>(result->getReply()); std::set<std::string> expected; for (uint32_t i=0; i<resultBuckets; ++i) { - document::BucketId b(resultSplitLevel, - location | (i == 0 ? 0 : splitMask)); + document::BucketId b(resultSplitLevel, location | (i == 0 ? 0 : splitMask)); std::ostringstream ost; ost << b << " - " << b.getUsedBits(); expected.insert(ost.str()); diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index 5462b4a5b0a..0d482ebe5b7 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -20,10 +20,10 @@ TEST_F(ProcessAllHandlerTest, remove_location) { doPut(4, spi::Timestamp(1234)); doPut(4, spi::Timestamp(2345)); - api::RemoveLocationCommand removeLocation("id.user == 4", makeDocumentBucket(bucketId)); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::RemoveLocationCommand>("id.user == 4", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - auto tracker = handler.handleRemoveLocation(removeLocation, context); + auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n", @@ -45,10 +45,9 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { doPut(doc, bucketId, spi::Timestamp(100 + i), 0); } - api::RemoveLocationCommand - removeLocation("testdoctype1.headerval % 2 == 0", makeDocumentBucket(bucketId)); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - auto tracker = handler.handleRemoveLocation(removeLocation, context); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket); + auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n" @@ -71,12 +70,11 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_ty document::BucketId bucketId(16, 4); doPut(4, spi::Timestamp(1234)); - api::RemoveLocationCommand - removeLocation("unknowndoctype.headerval % 2 == 0", makeDocumentBucket(bucketId)); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::RemoveLocationCommand>("unknowndoctype.headerval % 2 == 0", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - ASSERT_THROW(handler.handleRemoveLocation(removeLocation, context), std::exception); + ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", dumpBucket(bucketId)); @@ -86,11 +84,11 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_bogus_selectio document::BucketId bucketId(16, 4); doPut(4, spi::Timestamp(1234)); - api::RemoveLocationCommand removeLocation("id.bogus != badgers", makeDocumentBucket(bucketId)); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::RemoveLocationCommand>("id.bogus != badgers", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - ASSERT_THROW(handler.handleRemoveLocation(removeLocation, context), std::exception); + ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", dumpBucket(bucketId)); @@ -107,10 +105,9 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc doPut(doc, bucketId, spi::Timestamp(100 + i), 0); } - api::StatBucketCommand statBucket(makeDocumentBucket(bucketId), - "testdoctype1.headerval % 2 == 0"); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "testdoctype1.headerval % 2 == 0"); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); @@ -142,9 +139,9 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries) true); } - api::StatBucketCommand statBucket(makeDocumentBucket(bucketId), "true"); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true"); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); @@ -188,9 +185,9 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_ doPut(doc, bucketId, spi::Timestamp(100 + i), 0); } - api::StatBucketCommand statBucket(makeDocumentBucket(bucketId), "true"); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true"); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index 08555fe0627..864ab320527 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -29,6 +29,7 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { const document::StringFieldValue MATCHING_HEADER{"Some string with woofy dog as a substring"}; const document::StringFieldValue OLD_CONTENT{"Some old content"}; const document::StringFieldValue NEW_CONTENT{"Freshly pressed and squeezed content"}; + const document::Bucket BUCKET = makeDocumentBucket(BUCKET_ID); unique_ptr<PersistenceThread> thread; shared_ptr<document::Document> testDoc; @@ -39,6 +40,11 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { : context(spi::LoadType(0, "default"), 0, 0) {} + MessageTracker::UP + createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { + return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd)); + } + void SetUp() override { SingleDiskPersistenceTestUtils::SetUp(); @@ -57,7 +63,7 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { SingleDiskPersistenceTestUtils::TearDown(); } - std::unique_ptr<api::UpdateCommand> conditional_update_test( + std::shared_ptr<api::UpdateCommand> conditional_update_test( bool createIfMissing, api::Timestamp updateTimestamp); @@ -82,10 +88,10 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) { // Conditionally replace document, but fail due to lack of woofy dog api::Timestamp timestampTwo = 1; - api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo); - setTestCondition(putTwo); + auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); + setTestCondition(*putTwo); - ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(), + ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); } @@ -102,10 +108,10 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) { // Conditionally replace document with updated version, succeed in doing so api::Timestamp timestampTwo = 1; - api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo); - setTestCondition(putTwo); + auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); + setTestCondition(*putTwo); - ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -122,10 +128,10 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) { // Conditionally remove document, fail in doing so api::Timestamp timestampTwo = 1; - api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo); - setTestCondition(remove); + auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); + setTestCondition(*remove); - ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(), + ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -142,18 +148,17 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) { // Conditionally remove document, succeed in doing so api::Timestamp timestampTwo = 1; - api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo); - setTestCondition(remove); + auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); + setTestCondition(*remove); - ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY), dumpBucket(BUCKET_ID)); } -std::unique_ptr<api::UpdateCommand> TestAndSetTest::conditional_update_test( - bool createIfMissing, - api::Timestamp updateTimestamp) +std::shared_ptr<api::UpdateCommand> +TestAndSetTest::conditional_update_test(bool createIfMissing, api::Timestamp updateTimestamp) { auto docUpdate = std::make_shared<document::DocumentUpdate>(_env->_testDocMan.getTypeRepo(), testDoc->getType(), testDocId); auto fieldUpdate = document::FieldUpdate(testDoc->getField("content")); @@ -161,7 +166,7 @@ std::unique_ptr<api::UpdateCommand> TestAndSetTest::conditional_update_test( docUpdate->addUpdate(fieldUpdate); docUpdate->setCreateIfNonExistent(createIfMissing); - auto updateUp = std::make_unique<api::UpdateCommand>(makeDocumentBucket(BUCKET_ID), docUpdate, updateTimestamp); + auto updateUp = std::make_unique<api::UpdateCommand>(BUCKET, docUpdate, updateTimestamp); setTestCondition(*updateUp); return updateUp; } @@ -172,7 +177,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) { putTestDocument(false, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), + ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -185,7 +190,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) { putTestDocument(true, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -197,7 +202,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(false, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), + ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -206,7 +211,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(true, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID)); assertTestDocumentFoundAndMatchesContent(NEW_CONTENT); } @@ -215,10 +220,10 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) { // Conditionally replace nonexisting document // Fail early since document selection is invalid api::Timestamp timestamp = 0; - api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp); - put.setCondition(documentapi::TestAndSetCondition("bjarne")); + auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); + put->setCondition(documentapi::TestAndSetCondition("bjarne")); - ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); + ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -226,11 +231,11 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) { // Conditionally replace nonexisting document // Fail since no document exists to match with test and set api::Timestamp timestamp = 0; - api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp); - setTestCondition(put); - thread->handlePut(put, context); + auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); + setTestCondition(*put); + thread->handlePut(*put, createTracker(put, BUCKET)); - ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(), + ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -253,8 +258,8 @@ TestAndSetTest::createTestDocument() document::Document::SP TestAndSetTest::retrieveTestDocument() { - api::GetCommand get(makeDocumentBucket(BUCKET_ID), testDocId, "[all]"); - auto tracker = thread->handleGet(get, context); + auto get = std::make_shared<api::GetCommand>(BUCKET, testDocId, "[all]"); + auto tracker = thread->handleGet(*get, createTracker(get, BUCKET)); assert(tracker->getResult() == api::ReturnCode::Result::OK); auto & reply = static_cast<api::GetReply &>(tracker->getReply()); @@ -273,8 +278,8 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta testDoc->setValue(testDoc->getField("hstringval"), MATCHING_HEADER); } - api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp); - thread->handlePut(put, context); + auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); + thread->handlePut(*put, createTracker(put, BUCKET)); } void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value) diff --git a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp index e906cfc624f..18877bdf8f7 100644 --- a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp +++ b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp @@ -7,20 +7,16 @@ LOG_SETUP(".persistence.diskmoveoperationhandler"); namespace storage { -DiskMoveOperationHandler::DiskMoveOperationHandler(PersistenceUtil& env, - spi::PersistenceProvider& provider) +DiskMoveOperationHandler::DiskMoveOperationHandler(PersistenceUtil& env, spi::PersistenceProvider& provider) : _env(env), _provider(provider) { } MessageTracker::UP -DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, - spi::Context& context) +DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, MessageTracker::UP tracker) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.movedBuckets, - _env._component.getClock())); + tracker->setMetric(_env._metrics.movedBuckets); document::Bucket bucket(cmd.getBucket()); uint32_t targetDisk(cmd.getDstDisk()); @@ -46,13 +42,10 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, deviceIndex, targetDisk); spi::Bucket from(bucket, spi::PartitionId(deviceIndex)); - spi::Bucket to(bucket, spi::PartitionId(targetDisk)); - spi::Result result( - _provider.move(from, spi::PartitionId(targetDisk), context)); + spi::Result result(_provider.move(from, spi::PartitionId(targetDisk), tracker->context())); if (result.hasError()) { - tracker->fail(api::ReturnCode::INTERNAL_FAILURE, - result.getErrorMessage()); + tracker->fail(api::ReturnCode::INTERNAL_FAILURE, result.getErrorMessage()); return tracker; } @@ -82,12 +75,7 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, } // Answer message, setting extra info such as filesize - tracker->setReply(std::shared_ptr<BucketDiskMoveReply>( - new BucketDiskMoveReply( - cmd, - bInfo, - sourceFileSize, - sourceFileSize))); + tracker->setReply(std::make_shared<BucketDiskMoveReply>(cmd, bInfo, sourceFileSize, sourceFileSize)); return tracker; } diff --git a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h index f0c4bbef66a..9e8d33fc802 100644 --- a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h +++ b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h @@ -12,8 +12,7 @@ public: DiskMoveOperationHandler(PersistenceUtil&, spi::PersistenceProvider& provider); - MessageTracker::UP handleBucketDiskMove(BucketDiskMoveCommand&, - spi::Context&); + MessageTracker::UP handleBucketDiskMove(BucketDiskMoveCommand&, MessageTracker::UP tracker); private: PersistenceUtil& _env; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 4cb687bb753..3483b15dd0e 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -537,9 +537,10 @@ MergeHandler::applyDiffEntry(const spi::Bucket& bucket, if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { // Regular put entry Document::SP doc(deserializeDiffDocument(e, repo)); - checkResult(_spi.put(bucket, timestamp, doc, context), + DocumentId docId = doc->getId(); + checkResult(_spi.put(bucket, timestamp, std::move(doc), context), bucket, - doc->getId(), + docId, "put"); } else { DocumentId docId(e._docName); @@ -903,12 +904,9 @@ public: }; MessageTracker::UP -MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, - spi::Context& context) +MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.mergeBuckets, - _env._component.getClock())); + tracker->setMetric(_env._metrics.mergeBuckets); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); LOG(debug, "MergeBucket(%s) with max timestamp %" PRIu64 ".", @@ -949,44 +947,33 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, tracker->fail(ReturnCode::BUSY, err); return tracker; } - checkResult(_spi.createBucket(bucket, context), bucket, "create bucket"); + checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); - MergeStatus::SP s = MergeStatus::SP(new MergeStatus( + auto s = std::make_shared<MergeStatus>( _env._component.getClock(), cmd.getLoadType(), - cmd.getPriority(), cmd.getTrace().getLevel())); + cmd.getPriority(), cmd.getTrace().getLevel()); _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); s->nodeList = cmd.getNodes(); s->maxTimestamp = Timestamp(cmd.getMaxTimestamp()); s->timeout = cmd.getTimeout(); s->startTime = framework::MilliSecTimer(_env._component.getClock()); - std::shared_ptr<api::GetBucketDiffCommand> cmd2( - new api::GetBucketDiffCommand(bucket.getBucket(), - s->nodeList, - s->maxTimestamp.getTime())); - if (!buildBucketInfoList(bucket, - cmd.getLoadType(), - s->maxTimestamp, - 0, - cmd2->getDiff(), - context)) - { + auto cmd2 = std::make_shared<api::GetBucketDiffCommand>(bucket.getBucket(), s->nodeList, s->maxTimestamp.getTime()); + if (!buildBucketInfoList(bucket, cmd.getLoadType(), s->maxTimestamp, 0, cmd2->getDiff(), tracker->context())) { LOG(debug, "Bucket non-existing in db. Failing merge."); tracker->fail(ReturnCode::BUCKET_DELETED, "Bucket not found in buildBucketInfo step"); return tracker; } - _env._metrics.mergeMetadataReadLatency.addValue( - s->startTime.getElapsedTimeAsDouble()); + _env._metrics.mergeMetadataReadLatency.addValue(s->startTime.getElapsedTimeAsDouble()); LOG(spam, "Sending GetBucketDiff %" PRIu64 " for %s to next node %u " "with diff of %u entries.", cmd2->getMsgId(), bucket.toString().c_str(), s->nodeList[1].index, uint32_t(cmd2->getDiff().size())); - cmd2->setAddress(createAddress(_env._component.getClusterName(), - s->nodeList[1].index)); + cmd2->setAddress(createAddress(_env._component.getClusterName(), s->nodeList[1].index)); cmd2->setPriority(s->context.getPriority()); cmd2->setTimeout(s->timeout); cmd2->setSourceIndex(cmd.getSourceIndex()); @@ -1129,15 +1116,12 @@ namespace { } MessageTracker::UP -MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, - spi::Context& context) +MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.getBucketDiff, - _env._component.getClock())); + tracker->setMetric(_env._metrics.getBucketDiff); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str()); - checkResult(_spi.createBucket(bucket, context), bucket, "create bucket"); + checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { tracker->fail(ReturnCode::BUSY, @@ -1151,7 +1135,7 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, framework::MilliSecTimer startTime(_env._component.getClock()); if (!buildBucketInfoList(bucket, cmd.getLoadType(), Timestamp(cmd.getMaxTimestamp()), - index, local, context)) + index, local, tracker->context())) { LOG(debug, "Bucket non-existing in db. Failing merge."); tracker->fail(ReturnCode::BUCKET_DELETED, @@ -1186,31 +1170,26 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, cmd.getMsgId(), bucket.toString().c_str(), cmd.getNodes()[index - 1].index, final.size(), local.size()); - api::GetBucketDiffReply* reply = new api::GetBucketDiffReply(cmd); - tracker->setReply(api::StorageReply::SP(reply)); + auto reply = std::make_shared<api::GetBucketDiffReply>(cmd); reply->getDiff().swap(final); + tracker->setReply(std::move(reply)); } else { // When not the last node in merge chain, we must save reply, and // send command on. MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); - MergeStatus::SP s(new MergeStatus(_env._component.getClock(), + auto s = std::make_shared<MergeStatus>(_env._component.getClock(), cmd.getLoadType(), cmd.getPriority(), - cmd.getTrace().getLevel())); + cmd.getTrace().getLevel()); _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); - s->pendingGetDiff = - api::GetBucketDiffReply::SP(new api::GetBucketDiffReply(cmd)); + s->pendingGetDiff = std::make_shared<api::GetBucketDiffReply>(cmd); s->pendingGetDiff->setPriority(cmd.getPriority()); - LOG(spam, "Sending GetBucketDiff for %s on to node %d, " - "added %zu new entries to diff.", + LOG(spam, "Sending GetBucketDiff for %s on to node %d, added %zu new entries to diff.", bucket.toString().c_str(), cmd.getNodes()[index + 1].index, local.size() - remote.size()); - std::shared_ptr<api::GetBucketDiffCommand> cmd2( - new api::GetBucketDiffCommand( - bucket.getBucket(), cmd.getNodes(), cmd.getMaxTimestamp())); - cmd2->setAddress(createAddress(_env._component.getClusterName(), - cmd.getNodes()[index + 1].index)); + auto cmd2 = std::make_shared<api::GetBucketDiffCommand>(bucket.getBucket(), cmd.getNodes(), cmd.getMaxTimestamp()); + cmd2->setAddress(createAddress(_env._component.getClusterName(), cmd.getNodes()[index + 1].index)); cmd2->getDiff().swap(local); cmd2->setPriority(cmd.getPriority()); cmd2->setTimeout(cmd.getTimeout()); @@ -1330,10 +1309,9 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, } MessageTracker::UP -MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, - spi::Context& context) +MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.applyBucketDiff, _env._component.getClock()); + tracker->setMetric(_env._metrics.applyBucketDiff); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); LOG(debug, "%s", cmd.toString().c_str()); @@ -1348,10 +1326,8 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, bool lastInChain = index + 1u >= cmd.getNodes().size(); if (applyDiffNeedLocalData(cmd.getDiff(), index, !lastInChain)) { framework::MilliSecTimer startTime(_env._component.getClock()); - fetchLocalData(bucket, cmd.getLoadType(), cmd.getDiff(), index, - context); - _env._metrics.mergeDataReadLatency.addValue( - startTime.getElapsedTimeAsDouble()); + fetchLocalData(bucket, cmd.getLoadType(), cmd.getDiff(), index, tracker->context()); + _env._metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); } else { LOG(spam, "Merge(%s): Moving %zu entries, didn't need " "local data on node %u (%u).", @@ -1363,7 +1339,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { framework::MilliSecTimer startTime(_env._component.getClock()); api::BucketInfo info(applyDiffLocally(bucket, cmd.getLoadType(), - cmd.getDiff(), index, context)); + cmd.getDiff(), index, tracker->context())); _env._metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); } else { diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 4ea69bd0fdf..7052258ec03 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -55,13 +55,10 @@ public: uint8_t nodeIndex, spi::Context& context); - MessageTracker::UP handleMergeBucket(api::MergeBucketCommand&, - spi::Context&); - MessageTracker::UP handleGetBucketDiff(api::GetBucketDiffCommand&, - spi::Context&); + MessageTracker::UP handleMergeBucket(api::MergeBucketCommand&, MessageTracker::UP); + MessageTracker::UP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTracker::UP); void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&); - MessageTracker::UP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, - spi::Context&); + MessageTracker::UP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTracker::UP); void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&); private: diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 422e19a492e..5905d93cc83 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -69,7 +69,7 @@ PersistenceThread::getBucket(const DocumentId& id, const document::Bucket &bucke bool PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tracker) { - uint32_t code = _env.convertErrorCode(response); + uint32_t code = PersistenceUtil::convertErrorCode(response); if (code != 0) { tracker.fail(code, response.getErrorMessage()); @@ -80,11 +80,13 @@ PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tr } -bool PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) { +bool +PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) { return cmd.getCondition().isPresent(); } -bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, +bool +PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, spi::Context & context, bool missingDocumentImpliesMatch) { try { TestAndSetHelper helper(*this, cmd, missingDocumentImpliesMatch); @@ -104,35 +106,35 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, } MessageTracker::UP -PersistenceThread::handlePut(api::PutCommand& cmd, spi::Context & context) +PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP tracker) { auto& metrics = _env._metrics.put[cmd.getLoadType()]; - auto tracker = std::make_unique<MessageTracker>(metrics, _env._component.getClock()); + tracker->setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) { + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) { return tracker; } spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), context); + spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker->context()); checkForError(response, *tracker); return tracker; } MessageTracker::UP -PersistenceThread::handleRemove(api::RemoveCommand& cmd, spi::Context & context) +PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker) { auto& metrics = _env._metrics.remove[cmd.getLoadType()]; - auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock()); + tracker->setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) { + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) { return tracker; } spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), context); + spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker->context()); if (checkForError(response, *tracker)) { tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); } @@ -143,18 +145,18 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, spi::Context & context) } MessageTracker::UP -PersistenceThread::handleUpdate(api::UpdateCommand& cmd, spi::Context & context) +PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker) { auto& metrics = _env._metrics.update[cmd.getLoadType()]; - auto tracker = std::make_unique<MessageTracker>(metrics, _env._component.getClock()); + tracker->setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context, cmd.getUpdate()->getCreateIfNonExistent())) { + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context(), cmd.getUpdate()->getCreateIfNonExistent())) { return tracker; } spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), context); + spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), tracker->context()); if (checkForError(response, *tracker)) { auto reply = std::make_shared<api::UpdateReply>(cmd); reply->setOldTimestamp(response.getExistingTimestamp()); @@ -176,17 +178,16 @@ spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency co } MessageTracker::UP -PersistenceThread::handleGet(api::GetCommand& cmd, spi::Context & context) +PersistenceThread::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker) { auto& metrics = _env._metrics.get[cmd.getLoadType()]; - auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock()); + tracker->setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - document::FieldSetRepo repo; - document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet()); - context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); + document::FieldSet::UP fieldSet = document::FieldSetRepo::parse(*_env._component.getTypeRepo(), cmd.getFieldSet()); + tracker->context().setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); spi::GetResult result = - _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), context); + _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), tracker->context()); if (checkForError(result, *tracker)) { if (!result.hasDocument()) { @@ -199,9 +200,9 @@ PersistenceThread::handleGet(api::GetCommand& cmd, spi::Context & context) } MessageTracker::UP -PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) +PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.repairs,_env._component.getClock()); + tracker->setMetric(_env._metrics.repairs); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(), (cmd.verifyBody() ? "Verifying body" : "Not verifying body")); @@ -225,28 +226,28 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) } MessageTracker::UP -PersistenceThread::handleRevert(api::RevertCommand& cmd, spi::Context & context) +PersistenceThread::handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock()); + tracker->setMetric(_env._metrics.revert[cmd.getLoadType()]); spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens(); for (const api::Timestamp & token : tokens) { - spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), context); + spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), tracker->context()); } return tracker; } MessageTracker::UP -PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context) +PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock()); + tracker->setMetric(_env._metrics.createBuckets); LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str()); DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId()); } spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - _spi.createBucket(spiBucket, context); + _spi.createBucket(spiBucket, tracker->context()); if (cmd.getActive()) { _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE); } @@ -295,9 +296,9 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, con } MessageTracker::UP -PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context) +PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock()); + tracker->setMetric(_env._metrics.deleteBuckets); LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()"); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { @@ -308,7 +309,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Contex if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { return tracker; } - _spi.deleteBucket(bucket, context); + _spi.deleteBucket(bucket, tracker->context()); StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); { StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); @@ -333,12 +334,12 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Contex } MessageTracker::UP -PersistenceThread::handleGetIter(GetIterCommand& cmd, spi::Context & context) +PersistenceThread::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock()); - spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), context)); + tracker->setMetric(_env._metrics.visit[cmd.getLoadType()]); + spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), tracker->context())); if (checkForError(result, *tracker)) { - GetIterReply::SP reply(new GetIterReply(cmd)); + auto reply = std::make_shared<GetIterReply>(cmd); reply->getEntries() = result.steal_entries(); _env._metrics.visit[cmd.getLoadType()]. documentsPerIterate.addValue(reply->getEntries().size()); @@ -351,9 +352,9 @@ PersistenceThread::handleGetIter(GetIterCommand& cmd, spi::Context & context) } MessageTracker::UP -PersistenceThread::handleReadBucketList(ReadBucketList& cmd) +PersistenceThread::handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketList,_env._component.getClock()); + tracker->setMetric(_env._metrics.readBucketList); spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace(), cmd.getPartition())); if (checkForError(result, *tracker)) { @@ -366,23 +367,22 @@ PersistenceThread::handleReadBucketList(ReadBucketList& cmd) } MessageTracker::UP -PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd) +PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketInfo,_env._component.getClock()); + tracker->setMetric(_env._metrics.readBucketInfo); _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket())); return tracker; } MessageTracker::UP -PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context) +PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock()); - document::FieldSetRepo repo; - document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields()); - context.setReadConsistency(cmd.getReadConsistency()); + tracker->setMetric(_env._metrics.createIterator); + document::FieldSet::UP fieldSet = document::FieldSetRepo::parse(*_env._component.getTypeRepo(), cmd.getFields()); + tracker->context().setReadConsistency(cmd.getReadConsistency()); spi::CreateIteratorResult result(_spi.createIterator( spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), - *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), context)); + *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), tracker->context())); if (checkForError(result, *tracker)) { tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId()))); } @@ -390,9 +390,9 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, spi::Context } MessageTracker::UP -PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context) +PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.splitBuckets,_env._component.getClock()); + tracker->setMetric(_env._metrics.splitBuckets); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); // Calculate the various bucket ids involved. @@ -411,7 +411,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context SplitBitDetector::Result targetInfo; if (_env._config.enableMultibitSplitOptimalization) { targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(), - context, cmd.getMinDocCount(), cmd.getMinByteSize()); + tracker->context(), cmd.getMinDocCount(), cmd.getMinByteSize()); } if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) { document::BucketId src(cmd.getBucketId()); @@ -451,9 +451,9 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context } #endif spi::Result result = _spi.split(spiBucket, spi::Bucket(target1, spi::PartitionId(lock1.disk)), - spi::Bucket(target2, spi::PartitionId(lock2.disk)), context); + spi::Bucket(target2, spi::PartitionId(lock2.disk)), tracker->context()); if (result.hasError()) { - tracker->fail(_env.convertErrorCode(result), result.getErrorMessage()); + tracker->fail(PersistenceUtil::convertErrorCode(result), result.getErrorMessage()); return tracker; } // After split we need to take all bucket db locks to update them. @@ -509,7 +509,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context spi::PartitionId(targets[i].second.diskIndex))); LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it", createTarget.toString().c_str()); - _spi.createBucket(createTarget, context); + _spi.createBucket(createTarget, tracker->context()); } splitReply.getSplitInfo().emplace_back(targets[i].second.bucket.getBucketId(), targets[i].first->getBucketInfo()); @@ -529,7 +529,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context } bool -PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const +PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) { if (cmd.getSourceBuckets().size() != 2) { tracker.fail(ReturnCode::ILLEGAL_PARAMETERS, @@ -554,9 +554,9 @@ PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, Messa } MessageTracker::UP -PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context) +PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.joinBuckets,_env._component.getClock()); + tracker->setMetric(_env._metrics.joinBuckets); if (!validateJoinCommand(cmd, *tracker)) { return tracker; } @@ -603,7 +603,7 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context _spi.join(spi::Bucket(firstBucket, spi::PartitionId(lock1.disk)), spi::Bucket(secondBucket, spi::PartitionId(lock2.disk)), spi::Bucket(destBucket, spi::PartitionId(_env._partition)), - context); + tracker->context()); if (!checkForError(result, *tracker)) { return tracker; } @@ -634,9 +634,9 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context } MessageTracker::UP -PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd) +PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.setBucketStates,_env._component.getClock()); + tracker->setMetric(_env._metrics.setBucketStates); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); @@ -665,9 +665,9 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd) } MessageTracker::UP -PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context) +PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.internalJoin,_env._component.getClock()); + tracker->setMetric(_env._metrics.internalJoin); document::Bucket destBucket = cmd.getBucket(); { // Create empty bucket for target. @@ -682,7 +682,7 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi: _spi.join(spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())), spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())), spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToKeep())), - context); + tracker->context()); if (checkForError(result, *tracker)) { tracker->setReply(std::make_shared<InternalBucketJoinReply>(cmd, _env.getBucketInfo(cmd.getBucket()))); } @@ -690,9 +690,9 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi: } MessageTracker::UP -PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd) +PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.recheckBucketInfo, _env._component.getClock()); + tracker->setMetric(_env._metrics.recheckBucketInfo); document::Bucket bucket(cmd.getBucket()); api::BucketInfo info(_env.getBucketInfo(bucket)); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); @@ -720,58 +720,58 @@ PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd) } MessageTracker::UP -PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, spi::Context & context) +PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker) { switch (msg.getType().getId()) { case api::MessageType::GET_ID: - return handleGet(static_cast<api::GetCommand&>(msg), context); + return handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker)); case api::MessageType::PUT_ID: - return handlePut(static_cast<api::PutCommand&>(msg), context); + return handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker)); case api::MessageType::REMOVE_ID: - return handleRemove(static_cast<api::RemoveCommand&>(msg), context); + return handleRemove(static_cast<api::RemoveCommand&>(msg), std::move(tracker)); case api::MessageType::UPDATE_ID: - return handleUpdate(static_cast<api::UpdateCommand&>(msg), context); + return handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker)); case api::MessageType::REVERT_ID: - return handleRevert(static_cast<api::RevertCommand&>(msg), context); + return handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker)); case api::MessageType::CREATEBUCKET_ID: - return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), context); + return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: - return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), context); + return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: - return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), context); + return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); case api::MessageType::SPLITBUCKET_ID: - return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), context); + return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker)); // Depends on iterators case api::MessageType::STATBUCKET_ID: - return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), context); + return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker)); case api::MessageType::REMOVELOCATION_ID: - return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), context); + return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker)); case api::MessageType::MERGEBUCKET_ID: - return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), context); + return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), std::move(tracker)); case api::MessageType::GETBUCKETDIFF_ID: - return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), context); + return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), std::move(tracker)); case api::MessageType::APPLYBUCKETDIFF_ID: - return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), context); + return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker)); case api::MessageType::SETBUCKETSTATE_ID: - return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg)); + return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); case api::MessageType::INTERNAL_ID: switch(static_cast<api::InternalCommand&>(msg).getType()) { case GetIterCommand::ID: - return handleGetIter(static_cast<GetIterCommand&>(msg), context); + return handleGetIter(static_cast<GetIterCommand&>(msg), std::move(tracker)); case CreateIteratorCommand::ID: - return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), context); + return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), std::move(tracker)); case ReadBucketList::ID: - return handleReadBucketList(static_cast<ReadBucketList&>(msg)); + return handleReadBucketList(static_cast<ReadBucketList&>(msg), std::move(tracker)); case ReadBucketInfo::ID: - return handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg)); + return handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker)); case RepairBucketCommand::ID: - return handleRepairBucket(static_cast<RepairBucketCommand&>(msg)); + return handleRepairBucket(static_cast<RepairBucketCommand&>(msg), std::move(tracker)); case BucketDiskMoveCommand::ID: - return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), context); + return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), std::move(tracker)); case InternalBucketJoinCommand::ID: - return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), context); + return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker)); case RecheckBucketInfoCommand::ID: - return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg)); + return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker)); default: LOG(warning, "Persistence thread received unhandled internal command %s", msg.toString().c_str()); break; @@ -782,21 +782,6 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, spi::Conte return MessageTracker::UP(); } -MessageTracker::UP -PersistenceThread::handleCommand(api::StorageCommand& msg) -{ - spi::Context context(msg.getLoadType(), msg.getPriority(), msg.getTrace().getLevel()); - MessageTracker::UP mtracker(handleCommandSplitByType(msg, context)); - if (mtracker && ! context.getTrace().getRoot().isEmpty()) { - if (mtracker->hasReply()) { - mtracker->getReply().getTrace().getRoot().addChild(context.getTrace().getRoot()); - } else { - msg.getTrace().getRoot().addChild(context.getTrace().getRoot()); - } - } - return mtracker; -} - void PersistenceThread::handleReply(api::StorageReply& reply) { @@ -813,7 +798,7 @@ PersistenceThread::handleReply(api::StorageReply& reply) } MessageTracker::UP -PersistenceThread::processMessage(api::StorageMessage& msg) +PersistenceThread::processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) { MBUS_TRACE(msg.getTrace(), 5, "PersistenceThread: Processing message in persistence layer"); @@ -829,13 +814,12 @@ PersistenceThread::processMessage(api::StorageMessage& msg) } } else { api::StorageCommand& initiatingCommand = static_cast<api::StorageCommand&>(msg); - try { int64_t startTime(_component->getClock().getTimeInMillis().getTime()); LOG(debug, "Handling command: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); - auto tracker(handleCommand(initiatingCommand)); + tracker = handleCommandSplitByType(initiatingCommand, std::move(tracker)); if (!tracker) { LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str()); } else { @@ -867,47 +851,18 @@ PersistenceThread::processMessage(api::StorageMessage& msg) } } - return MessageTracker::UP(); -} - -namespace { - - -bool isBatchable(api::MessageType::Id id) -{ - return (id == api::MessageType::PUT_ID || - id == api::MessageType::REMOVE_ID || - id == api::MessageType::UPDATE_ID || - id == api::MessageType::REVERT_ID); -} - -bool hasBucketInfo(api::MessageType::Id id) -{ - return (isBatchable(id) || - (id == api::MessageType::REMOVELOCATION_ID || - id == api::MessageType::JOINBUCKETS_ID)); -} - + return tracker; } void -PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage & lock) { - std::vector<MessageTracker::UP> trackers; - document::Bucket bucket = lock.first->getBucket(); - +PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage lock) { LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); api::StorageMessage & msg(*lock.second); - std::unique_ptr<MessageTracker> tracker = processMessage(msg); - if (tracker && tracker->hasReply()) { - if (hasBucketInfo(msg.getType().getId())) { - if (tracker->getReply().getResult().success()) { - _env.setBucketInfo(*tracker, bucket); - } - } - LOG(spam, "Sending reply up: %s %" PRIu64, - tracker->getReply().toString().c_str(), tracker->getReply().getMsgId()); - _env._fileStorHandler.sendReply(std::move(*tracker).stealReplySP()); + auto tracker = std::make_unique<MessageTracker>(_env, std::move(lock.first), std::move(lock.second)); + tracker = processMessage(msg, std::move(tracker)); + if (tracker) { + tracker->sendReply(); } } @@ -922,7 +877,7 @@ PersistenceThread::run(framework::ThreadHandle& thread) FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _stripeId)); if (lock.first) { - processLockedMessage(lock); + processLockedMessage(std::move(lock)); } vespalib::MonitorGuard flushMonitorGuard(_flushMonitor); diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 56414835b7b..a3c8099f228 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -28,23 +28,23 @@ public: void flush() override; framework::Thread& getThread() override { return *_thread; } - MessageTracker::UP handlePut(api::PutCommand& cmd, spi::Context & context); - MessageTracker::UP handleRemove(api::RemoveCommand& cmd, spi::Context & context); - MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, spi::Context & context); - MessageTracker::UP handleGet(api::GetCommand& cmd, spi::Context & context); - MessageTracker::UP handleRevert(api::RevertCommand& cmd, spi::Context & context); - MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context); - MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context); - MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context); - MessageTracker::UP handleGetIter(GetIterCommand& cmd, spi::Context & context); - MessageTracker::UP handleReadBucketList(ReadBucketList& cmd); - MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd); - MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context); - MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd); - MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context); - MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context); - MessageTracker::UP handleRepairBucket(RepairBucketCommand& cmd); - MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd); + MessageTracker::UP handlePut(api::PutCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleGet(api::GetCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleRepairBucket(RepairBucketCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker); private: uint32_t _stripeId; @@ -67,23 +67,22 @@ private: * an appropriate error and returns false iff the command does not validate * OK. Returns true and does not touch the tracker otherwise. */ - bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const; + static bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker); // Message handling functions - MessageTracker::UP handleCommand(api::StorageCommand&); - MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, spi::Context & context); + MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker); void handleReply(api::StorageReply&); - MessageTracker::UP processMessage(api::StorageMessage& msg); - void processLockedMessage(FileStorHandler::LockedMessage & lock); + MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker); + void processLockedMessage(FileStorHandler::LockedMessage lock); // Thread main loop void run(framework::ThreadHandle&) override; - bool checkForError(const spi::Result& response, MessageTracker& tracker); + static bool checkForError(const spi::Result& response, MessageTracker& tracker); spi::Bucket getBucket(const DocumentId& id, const document::Bucket &bucket) const; friend class TestAndSetHelper; - bool tasConditionExists(const api::TestAndSetCommand & cmd); + static bool tasConditionExists(const api::TestAndSetCommand & cmd); bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, spi::Context & context, bool missingDocumentImpliesMatch = false); }; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 9c49dc96750..53679a1a364 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -14,22 +14,69 @@ namespace { ost << "PersistenceUtil(" << p << ")"; return ost.str(); } + + bool isBatchable(api::MessageType::Id id) + { + return (id == api::MessageType::PUT_ID || + id == api::MessageType::REMOVE_ID || + id == api::MessageType::UPDATE_ID || + id == api::MessageType::REVERT_ID); + } + + bool hasBucketInfo(api::MessageType::Id id) + { + return (isBatchable(id) || + (id == api::MessageType::REMOVELOCATION_ID || + id == api::MessageType::JOINBUCKETS_ID)); + } + } -MessageTracker::MessageTracker(FileStorThreadMetrics::Op& metric, - framework::Clock& clock) +MessageTracker::MessageTracker(PersistenceUtil & env, + FileStorHandler::BucketLockInterface::SP bucketLock, + api::StorageMessage::SP msg) : _sendReply(true), - _metric(metric), + _updateBucketInfo(hasBucketInfo(msg->getType().getId())), + _bucketLock(std::move(bucketLock)), + _msg(std::move(msg)), + _context(_msg->getLoadType(), _msg->getPriority(), _msg->getTrace().getLevel()), + _env(env), + _metric(nullptr), _result(api::ReturnCode::OK), - _timer(clock) -{ - _metric.count.inc(); + _timer(_env._component.getClock()) +{ } + +void +MessageTracker::setMetric(FileStorThreadMetrics::Op& metric) { + metric.count.inc(); + _metric = &metric; } MessageTracker::~MessageTracker() { if (_reply.get() && _reply->getResult().success()) { - _metric.latency.addValue(_timer.getElapsedTimeAsDouble()); + _metric->latency.addValue(_timer.getElapsedTimeAsDouble()); + } +} + +void +MessageTracker::sendReply() { + if (hasReply()) { + if ( ! _context.getTrace().getRoot().isEmpty()) { + getReply().getTrace().getRoot().addChild(_context.getTrace().getRoot()); + } + if (_updateBucketInfo) { + if (getReply().getResult().success()) { + _env.setBucketInfo(*this, _bucketLock->getBucket()); + } + } + LOG(spam, "Sending reply up: %s %" PRIu64, + getReply().toString().c_str(), getReply().getMsgId()); + _env._fileStorHandler.sendReply(std::move(_reply)); + } else { + if ( ! _context.getTrace().getRoot().isEmpty()) { + _msg->getTrace().getRoot().addChild(_context.getTrace().getRoot()); + } } } @@ -53,7 +100,7 @@ MessageTracker::generateReply(api::StorageCommand& cmd) } if (!_reply->getResult().success()) { - _metric.failed.inc(); + _metric->failed.inc(); LOGBP(debug, "Failed to handle command %s: %s", cmd.toString().c_str(), _result.toString().c_str()); diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index e8e5f947814..c6cb943f0b9 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -13,14 +13,18 @@ namespace storage { +class PersistenceUtil; + class MessageTracker : protected Types { public: typedef std::unique_ptr<MessageTracker> UP; - MessageTracker(FileStorThreadMetrics::Op& metric, framework::Clock& clock); + MessageTracker(PersistenceUtil & env, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); ~MessageTracker(); + void setMetric(FileStorThreadMetrics::Op& metric); + /** * Called by operation handlers to set reply if they need to send a * non-default reply. They should call this function as soon as they create @@ -57,23 +61,32 @@ public: api::ReturnCode getResult() const { return _result; } + spi::Context & context() { return _context; } + + void sendReply(); + private: - bool _sendReply; - FileStorThreadMetrics::Op& _metric; - api::StorageReply::SP _reply; - api::ReturnCode _result; - framework::MilliSecTimer _timer; + bool _sendReply; + bool _updateBucketInfo; + FileStorHandler::BucketLockInterface::SP _bucketLock; + api::StorageMessage::SP _msg; + spi::Context _context; + PersistenceUtil &_env; + FileStorThreadMetrics::Op *_metric; + api::StorageReply::SP _reply; + api::ReturnCode _result; + framework::MilliSecTimer _timer; }; struct PersistenceUtil { vespa::config::content::StorFilestorConfig _config; - ServiceLayerComponentRegister& _compReg; - ServiceLayerComponent _component; - FileStorHandler& _fileStorHandler; - uint16_t _partition; - uint16_t _nodeIndex; - FileStorThreadMetrics& _metrics; - const document::BucketIdFactory& _bucketFactory; + ServiceLayerComponentRegister &_compReg; + ServiceLayerComponent _component; + FileStorHandler &_fileStorHandler; + uint16_t _partition; + uint16_t _nodeIndex; + FileStorThreadMetrics &_metrics; + const document::BucketIdFactory &_bucketFactory; const std::shared_ptr<const document::DocumentTypeRepo> _repo; spi::PersistenceProvider& _spi; diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index f37c6723933..4829bdf4581 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -80,25 +80,18 @@ public: } MessageTracker::UP -ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, - spi::Context& context) +ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>( - _env._metrics.removeLocation[cmd.getLoadType()], - _env._component.getClock()); + tracker->setMetric(_env._metrics.removeLocation[cmd.getLoadType()]); LOG(debug, "RemoveLocation(%s): using selection '%s'", cmd.getBucketId().toString().c_str(), cmd.getDocumentSelection().c_str()); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - UnrevertableRemoveEntryProcessor processor(_spi, bucket, context); - BucketProcessor::iterateAll(_spi, - bucket, - cmd.getDocumentSelection(), - processor, - spi::NEWEST_DOCUMENT_ONLY, - context); + UnrevertableRemoveEntryProcessor processor(_spi, bucket, tracker->context()); + BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(), + processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context()); tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed)); @@ -106,12 +99,9 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, } MessageTracker::UP -ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, - spi::Context& context) +ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>( - _env._metrics.statBucket[cmd.getLoadType()], - _env._component.getClock()); + tracker->setMetric(_env._metrics.statBucket[cmd.getLoadType()]); std::ostringstream ost; ost << "Persistence bucket " << cmd.getBucketId() @@ -119,15 +109,10 @@ ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); StatEntryProcessor processor(ost); - BucketProcessor::iterateAll(_spi, - bucket, - cmd.getDocumentSelection(), - processor, - spi::ALL_VERSIONS, - context); - - api::StatBucketReply::UP reply(new api::StatBucketReply(cmd, ost.str())); - tracker->setReply(api::StorageReply::SP(reply.release())); + BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(), + processor, spi::ALL_VERSIONS,tracker->context()); + + tracker->setReply(std::make_shared<api::StatBucketReply>(cmd, ost.str())); return tracker; } diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h index 37b46ffc728..87c3c63b8fe 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.h +++ b/storage/src/vespa/storage/persistence/processallhandler.h @@ -8,11 +8,7 @@ #include <vespa/storageapi/message/stat.h> #include <vespa/persistence/spi/persistenceprovider.h> -namespace document { -namespace select { -class Node; -} -} +namespace document::select { class Node; } namespace storage { @@ -20,9 +16,8 @@ class ProcessAllHandler : public Types { public: ProcessAllHandler(PersistenceUtil&, spi::PersistenceProvider&); - MessageTracker::UP handleRemoveLocation(api::RemoveLocationCommand&, - spi::Context&); - MessageTracker::UP handleStatBucket(api::StatBucketCommand&, spi::Context&); + MessageTracker::UP handleRemoveLocation(api::RemoveLocationCommand&, MessageTracker::UP tracker); + MessageTracker::UP handleStatBucket(api::StatBucketCommand&, MessageTracker::UP tracker); protected: PersistenceUtil& _env; diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 76c420e76e6..ed3de5c7873 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -74,12 +74,9 @@ ProviderErrorWrapper::getBucketInfo(const spi::Bucket& bucket) const } spi::Result -ProviderErrorWrapper::put(const spi::Bucket& bucket, - spi::Timestamp ts, - const spi::DocumentSP& doc, - spi::Context& context) +ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::DocumentSP doc, spi::Context& context) { - return checkResult(_impl.put(bucket, ts, doc, context)); + return checkResult(_impl.put(bucket, ts, std::move(doc), context)); } spi::RemoveResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 292eb004223..61664419c69 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -47,7 +47,7 @@ public: spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override; spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; - spi::Result put(const spi::Bucket&, spi::Timestamp, const spi::DocumentSP&, spi::Context&) override; + spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override; |