From 94f53ab6a93aefa007200b97846ce47ea166bd70 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 31 Mar 2022 13:00:32 +0000 Subject: GC unused Context parameter --- .../persistence/common/filestortestfixture.cpp | 7 +- .../common/persistenceproviderwrapper.cpp | 65 +++++++---------- .../common/persistenceproviderwrapper.h | 24 +++---- .../filestorage/filestormanagertest.cpp | 23 ++---- .../filestorage/operationabortingtest.cpp | 10 +-- .../src/tests/persistence/persistencetestutils.cpp | 31 +++----- .../persistence/persistencethread_splittest.cpp | 10 ++- .../src/tests/persistence/splitbitdetectortest.cpp | 82 ++++++++-------------- storage/src/tests/persistence/testandsettest.cpp | 6 +- .../src/vespa/storage/persistence/asynchandler.cpp | 15 ++-- .../vespa/storage/persistence/bucketprocessor.cpp | 14 ++-- .../persistence/filestorage/filestormanager.cpp | 2 +- .../src/vespa/storage/persistence/mergehandler.cpp | 29 ++++---- .../src/vespa/storage/persistence/mergehandler.h | 1 - .../storage/persistence/provider_error_wrapper.cpp | 46 ++++++------ .../storage/persistence/provider_error_wrapper.h | 22 +++--- .../storage/persistence/simplemessagehandler.cpp | 4 +- .../vespa/storage/persistence/splitjoinhandler.cpp | 11 +-- 18 files changed, 161 insertions(+), 241 deletions(-) (limited to 'storage') diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp index 24504a2006b..0b1eec30b2e 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.cpp +++ b/storage/src/tests/persistence/common/filestortestfixture.cpp @@ -51,12 +51,9 @@ FileStorTestFixture::TearDown() void FileStorTestFixture::createBucket(const document::BucketId& bid) { - spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); - _node->getPersistenceProvider().createBucket(makeSpiBucket(bid), context); - + _node->getPersistenceProvider().createBucket(makeSpiBucket(bid)); StorBucketDatabase::WrappedEntry entry( - _node->getStorageBucketDatabase().get(bid, "foo", - StorBucketDatabase::CREATE_IF_NONEXISTING)); + _node->getStorageBucketDatabase().get(bid, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING)); entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false); entry.write(); } diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index 1c47170de6c..7e0b96b1d82 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -81,11 +81,11 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const } void -PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept +PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::OperationComplete::UP onComplete) noexcept { LOG_SPI("createBucket(" << bucket << ")"); CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete); - return _spi.createBucketAsync(bucket, context, std::move(onComplete)); + return _spi.createBucketAsync(bucket, std::move(onComplete)); } spi::BucketInfoResult @@ -98,47 +98,45 @@ PersistenceProviderWrapper::getBucketInfo(const spi::Bucket& bucket) const void PersistenceProviderWrapper::putAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, document::Document::SP doc, - spi::Context& context, spi::OperationComplete::UP onComplete) + spi::OperationComplete::UP onComplete) { LOG_SPI("put(" << bucket << ", " << timestamp << ", " << doc->getId() << ")"); CHECK_ERROR_ASYNC(spi::Result, FAIL_PUT, onComplete); - _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(onComplete)); + _spi.putAsync(bucket, timestamp, std::move(doc), std::move(onComplete)); } void PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, std::vector ids, - spi::Context& context, spi::OperationComplete::UP onComplete) + spi::OperationComplete::UP onComplete) { for (const TimeStampAndDocumentId & stampedId : ids) { LOG_SPI("remove(" << bucket << ", " << stampedId.first << ", " << stampedId.second << ")"); } CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE, onComplete); - _spi.removeAsync(bucket, std::move(ids), context, std::move(onComplete)); + _spi.removeAsync(bucket, std::move(ids), std::move(onComplete)); } void PersistenceProviderWrapper::removeIfFoundAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, const spi::DocumentId& id, - spi::Context& context, spi::OperationComplete::UP onComplete) + spi::OperationComplete::UP onComplete) { LOG_SPI("removeIfFound(" << bucket << ", " << timestamp << ", " << id << ")"); CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE_IF_FOUND, onComplete); - _spi.removeIfFoundAsync(bucket, timestamp, id, context, std::move(onComplete)); + _spi.removeIfFoundAsync(bucket, timestamp, id, std::move(onComplete)); } void PersistenceProviderWrapper::updateAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, document::DocumentUpdate::SP upd, - spi::Context& context, spi::OperationComplete::UP onComplete) + spi::OperationComplete::UP onComplete) { LOG_SPI("update(" << bucket << ", " << timestamp << ", " << upd->getId() << ")"); CHECK_ERROR_ASYNC(spi::UpdateResult, FAIL_UPDATE, onComplete); - _spi.updateAsync(bucket, timestamp, std::move(upd), context, std::move(onComplete)); + _spi.updateAsync(bucket, timestamp, std::move(upd), std::move(onComplete)); } spi::GetResult -PersistenceProviderWrapper::get(const spi::Bucket& bucket, - const document::FieldSet& fieldSet, - const spi::DocumentId& id, - spi::Context& context) const +PersistenceProviderWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet, + const spi::DocumentId& id, spi::Context& context) const { LOG_SPI("get(" << bucket << ", " << id << ")"); CHECK_ERROR(spi::GetResult, FAIL_GET); @@ -147,8 +145,7 @@ PersistenceProviderWrapper::get(const spi::Bucket& bucket, spi::CreateIteratorResult PersistenceProviderWrapper::createIterator(const spi::Bucket &bucket, FieldSetSP fields, const spi::Selection &sel, - spi::IncludedVersions versions, - spi::Context &context) + spi::IncludedVersions versions, spi::Context &context) { // TODO: proper printing of FieldSet and Selection @@ -159,53 +156,43 @@ PersistenceProviderWrapper::createIterator(const spi::Bucket &bucket, FieldSetSP } spi::IterateResult -PersistenceProviderWrapper::iterate(spi::IteratorId iterId, - uint64_t maxByteSize, - spi::Context& context) const +PersistenceProviderWrapper::iterate(spi::IteratorId iterId, uint64_t maxByteSize) const { LOG_SPI("iterate(" << uint64_t(iterId) << ", " << maxByteSize << ")"); CHECK_ERROR(spi::IterateResult, FAIL_ITERATE); - return _spi.iterate(iterId, maxByteSize, context); + return _spi.iterate(iterId, maxByteSize); } spi::Result -PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId, - spi::Context& context) +PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId) { LOG_SPI("destroyIterator(" << uint64_t(iterId) << ")"); CHECK_ERROR(spi::Result, FAIL_DESTROY_ITERATOR); - return _spi.destroyIterator(iterId, context); + return _spi.destroyIterator(iterId); } void -PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, - spi::OperationComplete::UP operationComplete) noexcept +PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::OperationComplete::UP operationComplete) noexcept { LOG_SPI("deleteBucket(" << bucket << ")"); CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete); - _spi.deleteBucketAsync(bucket, context, std::move(operationComplete)); + _spi.deleteBucketAsync(bucket, std::move(operationComplete)); } spi::Result -PersistenceProviderWrapper::split(const spi::Bucket& source, - const spi::Bucket& target1, - const spi::Bucket& target2, - spi::Context& context) +PersistenceProviderWrapper::split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2) { LOG_SPI("split(" << source << ", " << target1 << ", " << target2 << ")"); CHECK_ERROR(spi::Result, FAIL_SPLIT); - return _spi.split(source, target1, target2, context); + return _spi.split(source, target1, target2); } spi::Result -PersistenceProviderWrapper::join(const spi::Bucket& source1, - const spi::Bucket& source2, - const spi::Bucket& target, - spi::Context& context) +PersistenceProviderWrapper::join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target) { LOG_SPI("join(" << source1 << ", " << source2 << ", " << target << ")"); CHECK_ERROR(spi::Result, FAIL_JOIN); - return _spi.join(source1, source2, target, context); + return _spi.join(source1, source2, target); } std::unique_ptr @@ -221,13 +208,11 @@ PersistenceProviderWrapper::register_executor(std::shared_ptr ids, spi::Context&, spi::OperationComplete::UP) override; - void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; - void updateAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&, spi::OperationComplete::UP) override; + void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::OperationComplete::UP) override; + void removeAsync(const spi::Bucket&, std::vector ids, spi::OperationComplete::UP) override; + void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::OperationComplete::UP) override; + void updateAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::OperationComplete::UP) override; spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override; spi::CreateIteratorResult createIterator(const spi::Bucket &bucket, FieldSetSP, const spi::Selection &, spi::IncludedVersions versions, spi::Context &context) override; - spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; - spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; - spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, - const spi::Bucket& target2, spi::Context&) override; - spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, - const spi::Bucket& target, spi::Context&) override; - spi::Result removeEntry(const spi::Bucket&, spi::Timestamp, spi::Context&) override; + spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize) const override; + spi::Result destroyIterator(spi::IteratorId) override; + void deleteBucketAsync(const spi::Bucket&, spi::OperationComplete::UP) noexcept override; + spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2) override; + spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target) override; + spi::Result removeEntry(const spi::Bucket&, spi::Timestamp) override; std::unique_ptr register_resource_usage_listener(spi::IResourceUsageListener& listener) override; std::unique_ptr register_executor(std::shared_ptr) override; }; diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 46310819a59..304720f52b7 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -100,8 +100,7 @@ struct FileStorTestBase : Test { void TearDown() override; void createBucket(document::BucketId bid) { - spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); - _node->getPersistenceProvider().createBucket(makeSpiBucket(bid), context); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bid)); StorBucketDatabase::WrappedEntry entry( _node->getStorageBucketDatabase().get(bid, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING)); @@ -783,10 +782,7 @@ TEST_F(FileStorManagerTest, priority) { // Create buckets in separate, initial pass to avoid races with puts for (uint32_t i=0; igetId()).getRawId()); - - spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); - - _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket)); } // Populate bucket with the given data @@ -845,13 +841,12 @@ TEST_F(FileStorManagerTest, split1) { documents.push_back(doc); } document::BucketIdFactory factory; - spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); { // Populate bucket with the given data for (uint32_t i=0; igetId()).getRawId()); - _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket)); auto cmd = std::make_shared(makeDocumentBucket(bucket), documents[i], 100 + i); cmd->setAddress(_Storage3); @@ -950,7 +945,6 @@ TEST_F(FileStorManagerTest, split_single_group) { auto& top = c.top; setClusterState("storage:2 distributor:1"); - spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); for (uint32_t j=0; j<1; ++j) { // Test this twice, once where all the data ends up in file with // splitbit set, and once where all the data ends up in file with @@ -964,18 +958,16 @@ TEST_F(FileStorManagerTest, split_single_group) { std::string content("Here is some content for all documents"); std::ostringstream uri; - uri << "id:footype:testdoctype1:n=" << (state ? 0x10001 : 0x0100001) - << ":mydoc-" << i; + uri << "id:footype:testdoctype1:n=" << (state ? 0x10001 : 0x0100001) << ":mydoc-" << i; documents.emplace_back(createDocument(content, uri.str())); } document::BucketIdFactory factory; // Populate bucket with the given data for (uint32_t i=0; igetId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId()); - _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket)); auto cmd = std::make_shared(makeDocumentBucket(bucket), documents[i], 100 + i); cmd->setAddress(_Storage3); @@ -1025,12 +1017,11 @@ FileStorTestBase::putDoc(DummyStorageLink& top, const document::BucketId& target, uint32_t docNum) { - spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); document::BucketIdFactory factory; document::DocumentId docId(vespalib::make_string("id:ns:testdoctype1:n=%" PRIu64 ":%d", target.getId(), docNum)); document::BucketId bucket(16, factory.getBucketId(docId).getRawId()); //std::cerr << "doc bucket is " << bucket << " vs source " << source << "\n"; - _node->getPersistenceProvider().createBucket(makeSpiBucket(target), context); + _node->getPersistenceProvider().createBucket(makeSpiBucket(target)); Document::SP doc(new Document(*_testdoctype1, docId)); auto cmd = std::make_shared(makeDocumentBucket(target), doc, docNum+1); cmd->setAddress(_Storage3); diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 1752de5fb80..ecf4ddde911 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -49,7 +49,7 @@ public: {} void - putAsync(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::Context&, spi::OperationComplete::UP onComplete) override + putAsync(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::OperationComplete::UP onComplete) override { _queueBarrier.await(); // message abort stage with active opertion in disk queue @@ -64,15 +64,15 @@ public: return PersistenceProviderWrapper::getBucketInfo(bucket); } - void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override { + void createBucketAsync(const spi::Bucket& bucket, spi::OperationComplete::UP onComplete) noexcept override { ++_createBucketInvocations; - PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete)); + PersistenceProviderWrapper::createBucketAsync(bucket, std::move(onComplete)); } void - deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override { + deleteBucketAsync(const spi::Bucket& bucket, spi::OperationComplete::UP onComplete) noexcept override { ++_deleteBucketInvocations; - PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete)); + PersistenceProviderWrapper::deleteBucketAsync(bucket, std::move(onComplete)); } }; diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index b0efed07fab..b069ac078ef 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -170,17 +170,13 @@ PersistenceTestUtils::getBucketStatus(const document::BucketId& id) } document::Document::SP -PersistenceTestUtils::doPutOnDisk( - uint32_t location, - spi::Timestamp timestamp, - uint32_t minSize, - uint32_t maxSize) +PersistenceTestUtils::doPutOnDisk(uint32_t location, spi::Timestamp timestamp, uint32_t minSize, uint32_t maxSize) { document::Document::SP doc(createRandomDocumentAtLocation(location, timestamp, minSize, maxSize)); spi::Bucket b(makeSpiBucket(document::BucketId(16, location))); spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); - getPersistenceProvider().createBucket(b, context); - getPersistenceProvider().put(spi::Bucket(b), timestamp, doc, context); + getPersistenceProvider().createBucket(b); + getPersistenceProvider().put(spi::Bucket(b), timestamp, doc); return doc; } @@ -191,12 +187,11 @@ PersistenceTestUtils::doRemoveOnDisk( spi::Timestamp timestamp, bool persistRemove) { - spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); if (persistRemove) { - spi::RemoveResult result = getPersistenceProvider().removeIfFound(makeSpiBucket(bucketId),timestamp, docId, context); + spi::RemoveResult result = getPersistenceProvider().removeIfFound(makeSpiBucket(bucketId),timestamp, docId); return result.wasFound(); } - spi::RemoveResult result = getPersistenceProvider().remove(makeSpiBucket(bucketId), timestamp, docId, context); + spi::RemoveResult result = getPersistenceProvider().remove(makeSpiBucket(bucketId), timestamp, docId); return result.wasFound(); } @@ -207,8 +202,7 @@ PersistenceTestUtils::doUnrevertableRemoveOnDisk( const document::DocumentId& docId, spi::Timestamp timestamp) { - spi::Context context(spi::Priority(0),spi::Trace::TraceLevel(0)); - spi::RemoveResult result = getPersistenceProvider().remove(makeSpiBucket(bucketId), timestamp, docId, context); + spi::RemoveResult result = getPersistenceProvider().remove(makeSpiBucket(bucketId), timestamp, docId); return result.wasFound(); } @@ -250,9 +244,8 @@ void PersistenceTestUtils::doPut(const document::Document::SP& doc, document::BucketId bid, spi::Timestamp time) { spi::Bucket b(makeSpiBucket(bid)); - spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); - getPersistenceProvider().createBucket(b, context); - getPersistenceProvider().put(b, time, std::move(doc), context); + getPersistenceProvider().createBucket(b); + getPersistenceProvider().put(b, time, std::move(doc)); } spi::UpdateResult @@ -261,7 +254,7 @@ PersistenceTestUtils::doUpdate(document::BucketId bid, spi::Timestamp time) { spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); - return getPersistenceProvider().update(makeSpiBucket(bid), time, update, context); + return getPersistenceProvider().update(makeSpiBucket(bid), time, update); } void @@ -273,11 +266,9 @@ PersistenceTestUtils::doRemove(const document::DocumentId& id, spi::Timestamp ti bucket.setUsedBits(usedBits); spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); if (unrevertableRemove) { - getPersistenceProvider().remove( - makeSpiBucket(bucket), time, id, context); + getPersistenceProvider().remove(makeSpiBucket(bucket), time, id); } else { - spi::RemoveResult result = getPersistenceProvider().removeIfFound( - makeSpiBucket(bucket), time, id, context); + spi::RemoveResult result = getPersistenceProvider().removeIfFound(makeSpiBucket(bucket), time, id); if (!result.wasFound()) { throw vespalib::IllegalStateException( "Attempted to remove non-existing doc " + id.toString(), diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index 1739f4dff18..97151a8984e 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -181,11 +181,10 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) uint64_t location = 0; uint64_t splitMask = 1ULL << (splitLevelToDivide - 1); - spi::Context context(spi::Priority(0), spi::Trace::TraceLevel(0)); spi::Bucket bucket(makeSpiBucket(document::BucketId(currentSplitLevel, 1))); spi::PersistenceProvider& spi(getPersistenceProvider()); - spi.deleteBucket(bucket, context); - spi.createBucket(bucket, context); + spi.deleteBucket(bucket); + spi.createBucket(bucket); document::TestDocMan testDocMan; for (uint32_t i=0; i testDoc; document::DocumentId testDocId; - spi::Context context; TestAndSetTest() : persistenceHandler(), - asyncHandler(nullptr), - context(0, 0) + asyncHandler(nullptr) {} void SetUp() override { PersistenceTestUtils::SetUp(); createBucket(BUCKET_ID); - getPersistenceProvider().createBucket(makeSpiBucket(BUCKET_ID),context); + getPersistenceProvider().createBucket(makeSpiBucket(BUCKET_ID)); testDoc = createTestDocument(); testDocId = testDoc->getId(); diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index a55889652a8..f5d29fb32a7 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include @@ -172,7 +171,7 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons tracker->checkForError(*response); tracker->sendReply(); }); - _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(), + _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), std::make_unique(_sequencedExecutor, cmd.getBucketId(), std::move(task))); return trackerUP; @@ -194,10 +193,10 @@ AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker:: }); if (cmd.getActive()) { - _spi.createBucketAsync(bucket, tracker->context(), std::make_unique()); + _spi.createBucketAsync(bucket, std::make_unique()); _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique(_sequencedExecutor, bucket, std::move(task))); } else { - _spi.createBucketAsync(bucket, tracker->context(), std::make_unique(_sequencedExecutor, bucket, std::move(task))); + _spi.createBucketAsync(bucket, std::make_unique(_sequencedExecutor, bucket, std::move(task))); } return tracker; @@ -240,7 +239,7 @@ AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker:: } tracker->sendReply(); }); - _spi.deleteBucketAsync(bucket, tracker->context(), std::make_unique(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + _spi.deleteBucketAsync(bucket, std::make_unique(_sequencedExecutor, cmd.getBucketId(), std::move(task))); return tracker; } @@ -301,7 +300,7 @@ AsyncHandler::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP } tracker->sendReply(); }); - _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(), + _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), std::make_unique(_sequencedExecutor, cmd.getBucketId(), std::move(task))); return trackerUP; } @@ -332,7 +331,7 @@ AsyncHandler::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trackerUP } tracker->sendReply(); }); - _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(), + _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), std::make_unique(_sequencedExecutor, cmd.getBucketId(), std::move(task))); return trackerUP; } @@ -428,7 +427,7 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack tracker->sendReply(); }); - _spi.removeAsync(bucket, std::move(to_remove), tracker->context(), + _spi.removeAsync(bucket, std::move(to_remove), std::make_unique(_sequencedExecutor, cmd.getBucketId(), std::move(task))); return tracker; diff --git a/storage/src/vespa/storage/persistence/bucketprocessor.cpp b/storage/src/vespa/storage/persistence/bucketprocessor.cpp index 39ad18175e6..e0f9bcebf02 100644 --- a/storage/src/vespa/storage/persistence/bucketprocessor.cpp +++ b/storage/src/vespa/storage/persistence/bucketprocessor.cpp @@ -16,20 +16,16 @@ class IteratorGuard private: spi::PersistenceProvider& _spi; spi::IteratorId _iteratorId; - spi::Context& _context; public: - IteratorGuard(spi::PersistenceProvider& spi, - spi::IteratorId iteratorId, - spi::Context& context) + IteratorGuard(spi::PersistenceProvider& spi, spi::IteratorId iteratorId) : _spi(spi), - _iteratorId(iteratorId), - _context(context) + _iteratorId(iteratorId) { } ~IteratorGuard() { assert(_iteratorId != 0); - _spi.destroyIterator(_iteratorId, _context); + _spi.destroyIterator(_iteratorId); } spi::IteratorId getIteratorId() const { return _iteratorId; } spi::PersistenceProvider& getPersistenceProvider() const { return _spi; } @@ -62,10 +58,10 @@ BucketProcessor::iterateAll(spi::PersistenceProvider& provider, } spi::IteratorId iteratorId(createIterResult.getIteratorId()); - IteratorGuard iteratorGuard(provider, iteratorId, context); + IteratorGuard iteratorGuard(provider, iteratorId); while (true) { - spi::IterateResult result(provider.iterate(iteratorId, UINT64_MAX, context)); + spi::IterateResult result(provider.iterate(iteratorId, UINT64_MAX)); if (result.getErrorCode() != spi::Result::ErrorType::NONE) { vespalib::asciistream ss; ss << "Failed: " << result.getErrorMessage(); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 09bd842c308..cab6c26f2a0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -743,7 +743,7 @@ FileStorManager::onInternal(const shared_ptr& msg) { spi::Context context(msg->getPriority(), msg->getTrace().getLevel()); shared_ptr cmd(std::static_pointer_cast(msg)); - _provider->destroyIterator(cmd->getIteratorId(), context); + _provider->destroyIterator(cmd->getIteratorId()); msg->getTrace().addChild(context.steal_trace()); return true; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 73361c7e8d6..012d5c2619d 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -57,18 +57,14 @@ constexpr int getDeleteFlag() { class IteratorGuard { spi::PersistenceProvider& _spi; spi::IteratorId _iteratorId; - spi::Context& _context; public: - IteratorGuard(spi::PersistenceProvider& spi, - spi::IteratorId iteratorId, - spi::Context& context) + IteratorGuard(spi::PersistenceProvider& spi, spi::IteratorId iteratorId) : _spi(spi), - _iteratorId(iteratorId), - _context(context) + _iteratorId(iteratorId) {} ~IteratorGuard() { assert(_iteratorId != 0); - _spi.destroyIterator(_iteratorId, _context); + _spi.destroyIterator(_iteratorId); } }; @@ -152,10 +148,10 @@ MergeHandler::populateMetaData(const spi::Bucket& bucket, Timestamp maxTimestamp throw std::runtime_error(ss.str()); } spi::IteratorId iteratorId(createIterResult.getIteratorId()); - IteratorGuard iteratorGuard(_spi, iteratorId, context); + IteratorGuard iteratorGuard(_spi, iteratorId); while (true) { - spi::IterateResult result(_spi.iterate(iteratorId, UINT64_MAX, context)); + spi::IterateResult result(_spi.iterate(iteratorId, UINT64_MAX)); if (result.getErrorCode() != spi::Result::ErrorType::NONE) { std::ostringstream ss; ss << "Failed to iterate for " @@ -393,7 +389,7 @@ MergeHandler::fetchLocalData( throw std::runtime_error(ss.str()); } spi::IteratorId iteratorId(createIterResult.getIteratorId()); - IteratorGuard iteratorGuard(_spi, iteratorId, context); + IteratorGuard iteratorGuard(_spi, iteratorId); // Fetch all entries DocEntryList entries; @@ -401,7 +397,7 @@ MergeHandler::fetchLocalData( bool fetchedAllLocalData = false; bool chunkLimitReached = false; while (true) { - spi::IterateResult result(_spi.iterate(iteratorId, remainingSize, context)); + spi::IterateResult result(_spi.iterate(iteratorId, remainingSize)); if (result.getErrorCode() != spi::Result::ErrorType::NONE) { std::ostringstream ss; ss << "Failed to iterate for " @@ -512,7 +508,6 @@ void MergeHandler::applyDiffEntry(std::shared_ptr async_results, const spi::Bucket& bucket, const api::ApplyBucketDiffCommand::Entry& e, - spi::Context& context, const document::DocumentTypeRepo& repo) const { auto throttle_token = throttle_merge_feed_ops() ? _env._fileStorHandler.operation_throttler().blocking_acquire_one() @@ -525,14 +520,14 @@ MergeHandler::applyDiffEntry(std::shared_ptr async_results auto complete = std::make_unique(std::move(async_results), std::move(docId), std::move(throttle_token), "put", _clock, _env._metrics.merge_handler_metrics.put_latency); - _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete)); + _spi.putAsync(bucket, timestamp, std::move(doc), std::move(complete)); } else { std::vector ids; ids.emplace_back(timestamp, e._docName); auto complete = std::make_unique(std::move(async_results), ids[0].second, std::move(throttle_token), "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency); - _spi.removeAsync(bucket, std::move(ids), context, std::move(complete)); + _spi.removeAsync(bucket, std::move(ids), std::move(complete)); } } @@ -594,7 +589,7 @@ MergeHandler::applyDiffLocally( ++i; LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - applyDiffEntry(async_results, bucket, e, context, repo); + applyDiffEntry(async_results, bucket, e, repo); } else { assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp()); // Diffing for existing timestamp; should either both be put @@ -607,7 +602,7 @@ MergeHandler::applyDiffLocally( "timestamp in %s. Diff slot: %s. Existing slot: %s", bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); - applyDiffEntry(async_results, bucket, e, context, repo); + applyDiffEntry(async_results, bucket, e, repo); } else { // Duplicate put, just ignore it. LOG(debug, "During diff apply, attempting to add slot " @@ -639,7 +634,7 @@ MergeHandler::applyDiffLocally( LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - applyDiffEntry(async_results, bucket, e, context, repo); + applyDiffEntry(async_results, bucket, e, repo); byteCount += e._headerBlob.size() + e._bodyBlob.size(); } if (byteCount + notNeededByteCount != 0) { diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 3f631acbef5..b579677ac24 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -116,7 +116,6 @@ private: void applyDiffEntry(std::shared_ptr async_results, const spi::Bucket&, const api::ApplyBucketDiffCommand::Entry&, - spi::Context& context, const document::DocumentTypeRepo& repo) const; /** diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 6d15cc06cdf..1be9679c641 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -88,29 +88,29 @@ ProviderErrorWrapper::createIterator(const spi::Bucket &bucket, FieldSetSP field } spi::IterateResult -ProviderErrorWrapper::iterate(spi::IteratorId iteratorId, uint64_t maxByteSize, spi::Context& context) const +ProviderErrorWrapper::iterate(spi::IteratorId iteratorId, uint64_t maxByteSize) const { - return checkResult(_impl.iterate(iteratorId, maxByteSize, context)); + return checkResult(_impl.iterate(iteratorId, maxByteSize)); } spi::Result -ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context& context) +ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId) { - return checkResult(_impl.destroyIterator(iteratorId, context)); + return checkResult(_impl.destroyIterator(iteratorId)); } void -ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept +ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::OperationComplete::UP onComplete) noexcept { onComplete->addResultHandler(this); - _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); + _impl.deleteBucketAsync(bucket, std::move(onComplete)); } void -ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept +ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::OperationComplete::UP onComplete) noexcept { onComplete->addResultHandler(this); - _impl.createBucketAsync(bucket, context, std::move(onComplete)); + _impl.createBucketAsync(bucket, std::move(onComplete)); } spi::BucketIdListResult @@ -120,17 +120,15 @@ ProviderErrorWrapper::getModifiedBuckets(BucketSpace bucketSpace) const } spi::Result -ProviderErrorWrapper::split(const spi::Bucket& source, const spi::Bucket& target1, - const spi::Bucket& target2, spi::Context& context) +ProviderErrorWrapper::split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2) { - return checkResult(_impl.split(source, target1, target2, context)); + return checkResult(_impl.split(source, target1, target2)); } spi::Result -ProviderErrorWrapper::join(const spi::Bucket& source1, const spi::Bucket& source2, - const spi::Bucket& target, spi::Context& context) +ProviderErrorWrapper::join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target) { - return checkResult(_impl.join(source1, source2, target, context)); + return checkResult(_impl.join(source1, source2, target)); } std::unique_ptr @@ -140,41 +138,41 @@ ProviderErrorWrapper::register_resource_usage_listener(spi::IResourceUsageListen } spi::Result -ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts, spi::Context& context) +ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts) { - return checkResult(_impl.removeEntry(bucket, ts, context)); + return checkResult(_impl.removeEntry(bucket, ts)); } void ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentSP doc, - spi::Context &context, spi::OperationComplete::UP onComplete) + spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); - _impl.putAsync(bucket, ts, std::move(doc), context, std::move(onComplete)); + _impl.putAsync(bucket, ts, std::move(doc), std::move(onComplete)); } void ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, std::vector ids, - spi::Context & context, spi::OperationComplete::UP onComplete) + spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); - _impl.removeAsync(bucket, std::move(ids), context, std::move(onComplete)); + _impl.removeAsync(bucket, std::move(ids), std::move(onComplete)); } void ProviderErrorWrapper::removeIfFoundAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId, - spi::Context & context, spi::OperationComplete::UP onComplete) + spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); - _impl.removeIfFoundAsync(bucket, ts, docId, context, std::move(onComplete)); + _impl.removeIfFoundAsync(bucket, ts, docId, std::move(onComplete)); } void ProviderErrorWrapper::updateAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentUpdateSP upd, - spi::Context &context, spi::OperationComplete::UP onComplete) + spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); - _impl.updateAsync(bucket, ts, std::move(upd), context, std::move(onComplete)); + _impl.updateAsync(bucket, ts, std::move(upd), std::move(onComplete)); } std::unique_ptr diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 7285c405d5c..7bd406a8758 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -47,23 +47,23 @@ public: spi::CreateIteratorResult createIterator(const spi::Bucket &bucket, FieldSetSP, const spi::Selection &, spi::IncludedVersions versions, spi::Context &context) override; - spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; - spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; + spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize) const override; + spi::Result destroyIterator(spi::IteratorId) override; spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; - spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; - spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; + spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2) override; + spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target) override; std::unique_ptr register_resource_usage_listener(spi::IResourceUsageListener& listener) override; - spi::Result removeEntry(const spi::Bucket&, spi::Timestamp, spi::Context&) override; + spi::Result removeEntry(const spi::Bucket&, spi::Timestamp) override; void register_error_listener(std::shared_ptr listener); - void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override; - void removeAsync(const spi::Bucket&, std::vector, spi::Context&, spi::OperationComplete::UP) override; - void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; - void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; + void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::OperationComplete::UP) override; + void removeAsync(const spi::Bucket&, std::vector, spi::OperationComplete::UP) override; + void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::OperationComplete::UP) override; + void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::OperationComplete::UP) override; void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; - void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; - void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override; + void createBucketAsync(const spi::Bucket&, spi::OperationComplete::UP) noexcept override; + void deleteBucketAsync(const spi::Bucket&, spi::OperationComplete::UP) noexcept override; std::unique_ptr register_executor(std::shared_ptr executor) override; private: template diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index 74813e2e891..e83d460f47a 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -83,7 +83,7 @@ SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP t spi::Bucket b = spi::Bucket(cmd.getBucket()); const std::vector & tokens = cmd.getRevertTokens(); for (const api::Timestamp & token : tokens) { - spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), tracker->context()); + spi::Result result = _spi.removeEntry(b, spi::Timestamp(token)); } return tracker; } @@ -92,7 +92,7 @@ MessageTracker::UP SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.visit); - spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), tracker->context())); + spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize())); if (tracker->checkForError(result)) { auto reply = std::make_shared(cmd); reply->getEntries() = result.steal_entries(); diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp index d5b44cc1911..f86a65efd91 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp @@ -68,8 +68,7 @@ SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(target1)); PersistenceUtil::LockResult lock2(_env.lockAndGetDisk(target2)); - spi::Result result = _spi.split(spiBucket, spi::Bucket(target1), - spi::Bucket(target2), tracker->context()); + spi::Result result = _spi.split(spiBucket, spi::Bucket(target1), spi::Bucket(target2)); if (result.hasError()) { tracker->fail(PersistenceUtil::convertErrorCode(result), result.getErrorMessage()); return tracker; @@ -124,7 +123,7 @@ SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker spi::Bucket createTarget(target.second.bucket); LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it", createTarget.toString().c_str()); - _spi.createBucket(createTarget, tracker->context()); + _spi.createBucket(createTarget); } splitReply.getSplitInfo().emplace_back(target.second.bucket.getBucketId(), target.first->getBucketInfo()); @@ -203,11 +202,7 @@ SplitJoinHandler::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker lock2 = _env.lockAndGetDisk(secondBucket); } - spi::Result result = - _spi.join(spi::Bucket(firstBucket), - spi::Bucket(secondBucket), - spi::Bucket(destBucket), - tracker->context()); + spi::Result result = _spi.join(spi::Bucket(firstBucket), spi::Bucket(secondBucket), spi::Bucket(destBucket)); if (!tracker->checkForError(result)) { return tracker; } -- cgit v1.2.3