diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-28 08:24:57 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-29 14:14:03 +0000 |
commit | eb7b71781ca079b5577a13b300beafee388bc1ce (patch) | |
tree | 8a5194ed759a8fc8433fef14118e67ac6bfc2632 | |
parent | 9499865f8a43aa097841606795a2bea8d0273ef9 (diff) |
- Add async interface to put
- Use MessageTracker for keeping context.
- Implement putAsync, but still use it synchronously.
49 files changed, 681 insertions, 611 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 6834f453695..3f05ed36802 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -435,8 +435,7 @@ DummyPersistence::getBucketInfo(const Bucket& b) const } Result -DummyPersistence::put(const Bucket& b, Timestamp t, const Document::SP& doc, - Context&) +DummyPersistence::put(const Bucket& b, Timestamp t, Document::SP doc, Context&) { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "put(%s, %" PRIu64 ", %s)", @@ -461,14 +460,13 @@ DummyPersistence::put(const Bucket& b, Timestamp t, const Document::SP& doc, LOG(spam, "Inserting document %s", doc->toString(true).c_str()); - DocEntry::UP entry(new DocEntry(t, NONE, Document::UP(doc->clone()))); + auto entry = std::make_unique<DocEntry>(t, NONE, Document::UP(doc->clone())); (*bc)->insert(std::move(entry)); return Result(); } Result -DummyPersistence::maintain(const Bucket& b, - MaintenanceLevel) +DummyPersistence::maintain(const Bucket& b, MaintenanceLevel) { assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); if (_simulateMaintainFailure) { @@ -489,10 +487,7 @@ DummyPersistence::maintain(const Bucket& b, } RemoveResult -DummyPersistence::remove(const Bucket& b, - Timestamp t, - const DocumentId& did, - Context&) +DummyPersistence::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "remove(%s, %" PRIu64 ", %s)", @@ -518,10 +513,7 @@ DummyPersistence::remove(const Bucket& b, } GetResult -DummyPersistence::get(const Bucket& b, - const document::FieldSet& fieldSet, - const DocumentId& did, - Context&) const +DummyPersistence::get(const Bucket& b, const document::FieldSet& fieldSet, const DocumentId& did, Context&) const { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "get(%s, %s)", diff --git 
a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index c97aab822ac..88e17a90a98 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -153,16 +153,9 @@ public: Result setClusterState(BucketSpace bucketSpace, const ClusterState& newState) override; Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override; BucketInfoResult getBucketInfo(const Bucket&) const override; - Result put(const Bucket&, Timestamp, const DocumentSP&, Context&) override; - GetResult get(const Bucket&, - const document::FieldSet& fieldSet, - const DocumentId&, - Context&) const override; - - RemoveResult remove(const Bucket& b, - Timestamp t, - const DocumentId& did, - Context&) override; + Result put(const Bucket&, Timestamp, DocumentSP, Context&) override; + GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override; + RemoveResult remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) override; CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fs, @@ -176,15 +169,9 @@ public: Result createBucket(const Bucket&, Context&) override; Result deleteBucket(const Bucket&, Context&) override; - Result split(const Bucket& source, - const Bucket& target1, - const Bucket& target2, - Context&) override; + Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; - Result join(const Bucket& source1, - const Bucket& source2, - const Bucket& target, - Context&) override; + Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; Result revert(const Bucket&, Timestamp, Context&); Result maintain(const Bucket& bucket, MaintenanceLevel level) override; diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp 
b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index e35a6a74bde..e7abe137b89 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -31,7 +31,7 @@ AbstractPersistenceProvider::update(const Bucket& bucket, Timestamp ts, upd->applyTo(*docToUpdate); - Result putResult = put(bucket, ts, docToUpdate, context); + Result putResult = put(bucket, ts, std::move(docToUpdate), context); if (putResult.hasError()) { return UpdateResult(putResult.getErrorCode(), diff --git a/persistence/src/vespa/persistence/spi/operationcomplete.h b/persistence/src/vespa/persistence/spi/operationcomplete.h new file mode 100644 index 00000000000..1a548e613dd --- /dev/null +++ b/persistence/src/vespa/persistence/spi/operationcomplete.h @@ -0,0 +1,22 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <memory> + +namespace storage::spi { + +class Result; + +/** + * This is the callback interface when using the async operations + * in the persistence provider. + */ +class OperationComplete +{ +public: + using UP = std::unique_ptr<OperationComplete>; + virtual ~OperationComplete() = default; + virtual void onComplete(std::unique_ptr<Result> result) = 0; +}; + +}
\ No newline at end of file diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index 61d141c0229..02fb1bb4719 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -1,9 +1,35 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "persistenceprovider.h" +#include <future> namespace storage::spi { PersistenceProvider::~PersistenceProvider() = default; +class CatchResult : public OperationComplete { +public: + std::future<Result::UP> waitResult() { + return promisedResult.get_future(); + } + void onComplete(Result::UP result) override { + promisedResult.set_value(std::move(result)); + } +private: + std::promise<Result::UP> promisedResult; +}; +Result +PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->waitResult(); + putAsync(bucket, timestamp, std::move(doc), context, std::move(catcher)); + return *future.get(); +} +void +PersistenceProvider::putAsync(const Bucket &bucket, Timestamp timestamp, DocumentSP doc, Context &context, + OperationComplete::UP onComplete) { + Result result = put(bucket, timestamp, std::move(doc), context); + onComplete->onComplete(std::make_unique<Result>(result)); +} + } diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index c70d5e3f1c3..70645b31902 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -10,6 +10,7 @@ #include "result.h" #include "selection.h" #include "clusterstate.h" +#include "operationcomplete.h" namespace document { class FieldSet; } @@ -109,7 +110,8 @@ struct 
PersistenceProvider /** * Store the given document at the given microsecond time. */ - virtual Result put(const Bucket&, Timestamp, const DocumentSP&, Context&) = 0; + virtual Result put(const Bucket&, Timestamp, DocumentSP, Context&); + virtual void putAsync(const Bucket &, Timestamp , DocumentSP , Context &, OperationComplete::UP ); /** * This remove function assumes that there exist something to be removed. diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp index d4ec9cee395..511d0e3fee8 100644 --- a/searchcore/src/apps/proton/downpersistence.cpp +++ b/searchcore/src/apps/proton/downpersistence.cpp @@ -19,9 +19,7 @@ DownPersistence::DownPersistence(const vespalib::string &downReason) { } -DownPersistence::~DownPersistence() -{ -} +DownPersistence::~DownPersistence() = default; Result DownPersistence::initialize() @@ -40,8 +38,7 @@ DownPersistence::getPartitionStates() const BucketIdListResult DownPersistence::listBuckets(BucketSpace, PartitionId) const { - return BucketIdListResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return BucketIdListResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } Result @@ -59,30 +56,25 @@ DownPersistence:: setActiveState(const Bucket&, BucketInfo::ActiveState) BucketInfoResult DownPersistence:: getBucketInfo(const Bucket&) const { - return BucketInfoResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return BucketInfoResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } Result -DownPersistence::put(const Bucket&, Timestamp, const Document::SP&, Context&) +DownPersistence::put(const Bucket&, Timestamp, Document::SP, Context&) { return errorResult; } RemoveResult -DownPersistence:: remove(const Bucket&, Timestamp, - const DocumentId&, Context&) +DownPersistence:: remove(const Bucket&, Timestamp, const DocumentId&, Context&) { - return RemoveResult(errorResult.getErrorCode(), - 
errorResult.getErrorMessage()); + return RemoveResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } RemoveResult -DownPersistence::removeIfFound(const Bucket&, Timestamp, - const DocumentId&, Context&) +DownPersistence::removeIfFound(const Bucket&, Timestamp,const DocumentId&, Context&) { - return RemoveResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return RemoveResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } Result @@ -91,35 +83,28 @@ DownPersistence::removeEntry(const Bucket&, Timestamp, Context&) return errorResult; } -UpdateResult DownPersistence::update(const Bucket&, Timestamp, - const DocumentUpdate::SP&, Context&) +UpdateResult DownPersistence::update(const Bucket&, Timestamp, const DocumentUpdate::SP&, Context&) { - return UpdateResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return UpdateResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } GetResult -DownPersistence::get(const Bucket&, const document::FieldSet&, - const DocumentId&, Context&) const +DownPersistence::get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const { - return GetResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return GetResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } CreateIteratorResult DownPersistence::createIterator(const Bucket&, const document::FieldSet&, - const Selection&, IncludedVersions, - Context&) + const Selection&, IncludedVersions, Context&) { - return CreateIteratorResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return CreateIteratorResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } IterateResult DownPersistence::iterate(IteratorId, uint64_t, Context&) const { - return IterateResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return IterateResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } Result @@ -144,8 +129,7 @@ 
DownPersistence::deleteBucket(const Bucket&, Context&) BucketIdListResult DownPersistence::getModifiedBuckets(BucketSpace) const { - return BucketIdListResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return BucketIdListResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h index 8cdac7aaa1b..2ae0605fdb6 100644 --- a/searchcore/src/apps/proton/downpersistence.h +++ b/searchcore/src/apps/proton/downpersistence.h @@ -32,7 +32,7 @@ public: Result setClusterState(BucketSpace, const ClusterState&) override; Result setActiveState(const Bucket&, BucketInfo::ActiveState) override; BucketInfoResult getBucketInfo(const Bucket&) const override; - Result put(const Bucket&, Timestamp, const DocumentSP&, Context&) override; + Result put(const Bucket&, Timestamp, DocumentSP, Context&) override; RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override; RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override; Result removeEntry(const Bucket&, Timestamp, Context&) override; diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index 55d524519b8..e1785e1e48d 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -255,8 +255,7 @@ public: storage::spi::Timestamp ts(0); DbDocumentId dbdId(lid); DbDocumentId prevDbdId(0); - document::Document::SP xdoc(new document::Document(doc)); - PutOperation op(bucketId, ts, xdoc); + PutOperation op(bucketId, ts, std::make_shared<document::Document>(doc)); op.setSerialNum(serialNum); op.setDbDocumentId(dbdId); op.setPrevDbDocumentId(prevDbdId); diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp 
b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 796ec4436fe..18235116d27 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -544,7 +544,7 @@ TEST_F("require that outdated remove is ignored", FeedHandlerFixture) TEST_F("require that outdated put is ignored", FeedHandlerFixture) { DocumentContext doc_context("id:ns:searchdocument::foo", *f.schema.builder); - auto op =std::make_unique<PutOperation>(doc_context.bucketId, Timestamp(10), doc_context.doc); + auto op =std::make_unique<PutOperation>(doc_context.bucketId, Timestamp(10), std::move(doc_context.doc)); static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000)); FeedTokenContext token_context; f.handler.performOperation(std::move(token_context.token), std::move(op)); @@ -673,7 +673,7 @@ TEST_F("require that put is rejected if resource limit is reached", FeedHandlerF f.writeFilter._message = "Attribute resource limit reached"; DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder); - auto op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc); + auto op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), std::move(docCtx.doc)); FeedTokenContext token; f.handler.performOperation(std::move(token.token), std::move(op)); EXPECT_EQUAL(0, f.feedView.put_count); @@ -801,7 +801,7 @@ TEST_F("require that put with different document type repo is ok", FeedHandlerFi TwoFieldsSchemaContext schema; DocumentContext doc_context("id:ns:searchdocument::foo", *schema.builder); auto op = std::make_unique<PutOperation>(doc_context.bucketId, - Timestamp(10), doc_context.doc); + Timestamp(10), std::move(doc_context.doc)); FeedTokenContext token_context; EXPECT_EQUAL(schema.getRepo().get(), op->getDocument()->getRepo()); EXPECT_NOT_EQUAL(f.schema.getRepo().get(), op->getDocument()->getRepo()); diff --git 
a/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp b/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp index 5fffd70f11d..53fdcb7757f 100644 --- a/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp +++ b/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp @@ -190,7 +190,7 @@ TEST("require that toString() on derived classes are meaningful") PutOperation().toString()); EXPECT_EQUAL("Put(id::::, BucketId(0x000000000000002a), timestamp=10, dbdId=(subDbId=0, lid=0), " "prevDbdId=(subDbId=0, lid=0), prevMarkedAsRemoved=false, prevTimestamp=0, serialNum=0)", - PutOperation(bucket_id1, timestamp, doc).toString()); + PutOperation(bucket_id1, timestamp, std::move(doc)).toString()); EXPECT_EQUAL("Remove(id::::, BucketId(0x0000000000000000), timestamp=0, dbdId=(subDbId=0, lid=0), " "prevDbdId=(subDbId=0, lid=0), prevMarkedAsRemoved=false, prevTimestamp=0, serialNum=0)", diff --git a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp index 35590cc68f6..0c72d11899a 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp @@ -15,7 +15,7 @@ using HandlerSnapshot = PersistenceHandlerMap::HandlerSnapshot; struct DummyPersistenceHandler : public IPersistenceHandler { using SP = std::shared_ptr<DummyPersistenceHandler>; void initialize() override {} - void handlePut(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::Document::SP &) override {} + void handlePut(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, DocumentSP) override {} void handleUpdate(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::DocumentUpdate::SP &) override 
{} void handleRemove(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::DocumentId &) override {} void handleListBuckets(IBucketIdListResultHandler &) override {} diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index 19d9d41c3e4..4fb4abcb7c5 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -9,7 +9,6 @@ #include <vespa/document/update/documentupdate.h> #include <vespa/persistence/spi/documentselection.h> #include <vespa/persistence/spi/test.h> -#include <vespa/persistence/spi/test.h> #include <vespa/searchcore/proton/persistenceengine/bucket_guard.h> #include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> #include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> @@ -199,7 +198,7 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer { void initialize() override { initialized = true; } void handlePut(FeedToken token, const Bucket& bucket, - Timestamp timestamp, const document::Document::SP& doc) override { + Timestamp timestamp, DocumentSP doc) override { token->setResult(ResultUP(new storage::spi::Result()), false); handle(token, bucket, timestamp, doc->getId()); } diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp index b6223dd41ab..e0bc7cb551f 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp @@ -35,4 +35,8 @@ State::fail() } } +OwningState::~OwningState() { + ack(); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h index ab3fdea3345..363886b9f31 100644 --- 
a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h @@ -15,36 +15,53 @@ typedef std::unique_ptr<storage::spi::Result> ResultUP; * instance of this class is passed to every invokation of the IFeedHandler. */ namespace feedtoken { - class ITransport { - public: - virtual ~ITransport() { } - virtual void send(ResultUP result, bool documentWasFound) = 0; - }; - - class State : public search::IDestructorCallback { - public: - State(const State &) = delete; - State & operator = (const State &) = delete; - State(ITransport & transport); - ~State() override; - void fail(); - void setResult(ResultUP result, bool documentWasFound) { - _documentWasFound = documentWasFound; - _result = std::move(result); - } - const storage::spi::Result &getResult() { return *_result; } - private: - void ack(); - ITransport &_transport; - ResultUP _result; - bool _documentWasFound; - std::atomic<bool> _alreadySent; - }; - - inline std::shared_ptr<State> - make(ITransport & latch) { - return std::make_shared<State>(latch); + +class ITransport { +public: + virtual ~ITransport() { } + virtual void send(ResultUP result, bool documentWasFound) = 0; +}; + +class State : public search::IDestructorCallback { +public: + State(const State &) = delete; + State & operator = (const State &) = delete; + State(ITransport & transport); + ~State() override; + void fail(); + void setResult(ResultUP result, bool documentWasFound) { + _documentWasFound = documentWasFound; + _result = std::move(result); } + const storage::spi::Result &getResult() { return *_result; } +protected: + void ack(); +private: + ITransport &_transport; + ResultUP _result; + bool _documentWasFound; + std::atomic<bool> _alreadySent; +}; +class OwningState : public State { +public: + OwningState(std::unique_ptr<ITransport> transport) + : State(*transport), + _owned(std::move(transport)) + {} + ~OwningState() override; +private: + std::unique_ptr<ITransport> _owned; +}; + 
+inline std::shared_ptr<State> +make(ITransport & latch) { + return std::make_shared<State>(latch); +} +inline std::shared_ptr<State> +make(std::unique_ptr<ITransport> transport) { + return std::make_shared<OwningState>(std::move(transport)); +} + } using FeedToken = std::shared_ptr<feedtoken::State>; diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp index f1161c8ebdd..56224889b78 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp @@ -26,7 +26,7 @@ DocumentOperation::DocumentOperation(Type type) } -DocumentOperation::DocumentOperation(Type type, const BucketId &bucketId, const Timestamp ×tamp) +DocumentOperation::DocumentOperation(Type type, BucketId bucketId, Timestamp timestamp) : FeedOperation(type), _bucketId(bucketId), _timestamp(timestamp), @@ -38,6 +38,8 @@ DocumentOperation::DocumentOperation(Type type, const BucketId &bucketId, const { } +DocumentOperation::~DocumentOperation() = default; + void DocumentOperation::assertValidBucketId(const document::DocumentId &docId) const { diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h index 6847dbfd943..044d44b8276 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h @@ -22,15 +22,14 @@ protected: DocumentOperation(Type type); - DocumentOperation(Type type, const document::BucketId &bucketId, - const storage::spi::Timestamp ×tamp); + DocumentOperation(Type type, document::BucketId bucketId, storage::spi::Timestamp timestamp); void assertValidBucketId(const document::DocumentId &docId) const; void assertValidBucketId(const document::GlobalId &docId) const; 
vespalib::string docArgsToString() const; public: - ~DocumentOperation() override {} + ~DocumentOperation() override; const document::BucketId &getBucketId() const { return _bucketId; } storage::spi::Timestamp getTimestamp() const { return _timestamp; } diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp index efac7297c67..dd001348093 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp @@ -17,16 +17,12 @@ PutOperation::PutOperation() { } -PutOperation::PutOperation(const BucketId &bucketId, - const Timestamp ×tamp, - const Document::SP &doc) - : DocumentOperation(FeedOperation::PUT, - bucketId, - timestamp), - _doc(doc) +PutOperation::PutOperation(BucketId bucketId, Timestamp timestamp, Document::SP doc) + : DocumentOperation(FeedOperation::PUT, bucketId, timestamp), + _doc(std::move(doc)) { } -PutOperation::~PutOperation() { } +PutOperation::~PutOperation() = default; void PutOperation::serialize(vespalib::nbostream &os) const @@ -40,8 +36,7 @@ PutOperation::serialize(vespalib::nbostream &os) const void -PutOperation::deserialize(vespalib::nbostream &is, - const DocumentTypeRepo &repo) +PutOperation::deserialize(vespalib::nbostream &is, const DocumentTypeRepo &repo) { DocumentOperation::deserialize(is, repo); size_t oldSize = is.size(); diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h index 33330692fab..5ca6d755e49 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h @@ -12,17 +12,16 @@ class PutOperation : public DocumentOperation public: PutOperation(); - PutOperation(const document::BucketId &bucketId, - const storage::spi::Timestamp ×tamp, - const 
DocumentSP &doc); - virtual ~PutOperation(); + PutOperation(document::BucketId bucketId, + storage::spi::Timestamp timestamp, + DocumentSP doc); + ~PutOperation() override; const DocumentSP &getDocument() const { return _doc; } void assertValid() const; - virtual void serialize(vespalib::nbostream &os) const override; - virtual void deserialize(vespalib::nbostream &is, - const document::DocumentTypeRepo &repo) override; + void serialize(vespalib::nbostream &os) const override; + void deserialize(vespalib::nbostream &is, const document::DocumentTypeRepo &repo) override; void deserializeDocument(const document::DocumentTypeRepo &repo); - virtual vespalib::string toString() const override; + vespalib::string toString() const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h index 82a9c8174fa..c95f51b29dc 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h @@ -42,7 +42,7 @@ public: virtual void initialize() = 0; virtual void handlePut(FeedToken token, const storage::spi::Bucket &bucket, - storage::spi::Timestamp timestamp, const DocumentSP &doc) = 0; + storage::spi::Timestamp timestamp, DocumentSP doc) = 0; virtual void handleUpdate(FeedToken token, const storage::spi::Bucket &bucket, storage::spi::Timestamp timestamp, const DocumentUpdateSP &upd) = 0; diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 2ede5d45f7e..48074d491c8 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -64,7 +64,7 @@ public: if (result.hasError()) { 
std::lock_guard<std::mutex> guard(_lock); if (_result.hasError()) { - _result = TransportLatch::mergeErrorResults(_result, result); + _result = TransportMerger::mergeErrorResults(_result, result); } else { _result = result; } @@ -319,34 +319,32 @@ PersistenceEngine::getBucketInfo(const Bucket& b) const } -Result -PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::SP& doc, Context&) +void +PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::DocumentSP doc, Context &, OperationComplete::UP onComplete) { if (!_writeFilter.acceptWriteOperation()) { IResourceWriteFilter::State state = _writeFilter.getAcceptState(); if (!state.acceptWriteOperation()) { - return Result(Result::ErrorType::RESOURCE_EXHAUSTED, + return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::RESOURCE_EXHAUSTED, make_string("Put operation rejected for document '%s': '%s'", - doc->getId().toString().c_str(), state.message().c_str())); + doc->getId().toString().c_str(), state.message().c_str()))); } } std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); DocTypeName docType(doc->getType()); - LOG(spam, "put(%s, %" PRIu64 ", (\"%s\", \"%s\"))", b.toString().c_str(), static_cast<uint64_t>(t.getValue()), + LOG(spam, "putAsync(%s, %" PRIu64 ", (\"%s\", \"%s\"))", bucket.toString().c_str(), static_cast<uint64_t>(ts.getValue()), docType.toString().c_str(), doc->getId().toString().c_str()); if (!doc->getId().hasDocType()) { - return Result(Result::ErrorType::PERMANENT_ERROR, - make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str())); + return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR, + make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str()))); } - IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType); + IPersistenceHandler * handler = getHandler(rguard, bucket.getBucketSpace(), 
docType); if (!handler) { - return Result(Result::ErrorType::PERMANENT_ERROR, - make_string("No handler for document type '%s'", docType.toString().c_str())); + return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR, + make_string("No handler for document type '%s'", docType.toString().c_str()))); } - TransportLatch latch(1); - handler->handlePut(feedtoken::make(latch), b, t, doc); - latch.await(); - return latch.getResult(); + auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc)); } PersistenceEngine::RemoveResult diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 5d3be07c532..d28a00b909e 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -38,6 +38,7 @@ private: using Timestamp = storage::spi::Timestamp; using TimestampList = storage::spi::TimestampList; using UpdateResult = storage::spi::UpdateResult; + using OperationComplete = storage::spi::OperationComplete; struct IteratorEntry { PersistenceHandlerSequence handler_sequence; @@ -100,7 +101,7 @@ public: Result setClusterState(BucketSpace bucketSpace, const ClusterState& calc) override; Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override; BucketInfoResult getBucketInfo(const Bucket&) const override; - Result put(const Bucket&, Timestamp, const std::shared_ptr<document::Document>&, Context&) override; + void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override; RemoveResult remove(const Bucket&, Timestamp, const document::DocumentId&, Context&) override; UpdateResult update(const Bucket&, Timestamp, const 
std::shared_ptr<document::DocumentUpdate>&, Context&) override; diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp index 4a5dfb0a2a5..64159841efe 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp @@ -8,10 +8,50 @@ using storage::spi::Result; namespace proton { +std::unique_ptr<std::mutex> createOptionalLock(bool needLocking) { + return needLocking + ? std::make_unique<std::mutex>() + : std::unique_ptr<std::mutex>(); +} +TransportMerger::TransportMerger(bool needLocking) + : _result(), + _lock(createOptionalLock(needLocking)) +{ +} +TransportMerger::~TransportMerger() = default; + +void +TransportMerger::mergeResult(ResultUP result, bool documentWasFound) { + if (_lock) { + std::lock_guard<std::mutex> guard(*_lock); + mergeWithLock(std::move(result), documentWasFound); + } else { + mergeWithLock(std::move(result), documentWasFound); + } +} + +void +TransportMerger::mergeWithLock(ResultUP result, bool documentWasFound) { + if (!_result) { + _result = std::move(result); + } else if (result->hasError()) { + _result = std::make_unique<Result>(mergeErrorResults(*_result, *result)); + } else if (documentWasFound) { + _result = std::move(result); + } + completeIfDone(); +} + +Result +TransportMerger::mergeErrorResults(const Result &lhs, const Result &rhs) +{ + Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? 
lhs : rhs).getErrorCode(); + return Result(error, make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str())); +} + TransportLatch::TransportLatch(uint32_t cnt) - : _latch(cnt), - _lock(), - _result() + : TransportMerger(cnt > 1), + _latch(cnt) { if (cnt == 0u) { _result = std::make_unique<Result>(); @@ -23,24 +63,33 @@ TransportLatch::~TransportLatch() = default; void TransportLatch::send(ResultUP result, bool documentWasFound) { - { - std::lock_guard<std::mutex> guard(_lock); - if (!_result) { - _result = std::move(result); - } else if (result->hasError()) { - _result.reset(new Result(mergeErrorResults(*_result, *result))); - } else if (documentWasFound) { - _result = std::move(result); - } - } + mergeResult(std::move(result), documentWasFound); _latch.countDown(); } -Result -TransportLatch::mergeErrorResults(const Result &lhs, const Result &rhs) +AsyncTranportContext::AsyncTranportContext(uint32_t cnt, OperationComplete::UP onComplete) + : TransportMerger(cnt > 1), + _countDown(cnt), + _onComplete(std::move(onComplete)) { - Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? 
lhs : rhs).getErrorCode(); - return Result(error, make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str())); + if (cnt == 0u) { + _onComplete->onComplete(std::make_unique<Result>()); + } +} + +void +AsyncTranportContext::completeIfDone() { + _countDown--; + if (_countDown == 0) { + _onComplete->onComplete(std::make_unique<Result>()); + } +} +AsyncTranportContext::~AsyncTranportContext() = default; + +void +AsyncTranportContext::send(ResultUP result, bool documentWasFound) +{ + mergeResult(std::move(result), documentWasFound); } } // proton diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h index 2fb55b14fdf..b3b3bc43aa7 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h @@ -13,18 +13,33 @@ namespace proton { * Implementation of FeedToken::ITransport for handling the async reply for an operation. * Uses an internal count down latch to keep track the number of outstanding replies. 
*/ -class TransportLatch : public feedtoken::ITransport { -private: + +class TransportMerger : public feedtoken::ITransport { +public: using Result = storage::spi::Result; + static Result mergeErrorResults(const Result &lhs, const Result &rhs); +protected: + TransportMerger(bool needLocking); + ~TransportMerger() override; + void mergeResult(ResultUP result, bool documentWasFound); + virtual void completeIfDone() { } // Called with lock held if necessary on every merge + ResultUP _result; + +private: + void mergeWithLock(ResultUP result, bool documentWasFound); + std::unique_ptr<std::mutex> _lock; +}; +class TransportLatch : public TransportMerger { +private: + using UpdateResult = storage::spi::UpdateResult; using RemoveResult = storage::spi::RemoveResult; vespalib::CountDownLatch _latch; - std::mutex _lock; - ResultUP _result; + public: TransportLatch(uint32_t cnt); - ~TransportLatch(); + ~TransportLatch() override; void send(ResultUP result, bool documentWasFound) override; void await() { _latch.await(); @@ -38,7 +53,25 @@ public: const RemoveResult &getRemoveResult() const { return dynamic_cast<const RemoveResult &>(*_result); } - static Result mergeErrorResults(const Result &lhs, const Result &rhs); + +}; + +/** + * Implementation of FeedToken::ITransport for async handling of the async reply for an operation. + * Uses an internal count to keep track the number of the outstanding replies. 
+ */ +class AsyncTranportContext : public TransportMerger { +private: + using Result = storage::spi::Result; + using OperationComplete = storage::spi::OperationComplete; + + int _countDown; + OperationComplete::UP _onComplete; + void completeIfDone() override; +public: + AsyncTranportContext(uint32_t cnt, OperationComplete::UP); + ~AsyncTranportContext() override; + void send(ResultUP result, bool documentWasFound) override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 362de7ee780..4a87ee0009c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -154,7 +154,7 @@ FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &o auto doc = make_shared<Document>(op.getUpdate()->getType(), op.getUpdate()->getId()); doc->setRepo(*_activeFeedView->getDocumentTypeRepo()); op.getUpdate()->applyTo(*doc); - PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc); + PutOperation putOp(op.getBucketId(), op.getTimestamp(), std::move(doc)); _activeFeedView->preparePut(putOp); storeOperation(putOp, token); if (token) { diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp index 16af4e87795..ba0db2bb77d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp @@ -39,9 +39,9 @@ PersistenceHandlerProxy::initialize() } void -PersistenceHandlerProxy::handlePut(FeedToken token, const Bucket &bucket, Timestamp timestamp, const DocumentSP &doc) +PersistenceHandlerProxy::handlePut(FeedToken token, const Bucket &bucket, Timestamp timestamp, DocumentSP doc) { - auto op = std::make_unique<PutOperation>(bucket.getBucketId().stripUnused(), 
timestamp, doc); + auto op = std::make_unique<PutOperation>(bucket.getBucketId().stripUnused(), timestamp, std::move(doc)); _feedHandler.handleOperation(token, std::move(op)); } diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h index 3e3b3dd6fb6..5a82b46f91c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h @@ -25,7 +25,7 @@ public: void initialize() override; void handlePut(FeedToken token, const storage::spi::Bucket &bucket, - storage::spi::Timestamp timestamp, const DocumentSP &doc) override; + storage::spi::Timestamp timestamp, DocumentSP doc) override; void handleUpdate(FeedToken token, const storage::spi::Bucket &bucket, storage::spi::Timestamp timestamp, const DocumentUpdateSP &upd) override; diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index fb80c25bfb7..862d1fb758a 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -91,14 +91,12 @@ PersistenceProviderWrapper::getBucketInfo(const spi::Bucket& bucket) const } spi::Result -PersistenceProviderWrapper::put(const spi::Bucket& bucket, - spi::Timestamp timestamp, - const document::Document::SP& doc, - spi::Context& context) +PersistenceProviderWrapper::put(const spi::Bucket& bucket, spi::Timestamp timestamp, + document::Document::SP doc, spi::Context& context) { LOG_SPI("put(" << bucket << ", " << timestamp << ", " << doc->getId() << ")"); CHECK_ERROR(spi::Result, FAIL_PUT); - return _spi.put(bucket, timestamp, doc, context); + return _spi.put(bucket, timestamp, std::move(doc), context); } spi::RemoveResult diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h 
b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index 9bd3653e8a1..25365e64bfc 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -88,7 +88,7 @@ public: spi::PartitionStateListResult getPartitionStates() const override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace, spi::PartitionId) const override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; - spi::Result put(const spi::Bucket&, spi::Timestamp, const spi::DocumentSP&, spi::Context&) override; + spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override; diff --git a/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp b/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp index 0dd3285e5f3..c10681405e7 100644 --- a/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp +++ b/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp @@ -27,13 +27,11 @@ TEST_F(DiskMoveOperationHandlerTest, simple) { doPutOnDisk(3, 4, spi::Timestamp(1000 + i)); } - DiskMoveOperationHandler diskMoveHandler( - getEnv(3), - getPersistenceProvider()); - BucketDiskMoveCommand move(makeDocumentBucket(document::BucketId(16, 4)), 3, 4); - + DiskMoveOperationHandler diskMoveHandler(getEnv(3),getPersistenceProvider()); + document::Bucket bucket = makeDocumentBucket(document::BucketId(16, 4)); + auto move = std::make_shared<BucketDiskMoveCommand>(bucket, 3, 4); spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - diskMoveHandler.handleBucketDiskMove(move, context); + 
diskMoveHandler.handleBucketDiskMove(*move, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), move)); EXPECT_EQ("BucketId(0x4000000000000004): 10,4", getBucketStatus(document::BucketId(16,4))); diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index ba344971c3b..e9f878bfe1e 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -46,13 +46,8 @@ public: _deleteBucketInvocations(0) {} - spi::Result put(const spi::Bucket& bucket, spi::Timestamp timestamp, - const document::Document::SP& doc, spi::Context& context) override + spi::Result put(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::Context&) override { - (void) bucket; - (void) timestamp; - (void) doc; - (void) context; _queueBarrier.await(); // message abort stage with active opertion in disk queue std::this_thread::sleep_for(75ms); diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index dffd4ef1768..035da326d48 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -149,6 +149,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd; }; + MessageTracker::UP + createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { + return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd)); + } + std::string doTestSPIException(MergeHandler& handler, PersistenceProviderWrapper& providerWrapper, @@ -203,9 +208,9 @@ TEST_F(MergeHandlerTest, merge_bucket_command) { MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(debug, "Handle a merge bucket command"); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - 
cmd.setSourceIndex(1234); - MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + cmd->setSourceIndex(1234); + MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); LOG(debug, "Check state"); ASSERT_EQ(1, messageKeeper()._msgs.size()); @@ -217,7 +222,7 @@ TEST_F(MergeHandlerTest, merge_bucket_command) { EXPECT_EQ(1, cmd2.getAddress()->getIndex()); EXPECT_EQ(1234, cmd2.getSourceIndex()); - tracker->generateReply(cmd); + tracker->generateReply(*cmd); EXPECT_FALSE(tracker->hasReply()); } @@ -228,8 +233,8 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain) MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(debug, "Verifying that get bucket diff is sent on"); - api::GetBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp); - MessageTracker::UP tracker1 = handler.handleGetBucketDiff(cmd, *_context); + auto cmd = std::make_shared<api::GetBucketDiffCommand>(_bucket, _nodes, _maxTimestamp); + MessageTracker::UP tracker1 = handler.handleGetBucketDiff(*cmd, createTracker(cmd, _bucket)); api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP(); if (midChain) { @@ -277,8 +282,8 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(debug, "Verifying that apply bucket diff is sent on"); - api::ApplyBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp); - MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(cmd, *_context); + auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, _maxTimestamp); + MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket)); api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP(); if (midChain) { @@ -324,9 +329,9 @@ TEST_F(MergeHandlerTest, master_message_flow) { MergeHandler handler(getPersistenceProvider(), getEnv()); LOG(debug, 
"Handle a merge bucket command"); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); LOG(debug, "Check state"); ASSERT_EQ(1, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType()); @@ -425,8 +430,8 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize); LOG(debug, "Handle a merge bucket command"); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>(); auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd); @@ -505,9 +510,8 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize); applyBucketDiffCmd->getDiff() = applyDiff; - MergeHandler handler( - getPersistenceProvider(), getEnv(), maxChunkSize); - handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); + MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize); + handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); auto fwdDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>(); // Should not fill up more than chunk size allows for @@ -520,8 +524,8 @@ TEST_F(MergeHandlerTest, max_timestamp) { MergeHandler handler(getPersistenceProvider(), getEnv()); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + auto cmd = 
std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); auto getCmd = fetchSingleMessage<api::GetBucketDiffCommand>(); @@ -534,8 +538,7 @@ MergeHandlerTest::fillDummyApplyDiff( std::vector<api::ApplyBucketDiffCommand::Entry>& diff) { document::TestDocMan docMan; - document::Document::SP doc( - docMan.createRandomDocumentAtLocation(_location)); + document::Document::SP doc(docMan.createRandomDocumentAtLocation(_location)); std::vector<char> headerBlob; { vespalib::nbostream stream; @@ -638,7 +641,8 @@ TEST_F(MergeHandlerTest, spi_flush_guard) { providerWrapper.clearOperationLog(); try { - handler.handleApplyBucketDiff(*createDummyApplyDiff(6000), *_context); + auto cmd = createDummyApplyDiff(6000); + handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket)); FAIL() << "No exception thrown on failing in-place remove"; } catch (const std::runtime_error& e) { EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos); @@ -648,15 +652,15 @@ TEST_F(MergeHandlerTest, spi_flush_guard) { TEST_F(MergeHandlerTest, bucket_not_found_in_db) { MergeHandler handler(getPersistenceProvider(), getEnv()); // Send merge for unknown bucket - api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); - MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); + MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); EXPECT_TRUE(tracker->getResult().isBucketDisappearance()); } TEST_F(MergeHandlerTest, merge_progress_safe_guard) { MergeHandler handler(getPersistenceProvider(), getEnv()); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, 
_maxTimestamp); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>(); auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd); @@ -682,8 +686,8 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { _nodes.emplace_back(0, false); _nodes.emplace_back(1, false); _nodes.emplace_back(2, false); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>(); auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd); @@ -722,7 +726,7 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024); applyBucketDiffCmd->getDiff() = applyDiff; - auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); + auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); ASSERT_TRUE(applyBucketDiffReply.get()); @@ -809,10 +813,10 @@ void MergeHandlerTest::HandleMergeBucketInvoker::invoke( MergeHandlerTest& test, MergeHandler& handler, - spi::Context& context) + spi::Context&) { - api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); - handler.handleMergeBucket(cmd, context); + auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp); + handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); } TEST_F(MergeHandlerTest, merge_bucket_spi_failures) { @@ -841,17 +845,16 @@ void 
MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( MergeHandlerTest& test, MergeHandler& handler, - spi::Context& context) + spi::Context& ) { - api::GetBucketDiffCommand cmd(test._bucket, test._nodes, test._maxTimestamp); - handler.handleGetBucketDiff(cmd, context); + auto cmd = std::make_shared<api::GetBucketDiffCommand>(test._bucket, test._nodes, test._maxTimestamp); + handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler(providerWrapper, getEnv()); - providerWrapper.setResult( - spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); + providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); setUpChain(MIDDLE); ExpectedExceptionSpec exceptions[] = { @@ -874,12 +877,11 @@ void MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke( MergeHandlerTest& test, MergeHandler& handler, - spi::Context& context) + spi::Context&) { ++_counter; - std::shared_ptr<api::ApplyBucketDiffCommand> cmd( - test.createDummyApplyDiff(100000 * _counter)); - handler.handleApplyBucketDiff(*cmd, context); + auto cmd = test.createDummyApplyDiff(100000 * _counter); + handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { @@ -904,8 +906,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { EXPECT_EQ("", doTestSPIException(handler, providerWrapper, invoker, *it)); // Casual, in-place testing of bug 6752085. // This will fail if we give NaN to the metric in question. 
- EXPECT_TRUE(std::isfinite(getEnv()._metrics - .mergeAverageDataReceivedNeeded.getLast())); + EXPECT_TRUE(std::isfinite(getEnv()._metrics.mergeAverageDataReceivedNeeded.getLast())); } } @@ -913,10 +914,10 @@ void MergeHandlerTest::HandleGetBucketDiffReplyInvoker::beforeInvoke( MergeHandlerTest& test, MergeHandler& handler, - spi::Context& context) + spi::Context&) { - api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); - handler.handleMergeBucket(cmd, context); + auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp); + handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); _diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>(); } @@ -974,13 +975,13 @@ void MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke( MergeHandlerTest& test, MergeHandler& handler, - spi::Context& context) + spi::Context&) { ++_counter; _stub.clear(); if (getChainPos() == FRONT) { - api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp); - handler.handleMergeBucket(cmd, context); + auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp); + handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); auto diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>(); auto dummyDiff = test.createDummyGetBucketDiff(100000 * _counter, 0x4); diffCmd->getDiff() = dummyDiff->getDiff(); @@ -995,7 +996,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke( // Pretend last node in chain has data and that it will be fetched when // chain is unwinded. 
auto cmd = test.createDummyApplyDiff(100000 * _counter, 0x4, false); - handler.handleApplyBucketDiff(*cmd, context); + handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); _applyCmd = test.fetchSingleMessage<api::ApplyBucketDiffCommand>(); } } @@ -1147,13 +1148,13 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024); applyBucketDiffCmd->getDiff() = applyDiff; - auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context); + auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); ASSERT_TRUE(applyBucketDiffReply.get()); - api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp); - handler.handleMergeBucket(cmd, *_context); + auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); + handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>(); diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 25c0a36a7f5..4ac9dfd7765 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -283,7 +283,7 @@ PersistenceTestUtils::doPut(const document::Document::SP& doc, spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); getPersistenceProvider().createBucket(b, context); - getPersistenceProvider().put(b, time, doc, context); + getPersistenceProvider().put(b, time, std::move(doc), context); } spi::UpdateResult diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index e418765ecac..3121bef61e5 100644 --- 
a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -37,6 +37,23 @@ struct PersistenceTestEnvironment { class PersistenceTestUtils : public testing::Test { public: + class NoBucketLock : public FileStorHandler::BucketLockInterface + { + public: + NoBucketLock(document::Bucket bucket) : _bucket(bucket) { } + const document::Bucket &getBucket() const override { + return _bucket; + } + api::LockingRequirements lockingRequirements() const noexcept override { + return api::LockingRequirements::Shared; + } + static std::shared_ptr<NoBucketLock> make(document::Bucket bucket) { + return std::make_shared<NoBucketLock>(bucket); + } + private: + document::Bucket _bucket; + }; + std::unique_ptr<PersistenceTestEnvironment> _env; PersistenceTestUtils(); diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index 98a2be6880d..1ec6a35fb1d 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -201,19 +201,20 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) } document::Document::SP doc(testDocMan.createRandomDocumentAtLocation( docloc, seed, docSize, docSize)); - spi.put(bucket, spi::Timestamp(1000 + i), doc, context); + spi.put(bucket, spi::Timestamp(1000 + i), std::move(doc), context); } std::unique_ptr<PersistenceThread> thread(createPersistenceThread(0)); getNode().getStateUpdater().setClusterState( std::make_shared<lib::ClusterState>("distributor:1 storage:1")); - api::SplitBucketCommand cmd(makeDocumentBucket(document::BucketId(currentSplitLevel, 1))); - cmd.setMaxSplitBits(maxBits); - cmd.setMinSplitBits(minBits); - cmd.setMinByteSize(maxSize); - cmd.setMinDocCount(maxCount); - cmd.setSourceIndex(0); - MessageTracker::UP result(thread->handleSplitBucket(cmd, context)); + document::Bucket docBucket = 
makeDocumentBucket(document::BucketId(currentSplitLevel, 1)); + auto cmd = std::make_shared<api::SplitBucketCommand>(docBucket); + cmd->setMaxSplitBits(maxBits); + cmd->setMinSplitBits(minBits); + cmd->setMinByteSize(maxSize); + cmd->setMinDocCount(maxCount); + cmd->setSourceIndex(0); + MessageTracker::UP result = thread->handleSplitBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(docBucket), cmd)); api::ReturnCode code(result->getResult()); EXPECT_EQ(error, code); if (!code.success()) { @@ -222,8 +223,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) auto& reply = dynamic_cast<api::SplitBucketReply&>(result->getReply()); std::set<std::string> expected; for (uint32_t i=0; i<resultBuckets; ++i) { - document::BucketId b(resultSplitLevel, - location | (i == 0 ? 0 : splitMask)); + document::BucketId b(resultSplitLevel, location | (i == 0 ? 0 : splitMask)); std::ostringstream ost; ost << b << " - " << b.getUsedBits(); expected.insert(ost.str()); diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index 5462b4a5b0a..0d482ebe5b7 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -20,10 +20,10 @@ TEST_F(ProcessAllHandlerTest, remove_location) { doPut(4, spi::Timestamp(1234)); doPut(4, spi::Timestamp(2345)); - api::RemoveLocationCommand removeLocation("id.user == 4", makeDocumentBucket(bucketId)); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::RemoveLocationCommand>("id.user == 4", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - auto tracker = handler.handleRemoveLocation(removeLocation, context); + auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); EXPECT_EQ("DocEntry(1234, 1, 
id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n", @@ -45,10 +45,9 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { doPut(doc, bucketId, spi::Timestamp(100 + i), 0); } - api::RemoveLocationCommand - removeLocation("testdoctype1.headerval % 2 == 0", makeDocumentBucket(bucketId)); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - auto tracker = handler.handleRemoveLocation(removeLocation, context); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket); + auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n" @@ -71,12 +70,11 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_ty document::BucketId bucketId(16, 4); doPut(4, spi::Timestamp(1234)); - api::RemoveLocationCommand - removeLocation("unknowndoctype.headerval % 2 == 0", makeDocumentBucket(bucketId)); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::RemoveLocationCommand>("unknowndoctype.headerval % 2 == 0", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - ASSERT_THROW(handler.handleRemoveLocation(removeLocation, context), std::exception); + ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", dumpBucket(bucketId)); @@ -86,11 +84,11 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_bogus_selectio document::BucketId bucketId(16, 4); doPut(4, spi::Timestamp(1234)); - api::RemoveLocationCommand 
removeLocation("id.bogus != badgers", makeDocumentBucket(bucketId)); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::RemoveLocationCommand>("id.bogus != badgers", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - ASSERT_THROW(handler.handleRemoveLocation(removeLocation, context), std::exception); + ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", dumpBucket(bucketId)); @@ -107,10 +105,9 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc doPut(doc, bucketId, spi::Timestamp(100 + i), 0); } - api::StatBucketCommand statBucket(makeDocumentBucket(bucketId), - "testdoctype1.headerval % 2 == 0"); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "testdoctype1.headerval % 2 == 0"); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); @@ -142,9 +139,9 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries) true); } - api::StatBucketCommand statBucket(makeDocumentBucket(bucketId), "true"); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true"); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, 
std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); @@ -188,9 +185,9 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_ doPut(doc, bucketId, spi::Timestamp(100 + i), 0); } - api::StatBucketCommand statBucket(makeDocumentBucket(bucketId), "true"); - spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context); + document::Bucket bucket = makeDocumentBucket(bucketId); + auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true"); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index 08555fe0627..864ab320527 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -29,6 +29,7 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { const document::StringFieldValue MATCHING_HEADER{"Some string with woofy dog as a substring"}; const document::StringFieldValue OLD_CONTENT{"Some old content"}; const document::StringFieldValue NEW_CONTENT{"Freshly pressed and squeezed content"}; + const document::Bucket BUCKET = makeDocumentBucket(BUCKET_ID); unique_ptr<PersistenceThread> thread; shared_ptr<document::Document> testDoc; @@ -39,6 +40,11 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { : context(spi::LoadType(0, "default"), 0, 0) {} + MessageTracker::UP + createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { + return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd)); + } + void SetUp() override { 
SingleDiskPersistenceTestUtils::SetUp(); @@ -57,7 +63,7 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { SingleDiskPersistenceTestUtils::TearDown(); } - std::unique_ptr<api::UpdateCommand> conditional_update_test( + std::shared_ptr<api::UpdateCommand> conditional_update_test( bool createIfMissing, api::Timestamp updateTimestamp); @@ -82,10 +88,10 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) { // Conditionally replace document, but fail due to lack of woofy dog api::Timestamp timestampTwo = 1; - api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo); - setTestCondition(putTwo); + auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); + setTestCondition(*putTwo); - ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(), + ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); } @@ -102,10 +108,10 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) { // Conditionally replace document with updated version, succeed in doing so api::Timestamp timestampTwo = 1; - api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo); - setTestCondition(putTwo); + auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); + setTestCondition(*putTwo); - ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -122,10 +128,10 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) { // Conditionally remove document, fail in 
doing so api::Timestamp timestampTwo = 1; - api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo); - setTestCondition(remove); + auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); + setTestCondition(*remove); - ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(), + ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -142,18 +148,17 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) { // Conditionally remove document, succeed in doing so api::Timestamp timestampTwo = 1; - api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo); - setTestCondition(remove); + auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); + setTestCondition(*remove); - ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY), dumpBucket(BUCKET_ID)); } -std::unique_ptr<api::UpdateCommand> TestAndSetTest::conditional_update_test( - bool createIfMissing, - api::Timestamp updateTimestamp) +std::shared_ptr<api::UpdateCommand> +TestAndSetTest::conditional_update_test(bool createIfMissing, api::Timestamp updateTimestamp) { auto docUpdate = std::make_shared<document::DocumentUpdate>(_env->_testDocMan.getTypeRepo(), testDoc->getType(), testDocId); auto fieldUpdate = document::FieldUpdate(testDoc->getField("content")); @@ -161,7 +166,7 @@ std::unique_ptr<api::UpdateCommand> TestAndSetTest::conditional_update_test( docUpdate->addUpdate(fieldUpdate); 
docUpdate->setCreateIfNonExistent(createIfMissing); - auto updateUp = std::make_unique<api::UpdateCommand>(makeDocumentBucket(BUCKET_ID), docUpdate, updateTimestamp); + auto updateUp = std::make_unique<api::UpdateCommand>(BUCKET, docUpdate, updateTimestamp); setTestCondition(*updateUp); return updateUp; } @@ -172,7 +177,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) { putTestDocument(false, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), + ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -185,7 +190,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) { putTestDocument(true, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -197,7 +202,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(false, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), + ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -206,7 +211,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre api::Timestamp 
updateTimestamp = 200; auto updateUp = conditional_update_test(true, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID)); assertTestDocumentFoundAndMatchesContent(NEW_CONTENT); } @@ -215,10 +220,10 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) { // Conditionally replace nonexisting document // Fail early since document selection is invalid api::Timestamp timestamp = 0; - api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp); - put.setCondition(documentapi::TestAndSetCondition("bjarne")); + auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); + put->setCondition(documentapi::TestAndSetCondition("bjarne")); - ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); + ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -226,11 +231,11 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) { // Conditionally replace nonexisting document // Fail since no document exists to match with test and set api::Timestamp timestamp = 0; - api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp); - setTestCondition(put); - thread->handlePut(put, context); + auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); + setTestCondition(*put); + thread->handlePut(*put, createTracker(put, BUCKET)); - ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(), + ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), 
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -253,8 +258,8 @@ TestAndSetTest::createTestDocument() document::Document::SP TestAndSetTest::retrieveTestDocument() { - api::GetCommand get(makeDocumentBucket(BUCKET_ID), testDocId, "[all]"); - auto tracker = thread->handleGet(get, context); + auto get = std::make_shared<api::GetCommand>(BUCKET, testDocId, "[all]"); + auto tracker = thread->handleGet(*get, createTracker(get, BUCKET)); assert(tracker->getResult() == api::ReturnCode::Result::OK); auto & reply = static_cast<api::GetReply &>(tracker->getReply()); @@ -273,8 +278,8 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta testDoc->setValue(testDoc->getField("hstringval"), MATCHING_HEADER); } - api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp); - thread->handlePut(put, context); + auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); + thread->handlePut(*put, createTracker(put, BUCKET)); } void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value) diff --git a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp index e906cfc624f..18877bdf8f7 100644 --- a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp +++ b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp @@ -7,20 +7,16 @@ LOG_SETUP(".persistence.diskmoveoperationhandler"); namespace storage { -DiskMoveOperationHandler::DiskMoveOperationHandler(PersistenceUtil& env, - spi::PersistenceProvider& provider) +DiskMoveOperationHandler::DiskMoveOperationHandler(PersistenceUtil& env, spi::PersistenceProvider& provider) : _env(env), _provider(provider) { } MessageTracker::UP -DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, - spi::Context& context) +DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& 
cmd, MessageTracker::UP tracker) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.movedBuckets, - _env._component.getClock())); + tracker->setMetric(_env._metrics.movedBuckets); document::Bucket bucket(cmd.getBucket()); uint32_t targetDisk(cmd.getDstDisk()); @@ -46,13 +42,10 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, deviceIndex, targetDisk); spi::Bucket from(bucket, spi::PartitionId(deviceIndex)); - spi::Bucket to(bucket, spi::PartitionId(targetDisk)); - spi::Result result( - _provider.move(from, spi::PartitionId(targetDisk), context)); + spi::Result result(_provider.move(from, spi::PartitionId(targetDisk), tracker->context())); if (result.hasError()) { - tracker->fail(api::ReturnCode::INTERNAL_FAILURE, - result.getErrorMessage()); + tracker->fail(api::ReturnCode::INTERNAL_FAILURE, result.getErrorMessage()); return tracker; } @@ -82,12 +75,7 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, } // Answer message, setting extra info such as filesize - tracker->setReply(std::shared_ptr<BucketDiskMoveReply>( - new BucketDiskMoveReply( - cmd, - bInfo, - sourceFileSize, - sourceFileSize))); + tracker->setReply(std::make_shared<BucketDiskMoveReply>(cmd, bInfo, sourceFileSize, sourceFileSize)); return tracker; } diff --git a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h index f0c4bbef66a..9e8d33fc802 100644 --- a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h +++ b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h @@ -12,8 +12,7 @@ public: DiskMoveOperationHandler(PersistenceUtil&, spi::PersistenceProvider& provider); - MessageTracker::UP handleBucketDiskMove(BucketDiskMoveCommand&, - spi::Context&); + MessageTracker::UP handleBucketDiskMove(BucketDiskMoveCommand&, MessageTracker::UP tracker); private: PersistenceUtil& _env; diff --git 
a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 4cb687bb753..3483b15dd0e 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -537,9 +537,10 @@ MergeHandler::applyDiffEntry(const spi::Bucket& bucket, if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { // Regular put entry Document::SP doc(deserializeDiffDocument(e, repo)); - checkResult(_spi.put(bucket, timestamp, doc, context), + DocumentId docId = doc->getId(); + checkResult(_spi.put(bucket, timestamp, std::move(doc), context), bucket, - doc->getId(), + docId, "put"); } else { DocumentId docId(e._docName); @@ -903,12 +904,9 @@ public: }; MessageTracker::UP -MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, - spi::Context& context) +MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.mergeBuckets, - _env._component.getClock())); + tracker->setMetric(_env._metrics.mergeBuckets); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); LOG(debug, "MergeBucket(%s) with max timestamp %" PRIu64 ".", @@ -949,44 +947,33 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, tracker->fail(ReturnCode::BUSY, err); return tracker; } - checkResult(_spi.createBucket(bucket, context), bucket, "create bucket"); + checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); - MergeStatus::SP s = MergeStatus::SP(new MergeStatus( + auto s = std::make_shared<MergeStatus>( _env._component.getClock(), cmd.getLoadType(), - cmd.getPriority(), cmd.getTrace().getLevel())); + cmd.getPriority(), cmd.getTrace().getLevel()); _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); s->nodeList = cmd.getNodes(); s->maxTimestamp = 
Timestamp(cmd.getMaxTimestamp()); s->timeout = cmd.getTimeout(); s->startTime = framework::MilliSecTimer(_env._component.getClock()); - std::shared_ptr<api::GetBucketDiffCommand> cmd2( - new api::GetBucketDiffCommand(bucket.getBucket(), - s->nodeList, - s->maxTimestamp.getTime())); - if (!buildBucketInfoList(bucket, - cmd.getLoadType(), - s->maxTimestamp, - 0, - cmd2->getDiff(), - context)) - { + auto cmd2 = std::make_shared<api::GetBucketDiffCommand>(bucket.getBucket(), s->nodeList, s->maxTimestamp.getTime()); + if (!buildBucketInfoList(bucket, cmd.getLoadType(), s->maxTimestamp, 0, cmd2->getDiff(), tracker->context())) { LOG(debug, "Bucket non-existing in db. Failing merge."); tracker->fail(ReturnCode::BUCKET_DELETED, "Bucket not found in buildBucketInfo step"); return tracker; } - _env._metrics.mergeMetadataReadLatency.addValue( - s->startTime.getElapsedTimeAsDouble()); + _env._metrics.mergeMetadataReadLatency.addValue(s->startTime.getElapsedTimeAsDouble()); LOG(spam, "Sending GetBucketDiff %" PRIu64 " for %s to next node %u " "with diff of %u entries.", cmd2->getMsgId(), bucket.toString().c_str(), s->nodeList[1].index, uint32_t(cmd2->getDiff().size())); - cmd2->setAddress(createAddress(_env._component.getClusterName(), - s->nodeList[1].index)); + cmd2->setAddress(createAddress(_env._component.getClusterName(), s->nodeList[1].index)); cmd2->setPriority(s->context.getPriority()); cmd2->setTimeout(s->timeout); cmd2->setSourceIndex(cmd.getSourceIndex()); @@ -1129,15 +1116,12 @@ namespace { } MessageTracker::UP -MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, - spi::Context& context) +MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.getBucketDiff, - _env._component.getClock())); + tracker->setMetric(_env._metrics.getBucketDiff); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); LOG(debug, "GetBucketDiff(%s)", 
bucket.toString().c_str()); - checkResult(_spi.createBucket(bucket, context), bucket, "create bucket"); + checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { tracker->fail(ReturnCode::BUSY, @@ -1151,7 +1135,7 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, framework::MilliSecTimer startTime(_env._component.getClock()); if (!buildBucketInfoList(bucket, cmd.getLoadType(), Timestamp(cmd.getMaxTimestamp()), - index, local, context)) + index, local, tracker->context())) { LOG(debug, "Bucket non-existing in db. Failing merge."); tracker->fail(ReturnCode::BUCKET_DELETED, @@ -1186,31 +1170,26 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, cmd.getMsgId(), bucket.toString().c_str(), cmd.getNodes()[index - 1].index, final.size(), local.size()); - api::GetBucketDiffReply* reply = new api::GetBucketDiffReply(cmd); - tracker->setReply(api::StorageReply::SP(reply)); + auto reply = std::make_shared<api::GetBucketDiffReply>(cmd); reply->getDiff().swap(final); + tracker->setReply(std::move(reply)); } else { // When not the last node in merge chain, we must save reply, and // send command on. 
MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); - MergeStatus::SP s(new MergeStatus(_env._component.getClock(), + auto s = std::make_shared<MergeStatus>(_env._component.getClock(), cmd.getLoadType(), cmd.getPriority(), - cmd.getTrace().getLevel())); + cmd.getTrace().getLevel()); _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); - s->pendingGetDiff = - api::GetBucketDiffReply::SP(new api::GetBucketDiffReply(cmd)); + s->pendingGetDiff = std::make_shared<api::GetBucketDiffReply>(cmd); s->pendingGetDiff->setPriority(cmd.getPriority()); - LOG(spam, "Sending GetBucketDiff for %s on to node %d, " - "added %zu new entries to diff.", + LOG(spam, "Sending GetBucketDiff for %s on to node %d, added %zu new entries to diff.", bucket.toString().c_str(), cmd.getNodes()[index + 1].index, local.size() - remote.size()); - std::shared_ptr<api::GetBucketDiffCommand> cmd2( - new api::GetBucketDiffCommand( - bucket.getBucket(), cmd.getNodes(), cmd.getMaxTimestamp())); - cmd2->setAddress(createAddress(_env._component.getClusterName(), - cmd.getNodes()[index + 1].index)); + auto cmd2 = std::make_shared<api::GetBucketDiffCommand>(bucket.getBucket(), cmd.getNodes(), cmd.getMaxTimestamp()); + cmd2->setAddress(createAddress(_env._component.getClusterName(), cmd.getNodes()[index + 1].index)); cmd2->getDiff().swap(local); cmd2->setPriority(cmd.getPriority()); cmd2->setTimeout(cmd.getTimeout()); @@ -1330,10 +1309,9 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, } MessageTracker::UP -MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, - spi::Context& context) +MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.applyBucketDiff, _env._component.getClock()); + tracker->setMetric(_env._metrics.applyBucketDiff); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); LOG(debug, "%s", 
cmd.toString().c_str()); @@ -1348,10 +1326,8 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, bool lastInChain = index + 1u >= cmd.getNodes().size(); if (applyDiffNeedLocalData(cmd.getDiff(), index, !lastInChain)) { framework::MilliSecTimer startTime(_env._component.getClock()); - fetchLocalData(bucket, cmd.getLoadType(), cmd.getDiff(), index, - context); - _env._metrics.mergeDataReadLatency.addValue( - startTime.getElapsedTimeAsDouble()); + fetchLocalData(bucket, cmd.getLoadType(), cmd.getDiff(), index, tracker->context()); + _env._metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); } else { LOG(spam, "Merge(%s): Moving %zu entries, didn't need " "local data on node %u (%u).", @@ -1363,7 +1339,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { framework::MilliSecTimer startTime(_env._component.getClock()); api::BucketInfo info(applyDiffLocally(bucket, cmd.getLoadType(), - cmd.getDiff(), index, context)); + cmd.getDiff(), index, tracker->context())); _env._metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); } else { diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 4ea69bd0fdf..7052258ec03 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -55,13 +55,10 @@ public: uint8_t nodeIndex, spi::Context& context); - MessageTracker::UP handleMergeBucket(api::MergeBucketCommand&, - spi::Context&); - MessageTracker::UP handleGetBucketDiff(api::GetBucketDiffCommand&, - spi::Context&); + MessageTracker::UP handleMergeBucket(api::MergeBucketCommand&, MessageTracker::UP); + MessageTracker::UP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTracker::UP); void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&); - MessageTracker::UP 
handleApplyBucketDiff(api::ApplyBucketDiffCommand&, - spi::Context&); + MessageTracker::UP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTracker::UP); void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&); private: diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 422e19a492e..5905d93cc83 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -69,7 +69,7 @@ PersistenceThread::getBucket(const DocumentId& id, const document::Bucket &bucke bool PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tracker) { - uint32_t code = _env.convertErrorCode(response); + uint32_t code = PersistenceUtil::convertErrorCode(response); if (code != 0) { tracker.fail(code, response.getErrorMessage()); @@ -80,11 +80,13 @@ PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tr } -bool PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) { +bool +PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) { return cmd.getCondition().isPresent(); } -bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, +bool +PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, spi::Context & context, bool missingDocumentImpliesMatch) { try { TestAndSetHelper helper(*this, cmd, missingDocumentImpliesMatch); @@ -104,35 +106,35 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, } MessageTracker::UP -PersistenceThread::handlePut(api::PutCommand& cmd, spi::Context & context) +PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP tracker) { auto& metrics = _env._metrics.put[cmd.getLoadType()]; - auto tracker = std::make_unique<MessageTracker>(metrics, 
_env._component.getClock()); + tracker->setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) { + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) { return tracker; } spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), context); + spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker->context()); checkForError(response, *tracker); return tracker; } MessageTracker::UP -PersistenceThread::handleRemove(api::RemoveCommand& cmd, spi::Context & context) +PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker) { auto& metrics = _env._metrics.remove[cmd.getLoadType()]; - auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock()); + tracker->setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) { + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) { return tracker; } spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), context); + spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker->context()); if (checkForError(response, *tracker)) { tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? 
cmd.getTimestamp() : 0)); } @@ -143,18 +145,18 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, spi::Context & context) } MessageTracker::UP -PersistenceThread::handleUpdate(api::UpdateCommand& cmd, spi::Context & context) +PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker) { auto& metrics = _env._metrics.update[cmd.getLoadType()]; - auto tracker = std::make_unique<MessageTracker>(metrics, _env._component.getClock()); + tracker->setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context, cmd.getUpdate()->getCreateIfNonExistent())) { + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context(), cmd.getUpdate()->getCreateIfNonExistent())) { return tracker; } spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), context); + spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), tracker->context()); if (checkForError(response, *tracker)) { auto reply = std::make_shared<api::UpdateReply>(cmd); reply->setOldTimestamp(response.getExistingTimestamp()); @@ -176,17 +178,16 @@ spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency co } MessageTracker::UP -PersistenceThread::handleGet(api::GetCommand& cmd, spi::Context & context) +PersistenceThread::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker) { auto& metrics = _env._metrics.get[cmd.getLoadType()]; - auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock()); + tracker->setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - document::FieldSetRepo repo; - document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet()); - context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); + document::FieldSet::UP fieldSet = 
document::FieldSetRepo::parse(*_env._component.getTypeRepo(), cmd.getFieldSet()); + tracker->context().setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); spi::GetResult result = - _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), context); + _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), tracker->context()); if (checkForError(result, *tracker)) { if (!result.hasDocument()) { @@ -199,9 +200,9 @@ PersistenceThread::handleGet(api::GetCommand& cmd, spi::Context & context) } MessageTracker::UP -PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) +PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.repairs,_env._component.getClock()); + tracker->setMetric(_env._metrics.repairs); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(), (cmd.verifyBody() ? 
"Verifying body" : "Not verifying body")); @@ -225,28 +226,28 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) } MessageTracker::UP -PersistenceThread::handleRevert(api::RevertCommand& cmd, spi::Context & context) +PersistenceThread::handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock()); + tracker->setMetric(_env._metrics.revert[cmd.getLoadType()]); spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens(); for (const api::Timestamp & token : tokens) { - spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), context); + spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), tracker->context()); } return tracker; } MessageTracker::UP -PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context) +PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock()); + tracker->setMetric(_env._metrics.createBuckets); LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { LOG(warning, "Bucket %s was merging at create time. 
Unexpected.", cmd.getBucketId().toString().c_str()); DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId()); } spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - _spi.createBucket(spiBucket, context); + _spi.createBucket(spiBucket, tracker->context()); if (cmd.getActive()) { _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE); } @@ -295,9 +296,9 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, con } MessageTracker::UP -PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context) +PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock()); + tracker->setMetric(_env._metrics.deleteBuckets); LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()"); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { @@ -308,7 +309,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Contex if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { return tracker; } - _spi.deleteBucket(bucket, context); + _spi.deleteBucket(bucket, tracker->context()); StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); { StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); @@ -333,12 +334,12 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Contex } MessageTracker::UP -PersistenceThread::handleGetIter(GetIterCommand& cmd, spi::Context & context) +PersistenceThread::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock()); - spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), context)); + 
tracker->setMetric(_env._metrics.visit[cmd.getLoadType()]); + spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), tracker->context())); if (checkForError(result, *tracker)) { - GetIterReply::SP reply(new GetIterReply(cmd)); + auto reply = std::make_shared<GetIterReply>(cmd); reply->getEntries() = result.steal_entries(); _env._metrics.visit[cmd.getLoadType()]. documentsPerIterate.addValue(reply->getEntries().size()); @@ -351,9 +352,9 @@ PersistenceThread::handleGetIter(GetIterCommand& cmd, spi::Context & context) } MessageTracker::UP -PersistenceThread::handleReadBucketList(ReadBucketList& cmd) +PersistenceThread::handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketList,_env._component.getClock()); + tracker->setMetric(_env._metrics.readBucketList); spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace(), cmd.getPartition())); if (checkForError(result, *tracker)) { @@ -366,23 +367,22 @@ PersistenceThread::handleReadBucketList(ReadBucketList& cmd) } MessageTracker::UP -PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd) +PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketInfo,_env._component.getClock()); + tracker->setMetric(_env._metrics.readBucketInfo); _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket())); return tracker; } MessageTracker::UP -PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context) +PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock()); - document::FieldSetRepo repo; - document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields()); - 
context.setReadConsistency(cmd.getReadConsistency()); + tracker->setMetric(_env._metrics.createIterator); + document::FieldSet::UP fieldSet = document::FieldSetRepo::parse(*_env._component.getTypeRepo(), cmd.getFields()); + tracker->context().setReadConsistency(cmd.getReadConsistency()); spi::CreateIteratorResult result(_spi.createIterator( spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), - *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), context)); + *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), tracker->context())); if (checkForError(result, *tracker)) { tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId()))); } @@ -390,9 +390,9 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, spi::Context } MessageTracker::UP -PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context) +PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.splitBuckets,_env._component.getClock()); + tracker->setMetric(_env._metrics.splitBuckets); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); // Calculate the various bucket ids involved. 
@@ -411,7 +411,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context SplitBitDetector::Result targetInfo; if (_env._config.enableMultibitSplitOptimalization) { targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(), - context, cmd.getMinDocCount(), cmd.getMinByteSize()); + tracker->context(), cmd.getMinDocCount(), cmd.getMinByteSize()); } if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) { document::BucketId src(cmd.getBucketId()); @@ -451,9 +451,9 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context } #endif spi::Result result = _spi.split(spiBucket, spi::Bucket(target1, spi::PartitionId(lock1.disk)), - spi::Bucket(target2, spi::PartitionId(lock2.disk)), context); + spi::Bucket(target2, spi::PartitionId(lock2.disk)), tracker->context()); if (result.hasError()) { - tracker->fail(_env.convertErrorCode(result), result.getErrorMessage()); + tracker->fail(PersistenceUtil::convertErrorCode(result), result.getErrorMessage()); return tracker; } // After split we need to take all bucket db locks to update them. 
@@ -509,7 +509,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context spi::PartitionId(targets[i].second.diskIndex))); LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it", createTarget.toString().c_str()); - _spi.createBucket(createTarget, context); + _spi.createBucket(createTarget, tracker->context()); } splitReply.getSplitInfo().emplace_back(targets[i].second.bucket.getBucketId(), targets[i].first->getBucketInfo()); @@ -529,7 +529,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context } bool -PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const +PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) { if (cmd.getSourceBuckets().size() != 2) { tracker.fail(ReturnCode::ILLEGAL_PARAMETERS, @@ -554,9 +554,9 @@ PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, Messa } MessageTracker::UP -PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context) +PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.joinBuckets,_env._component.getClock()); + tracker->setMetric(_env._metrics.joinBuckets); if (!validateJoinCommand(cmd, *tracker)) { return tracker; } @@ -603,7 +603,7 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context _spi.join(spi::Bucket(firstBucket, spi::PartitionId(lock1.disk)), spi::Bucket(secondBucket, spi::PartitionId(lock2.disk)), spi::Bucket(destBucket, spi::PartitionId(_env._partition)), - context); + tracker->context()); if (!checkForError(result, *tracker)) { return tracker; } @@ -634,9 +634,9 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context } MessageTracker::UP 
-PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd) +PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.setBucketStates,_env._component.getClock()); + tracker->setMetric(_env._metrics.setBucketStates); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); @@ -665,9 +665,9 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd) } MessageTracker::UP -PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context) +PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.internalJoin,_env._component.getClock()); + tracker->setMetric(_env._metrics.internalJoin); document::Bucket destBucket = cmd.getBucket(); { // Create empty bucket for target. 
@@ -682,7 +682,7 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi: _spi.join(spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())), spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())), spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToKeep())), - context); + tracker->context()); if (checkForError(result, *tracker)) { tracker->setReply(std::make_shared<InternalBucketJoinReply>(cmd, _env.getBucketInfo(cmd.getBucket()))); } @@ -690,9 +690,9 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi: } MessageTracker::UP -PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd) +PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>(_env._metrics.recheckBucketInfo, _env._component.getClock()); + tracker->setMetric(_env._metrics.recheckBucketInfo); document::Bucket bucket(cmd.getBucket()); api::BucketInfo info(_env.getBucketInfo(bucket)); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); @@ -720,58 +720,58 @@ PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd) } MessageTracker::UP -PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, spi::Context & context) +PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker) { switch (msg.getType().getId()) { case api::MessageType::GET_ID: - return handleGet(static_cast<api::GetCommand&>(msg), context); + return handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker)); case api::MessageType::PUT_ID: - return handlePut(static_cast<api::PutCommand&>(msg), context); + return handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker)); case api::MessageType::REMOVE_ID: - return handleRemove(static_cast<api::RemoveCommand&>(msg), context); + return handleRemove(static_cast<api::RemoveCommand&>(msg), 
std::move(tracker)); case api::MessageType::UPDATE_ID: - return handleUpdate(static_cast<api::UpdateCommand&>(msg), context); + return handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker)); case api::MessageType::REVERT_ID: - return handleRevert(static_cast<api::RevertCommand&>(msg), context); + return handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker)); case api::MessageType::CREATEBUCKET_ID: - return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), context); + return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: - return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), context); + return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: - return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), context); + return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); case api::MessageType::SPLITBUCKET_ID: - return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), context); + return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker)); // Depends on iterators case api::MessageType::STATBUCKET_ID: - return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), context); + return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker)); case api::MessageType::REMOVELOCATION_ID: - return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), context); + return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker)); case api::MessageType::MERGEBUCKET_ID: - return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), context); + return 
_mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), std::move(tracker)); case api::MessageType::GETBUCKETDIFF_ID: - return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), context); + return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), std::move(tracker)); case api::MessageType::APPLYBUCKETDIFF_ID: - return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), context); + return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker)); case api::MessageType::SETBUCKETSTATE_ID: - return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg)); + return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); case api::MessageType::INTERNAL_ID: switch(static_cast<api::InternalCommand&>(msg).getType()) { case GetIterCommand::ID: - return handleGetIter(static_cast<GetIterCommand&>(msg), context); + return handleGetIter(static_cast<GetIterCommand&>(msg), std::move(tracker)); case CreateIteratorCommand::ID: - return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), context); + return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), std::move(tracker)); case ReadBucketList::ID: - return handleReadBucketList(static_cast<ReadBucketList&>(msg)); + return handleReadBucketList(static_cast<ReadBucketList&>(msg), std::move(tracker)); case ReadBucketInfo::ID: - return handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg)); + return handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker)); case RepairBucketCommand::ID: - return handleRepairBucket(static_cast<RepairBucketCommand&>(msg)); + return handleRepairBucket(static_cast<RepairBucketCommand&>(msg), std::move(tracker)); case BucketDiskMoveCommand::ID: - return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), context); + return 
_diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), std::move(tracker)); case InternalBucketJoinCommand::ID: - return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), context); + return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker)); case RecheckBucketInfoCommand::ID: - return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg)); + return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker)); default: LOG(warning, "Persistence thread received unhandled internal command %s", msg.toString().c_str()); break; @@ -782,21 +782,6 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, spi::Conte return MessageTracker::UP(); } -MessageTracker::UP -PersistenceThread::handleCommand(api::StorageCommand& msg) -{ - spi::Context context(msg.getLoadType(), msg.getPriority(), msg.getTrace().getLevel()); - MessageTracker::UP mtracker(handleCommandSplitByType(msg, context)); - if (mtracker && ! 
context.getTrace().getRoot().isEmpty()) { - if (mtracker->hasReply()) { - mtracker->getReply().getTrace().getRoot().addChild(context.getTrace().getRoot()); - } else { - msg.getTrace().getRoot().addChild(context.getTrace().getRoot()); - } - } - return mtracker; -} - void PersistenceThread::handleReply(api::StorageReply& reply) { @@ -813,7 +798,7 @@ PersistenceThread::handleReply(api::StorageReply& reply) } MessageTracker::UP -PersistenceThread::processMessage(api::StorageMessage& msg) +PersistenceThread::processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) { MBUS_TRACE(msg.getTrace(), 5, "PersistenceThread: Processing message in persistence layer"); @@ -829,13 +814,12 @@ PersistenceThread::processMessage(api::StorageMessage& msg) } } else { api::StorageCommand& initiatingCommand = static_cast<api::StorageCommand&>(msg); - try { int64_t startTime(_component->getClock().getTimeInMillis().getTime()); LOG(debug, "Handling command: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); - auto tracker(handleCommand(initiatingCommand)); + tracker = handleCommandSplitByType(initiatingCommand, std::move(tracker)); if (!tracker) { LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str()); } else { @@ -867,47 +851,18 @@ PersistenceThread::processMessage(api::StorageMessage& msg) } } - return MessageTracker::UP(); -} - -namespace { - - -bool isBatchable(api::MessageType::Id id) -{ - return (id == api::MessageType::PUT_ID || - id == api::MessageType::REMOVE_ID || - id == api::MessageType::UPDATE_ID || - id == api::MessageType::REVERT_ID); -} - -bool hasBucketInfo(api::MessageType::Id id) -{ - return (isBatchable(id) || - (id == api::MessageType::REMOVELOCATION_ID || - id == api::MessageType::JOINBUCKETS_ID)); -} - + return tracker; } void -PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage & lock) { - std::vector<MessageTracker::UP> trackers; - document::Bucket bucket = 
lock.first->getBucket(); - +PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage lock) { LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); api::StorageMessage & msg(*lock.second); - std::unique_ptr<MessageTracker> tracker = processMessage(msg); - if (tracker && tracker->hasReply()) { - if (hasBucketInfo(msg.getType().getId())) { - if (tracker->getReply().getResult().success()) { - _env.setBucketInfo(*tracker, bucket); - } - } - LOG(spam, "Sending reply up: %s %" PRIu64, - tracker->getReply().toString().c_str(), tracker->getReply().getMsgId()); - _env._fileStorHandler.sendReply(std::move(*tracker).stealReplySP()); + auto tracker = std::make_unique<MessageTracker>(_env, std::move(lock.first), std::move(lock.second)); + tracker = processMessage(msg, std::move(tracker)); + if (tracker) { + tracker->sendReply(); } } @@ -922,7 +877,7 @@ PersistenceThread::run(framework::ThreadHandle& thread) FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _stripeId)); if (lock.first) { - processLockedMessage(lock); + processLockedMessage(std::move(lock)); } vespalib::MonitorGuard flushMonitorGuard(_flushMonitor); diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 56414835b7b..a3c8099f228 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -28,23 +28,23 @@ public: void flush() override; framework::Thread& getThread() override { return *_thread; } - MessageTracker::UP handlePut(api::PutCommand& cmd, spi::Context & context); - MessageTracker::UP handleRemove(api::RemoveCommand& cmd, spi::Context & context); - MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, spi::Context & context); - MessageTracker::UP handleGet(api::GetCommand& cmd, spi::Context & context); - MessageTracker::UP 
handleRevert(api::RevertCommand& cmd, spi::Context & context); - MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context); - MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context); - MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context); - MessageTracker::UP handleGetIter(GetIterCommand& cmd, spi::Context & context); - MessageTracker::UP handleReadBucketList(ReadBucketList& cmd); - MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd); - MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context); - MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd); - MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context); - MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context); - MessageTracker::UP handleRepairBucket(RepairBucketCommand& cmd); - MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd); + MessageTracker::UP handlePut(api::PutCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleGet(api::GetCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker); + MessageTracker::UP 
handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleRepairBucket(RepairBucketCommand& cmd, MessageTracker::UP tracker); + MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker); private: uint32_t _stripeId; @@ -67,23 +67,22 @@ private: * an appropriate error and returns false iff the command does not validate * OK. Returns true and does not touch the tracker otherwise. */ - bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const; + static bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker); // Message handling functions - MessageTracker::UP handleCommand(api::StorageCommand&); - MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, spi::Context & context); + MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker); void handleReply(api::StorageReply&); - MessageTracker::UP processMessage(api::StorageMessage& msg); - void processLockedMessage(FileStorHandler::LockedMessage & lock); + MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker); + void processLockedMessage(FileStorHandler::LockedMessage lock); // Thread main loop void run(framework::ThreadHandle&) override; - bool checkForError(const spi::Result& response, MessageTracker& tracker); + static bool checkForError(const spi::Result& response, MessageTracker& tracker); spi::Bucket getBucket(const DocumentId& id, const document::Bucket &bucket) const; friend class 
TestAndSetHelper; - bool tasConditionExists(const api::TestAndSetCommand & cmd); + static bool tasConditionExists(const api::TestAndSetCommand & cmd); bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, spi::Context & context, bool missingDocumentImpliesMatch = false); }; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 9c49dc96750..53679a1a364 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -14,22 +14,69 @@ namespace { ost << "PersistenceUtil(" << p << ")"; return ost.str(); } + + bool isBatchable(api::MessageType::Id id) + { + return (id == api::MessageType::PUT_ID || + id == api::MessageType::REMOVE_ID || + id == api::MessageType::UPDATE_ID || + id == api::MessageType::REVERT_ID); + } + + bool hasBucketInfo(api::MessageType::Id id) + { + return (isBatchable(id) || + (id == api::MessageType::REMOVELOCATION_ID || + id == api::MessageType::JOINBUCKETS_ID)); + } + } -MessageTracker::MessageTracker(FileStorThreadMetrics::Op& metric, - framework::Clock& clock) +MessageTracker::MessageTracker(PersistenceUtil & env, + FileStorHandler::BucketLockInterface::SP bucketLock, + api::StorageMessage::SP msg) : _sendReply(true), - _metric(metric), + _updateBucketInfo(hasBucketInfo(msg->getType().getId())), + _bucketLock(std::move(bucketLock)), + _msg(std::move(msg)), + _context(_msg->getLoadType(), _msg->getPriority(), _msg->getTrace().getLevel()), + _env(env), + _metric(nullptr), _result(api::ReturnCode::OK), - _timer(clock) -{ - _metric.count.inc(); + _timer(_env._component.getClock()) +{ } + +void +MessageTracker::setMetric(FileStorThreadMetrics::Op& metric) { + metric.count.inc(); + _metric = &metric; } MessageTracker::~MessageTracker() { if (_reply.get() && _reply->getResult().success()) { - _metric.latency.addValue(_timer.getElapsedTimeAsDouble()); + 
_metric->latency.addValue(_timer.getElapsedTimeAsDouble()); + } +} + +void +MessageTracker::sendReply() { + if (hasReply()) { + if ( ! _context.getTrace().getRoot().isEmpty()) { + getReply().getTrace().getRoot().addChild(_context.getTrace().getRoot()); + } + if (_updateBucketInfo) { + if (getReply().getResult().success()) { + _env.setBucketInfo(*this, _bucketLock->getBucket()); + } + } + LOG(spam, "Sending reply up: %s %" PRIu64, + getReply().toString().c_str(), getReply().getMsgId()); + _env._fileStorHandler.sendReply(std::move(_reply)); + } else { + if ( ! _context.getTrace().getRoot().isEmpty()) { + _msg->getTrace().getRoot().addChild(_context.getTrace().getRoot()); + } } } @@ -53,7 +100,7 @@ MessageTracker::generateReply(api::StorageCommand& cmd) } if (!_reply->getResult().success()) { - _metric.failed.inc(); + _metric->failed.inc(); LOGBP(debug, "Failed to handle command %s: %s", cmd.toString().c_str(), _result.toString().c_str()); diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index e8e5f947814..c6cb943f0b9 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -13,14 +13,18 @@ namespace storage { +class PersistenceUtil; + class MessageTracker : protected Types { public: typedef std::unique_ptr<MessageTracker> UP; - MessageTracker(FileStorThreadMetrics::Op& metric, framework::Clock& clock); + MessageTracker(PersistenceUtil & env, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); ~MessageTracker(); + void setMetric(FileStorThreadMetrics::Op& metric); + /** * Called by operation handlers to set reply if they need to send a * non-default reply. 
They should call this function as soon as they create @@ -57,23 +61,32 @@ public: api::ReturnCode getResult() const { return _result; } + spi::Context & context() { return _context; } + + void sendReply(); + private: - bool _sendReply; - FileStorThreadMetrics::Op& _metric; - api::StorageReply::SP _reply; - api::ReturnCode _result; - framework::MilliSecTimer _timer; + bool _sendReply; + bool _updateBucketInfo; + FileStorHandler::BucketLockInterface::SP _bucketLock; + api::StorageMessage::SP _msg; + spi::Context _context; + PersistenceUtil &_env; + FileStorThreadMetrics::Op *_metric; + api::StorageReply::SP _reply; + api::ReturnCode _result; + framework::MilliSecTimer _timer; }; struct PersistenceUtil { vespa::config::content::StorFilestorConfig _config; - ServiceLayerComponentRegister& _compReg; - ServiceLayerComponent _component; - FileStorHandler& _fileStorHandler; - uint16_t _partition; - uint16_t _nodeIndex; - FileStorThreadMetrics& _metrics; - const document::BucketIdFactory& _bucketFactory; + ServiceLayerComponentRegister &_compReg; + ServiceLayerComponent _component; + FileStorHandler &_fileStorHandler; + uint16_t _partition; + uint16_t _nodeIndex; + FileStorThreadMetrics &_metrics; + const document::BucketIdFactory &_bucketFactory; const std::shared_ptr<const document::DocumentTypeRepo> _repo; spi::PersistenceProvider& _spi; diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index f37c6723933..4829bdf4581 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -80,25 +80,18 @@ public: } MessageTracker::UP -ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, - spi::Context& context) +ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>( - 
_env._metrics.removeLocation[cmd.getLoadType()], - _env._component.getClock()); + tracker->setMetric(_env._metrics.removeLocation[cmd.getLoadType()]); LOG(debug, "RemoveLocation(%s): using selection '%s'", cmd.getBucketId().toString().c_str(), cmd.getDocumentSelection().c_str()); spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - UnrevertableRemoveEntryProcessor processor(_spi, bucket, context); - BucketProcessor::iterateAll(_spi, - bucket, - cmd.getDocumentSelection(), - processor, - spi::NEWEST_DOCUMENT_ONLY, - context); + UnrevertableRemoveEntryProcessor processor(_spi, bucket, tracker->context()); + BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(), + processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context()); tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed)); @@ -106,12 +99,9 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, } MessageTracker::UP -ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, - spi::Context& context) +ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker::UP tracker) { - auto tracker = std::make_unique<MessageTracker>( - _env._metrics.statBucket[cmd.getLoadType()], - _env._component.getClock()); + tracker->setMetric(_env._metrics.statBucket[cmd.getLoadType()]); std::ostringstream ost; ost << "Persistence bucket " << cmd.getBucketId() @@ -119,15 +109,10 @@ ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); StatEntryProcessor processor(ost); - BucketProcessor::iterateAll(_spi, - bucket, - cmd.getDocumentSelection(), - processor, - spi::ALL_VERSIONS, - context); - - api::StatBucketReply::UP reply(new api::StatBucketReply(cmd, ost.str())); - tracker->setReply(api::StorageReply::SP(reply.release())); + BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(), + processor, 
spi::ALL_VERSIONS,tracker->context()); + + tracker->setReply(std::make_shared<api::StatBucketReply>(cmd, ost.str())); return tracker; } diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h index 37b46ffc728..87c3c63b8fe 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.h +++ b/storage/src/vespa/storage/persistence/processallhandler.h @@ -8,11 +8,7 @@ #include <vespa/storageapi/message/stat.h> #include <vespa/persistence/spi/persistenceprovider.h> -namespace document { -namespace select { -class Node; -} -} +namespace document::select { class Node; } namespace storage { @@ -20,9 +16,8 @@ class ProcessAllHandler : public Types { public: ProcessAllHandler(PersistenceUtil&, spi::PersistenceProvider&); - MessageTracker::UP handleRemoveLocation(api::RemoveLocationCommand&, - spi::Context&); - MessageTracker::UP handleStatBucket(api::StatBucketCommand&, spi::Context&); + MessageTracker::UP handleRemoveLocation(api::RemoveLocationCommand&, MessageTracker::UP tracker); + MessageTracker::UP handleStatBucket(api::StatBucketCommand&, MessageTracker::UP tracker); protected: PersistenceUtil& _env; diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 76c420e76e6..ed3de5c7873 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -74,12 +74,9 @@ ProviderErrorWrapper::getBucketInfo(const spi::Bucket& bucket) const } spi::Result -ProviderErrorWrapper::put(const spi::Bucket& bucket, - spi::Timestamp ts, - const spi::DocumentSP& doc, - spi::Context& context) +ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::DocumentSP doc, spi::Context& context) { - return checkResult(_impl.put(bucket, ts, doc, context)); + return checkResult(_impl.put(bucket, ts, std::move(doc), 
context)); } spi::RemoveResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 292eb004223..61664419c69 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -47,7 +47,7 @@ public: spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override; spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; - spi::Result put(const spi::Bucket&, spi::Timestamp, const spi::DocumentSP&, spi::Context&) override; + spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override; |