summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-28 08:24:57 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-29 14:14:03 +0000
commiteb7b71781ca079b5577a13b300beafee388bc1ce (patch)
tree8a5194ed759a8fc8433fef14118e67ac6bfc2632
parent9499865f8a43aa097841606795a2bea8d0273ef9 (diff)
- Add async interface to put
- Use MessageTracker for keeping context. - implement putAsync, but still use it synchronously.
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp18
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h23
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp2
-rw-r--r--persistence/src/vespa/persistence/spi/operationcomplete.h22
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.cpp26
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.h4
-rw-r--r--searchcore/src/apps/proton/downpersistence.cpp48
-rw-r--r--searchcore/src/apps/proton/downpersistence.h2
-rw-r--r--searchcore/src/tests/proton/docsummary/docsummary.cpp3
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp6
-rw-r--r--searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp2
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp2
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.h75
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h15
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp28
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp83
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h45
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h2
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp8
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h2
-rw-r--r--storage/src/tests/persistence/diskmoveoperationhandlertest.cpp10
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp7
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp103
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp2
-rw-r--r--storage/src/tests/persistence/persistencetestutils.h17
-rw-r--r--storage/src/tests/persistence/persistencethread_splittest.cpp20
-rw-r--r--storage/src/tests/persistence/processalltest.cpp45
-rw-r--r--storage/src/tests/persistence/testandsettest.cpp69
-rw-r--r--storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp24
-rw-r--r--storage/src/vespa/storage/persistence/diskmoveoperationhandler.h3
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp82
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h9
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp237
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h47
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp63
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h39
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.cpp37
-rw-r--r--storage/src/vespa/storage/persistence/processallhandler.h11
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp7
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h2
49 files changed, 681 insertions, 611 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 6834f453695..3f05ed36802 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -435,8 +435,7 @@ DummyPersistence::getBucketInfo(const Bucket& b) const
}
Result
-DummyPersistence::put(const Bucket& b, Timestamp t, const Document::SP& doc,
- Context&)
+DummyPersistence::put(const Bucket& b, Timestamp t, Document::SP doc, Context&)
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "put(%s, %" PRIu64 ", %s)",
@@ -461,14 +460,13 @@ DummyPersistence::put(const Bucket& b, Timestamp t, const Document::SP& doc,
LOG(spam, "Inserting document %s", doc->toString(true).c_str());
- DocEntry::UP entry(new DocEntry(t, NONE, Document::UP(doc->clone())));
+ auto entry = std::make_unique<DocEntry>(t, NONE, Document::UP(doc->clone()));
(*bc)->insert(std::move(entry));
return Result();
}
Result
-DummyPersistence::maintain(const Bucket& b,
- MaintenanceLevel)
+DummyPersistence::maintain(const Bucket& b, MaintenanceLevel)
{
assert(b.getBucketSpace() == FixedBucketSpaces::default_space());
if (_simulateMaintainFailure) {
@@ -489,10 +487,7 @@ DummyPersistence::maintain(const Bucket& b,
}
RemoveResult
-DummyPersistence::remove(const Bucket& b,
- Timestamp t,
- const DocumentId& did,
- Context&)
+DummyPersistence::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&)
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "remove(%s, %" PRIu64 ", %s)",
@@ -518,10 +513,7 @@ DummyPersistence::remove(const Bucket& b,
}
GetResult
-DummyPersistence::get(const Bucket& b,
- const document::FieldSet& fieldSet,
- const DocumentId& did,
- Context&) const
+DummyPersistence::get(const Bucket& b, const document::FieldSet& fieldSet, const DocumentId& did, Context&) const
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "get(%s, %s)",
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index c97aab822ac..88e17a90a98 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -153,16 +153,9 @@ public:
Result setClusterState(BucketSpace bucketSpace, const ClusterState& newState) override;
Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override;
BucketInfoResult getBucketInfo(const Bucket&) const override;
- Result put(const Bucket&, Timestamp, const DocumentSP&, Context&) override;
- GetResult get(const Bucket&,
- const document::FieldSet& fieldSet,
- const DocumentId&,
- Context&) const override;
-
- RemoveResult remove(const Bucket& b,
- Timestamp t,
- const DocumentId& did,
- Context&) override;
+ Result put(const Bucket&, Timestamp, DocumentSP, Context&) override;
+ GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override;
+ RemoveResult remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) override;
CreateIteratorResult createIterator(const Bucket&,
const document::FieldSet& fs,
@@ -176,15 +169,9 @@ public:
Result createBucket(const Bucket&, Context&) override;
Result deleteBucket(const Bucket&, Context&) override;
- Result split(const Bucket& source,
- const Bucket& target1,
- const Bucket& target2,
- Context&) override;
+ Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
- Result join(const Bucket& source1,
- const Bucket& source2,
- const Bucket& target,
- Context&) override;
+ Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override;
Result revert(const Bucket&, Timestamp, Context&);
Result maintain(const Bucket& bucket, MaintenanceLevel level) override;
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
index e35a6a74bde..e7abe137b89 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
@@ -31,7 +31,7 @@ AbstractPersistenceProvider::update(const Bucket& bucket, Timestamp ts,
upd->applyTo(*docToUpdate);
- Result putResult = put(bucket, ts, docToUpdate, context);
+ Result putResult = put(bucket, ts, std::move(docToUpdate), context);
if (putResult.hasError()) {
return UpdateResult(putResult.getErrorCode(),
diff --git a/persistence/src/vespa/persistence/spi/operationcomplete.h b/persistence/src/vespa/persistence/spi/operationcomplete.h
new file mode 100644
index 00000000000..1a548e613dd
--- /dev/null
+++ b/persistence/src/vespa/persistence/spi/operationcomplete.h
@@ -0,0 +1,22 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <memory>
+
+namespace storage::spi {
+
+class Result;
+
+/**
+ * This is the callback interface when using the async operations
+ * in the persistence provider.
+ */
+class OperationComplete
+{
+public:
+ using UP = std::unique_ptr<OperationComplete>;
+ virtual ~OperationComplete() = default;
+ virtual void onComplete(std::unique_ptr<Result> result) = 0;
+};
+
+} \ No newline at end of file
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index 61d141c0229..02fb1bb4719 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -1,9 +1,35 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "persistenceprovider.h"
+#include <future>
namespace storage::spi {
PersistenceProvider::~PersistenceProvider() = default;
+class CatchResult : public OperationComplete {
+public:
+ std::future<Result::UP> waitResult() {
+ return promisedResult.get_future();
+ }
+ void onComplete(Result::UP result) override {
+ promisedResult.set_value(std::move(result));
+ }
+private:
+ std::promise<Result::UP> promisedResult;
+};
+Result
+PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) {
+ auto catcher = std::make_unique<CatchResult>();
+ auto future = catcher->waitResult();
+ putAsync(bucket, timestamp, std::move(doc), context, std::move(catcher));
+ return *future.get();
+}
+void
+PersistenceProvider::putAsync(const Bucket &bucket, Timestamp timestamp, DocumentSP doc, Context &context,
+ OperationComplete::UP onComplete) {
+ Result result = put(bucket, timestamp, std::move(doc), context);
+ onComplete->onComplete(std::make_unique<Result>(result));
+}
+
}
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index c70d5e3f1c3..70645b31902 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -10,6 +10,7 @@
#include "result.h"
#include "selection.h"
#include "clusterstate.h"
+#include "operationcomplete.h"
namespace document { class FieldSet; }
@@ -109,7 +110,8 @@ struct PersistenceProvider
/**
* Store the given document at the given microsecond time.
*/
- virtual Result put(const Bucket&, Timestamp, const DocumentSP&, Context&) = 0;
+ virtual Result put(const Bucket&, Timestamp, DocumentSP, Context&);
+ virtual void putAsync(const Bucket &, Timestamp , DocumentSP , Context &, OperationComplete::UP );
/**
* This remove function assumes that there exist something to be removed.
diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp
index d4ec9cee395..511d0e3fee8 100644
--- a/searchcore/src/apps/proton/downpersistence.cpp
+++ b/searchcore/src/apps/proton/downpersistence.cpp
@@ -19,9 +19,7 @@ DownPersistence::DownPersistence(const vespalib::string &downReason)
{
}
-DownPersistence::~DownPersistence()
-{
-}
+DownPersistence::~DownPersistence() = default;
Result
DownPersistence::initialize()
@@ -40,8 +38,7 @@ DownPersistence::getPartitionStates() const
BucketIdListResult
DownPersistence::listBuckets(BucketSpace, PartitionId) const
{
- return BucketIdListResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return BucketIdListResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
Result
@@ -59,30 +56,25 @@ DownPersistence:: setActiveState(const Bucket&, BucketInfo::ActiveState)
BucketInfoResult
DownPersistence:: getBucketInfo(const Bucket&) const
{
- return BucketInfoResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return BucketInfoResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
Result
-DownPersistence::put(const Bucket&, Timestamp, const Document::SP&, Context&)
+DownPersistence::put(const Bucket&, Timestamp, Document::SP, Context&)
{
return errorResult;
}
RemoveResult
-DownPersistence:: remove(const Bucket&, Timestamp,
- const DocumentId&, Context&)
+DownPersistence:: remove(const Bucket&, Timestamp, const DocumentId&, Context&)
{
- return RemoveResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return RemoveResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
RemoveResult
-DownPersistence::removeIfFound(const Bucket&, Timestamp,
- const DocumentId&, Context&)
+DownPersistence::removeIfFound(const Bucket&, Timestamp,const DocumentId&, Context&)
{
- return RemoveResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return RemoveResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
Result
@@ -91,35 +83,28 @@ DownPersistence::removeEntry(const Bucket&, Timestamp, Context&)
return errorResult;
}
-UpdateResult DownPersistence::update(const Bucket&, Timestamp,
- const DocumentUpdate::SP&, Context&)
+UpdateResult DownPersistence::update(const Bucket&, Timestamp, const DocumentUpdate::SP&, Context&)
{
- return UpdateResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return UpdateResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
GetResult
-DownPersistence::get(const Bucket&, const document::FieldSet&,
- const DocumentId&, Context&) const
+DownPersistence::get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const
{
- return GetResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return GetResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
CreateIteratorResult
DownPersistence::createIterator(const Bucket&, const document::FieldSet&,
- const Selection&, IncludedVersions,
- Context&)
+ const Selection&, IncludedVersions, Context&)
{
- return CreateIteratorResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return CreateIteratorResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
IterateResult
DownPersistence::iterate(IteratorId, uint64_t, Context&) const
{
- return IterateResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return IterateResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
Result
@@ -144,8 +129,7 @@ DownPersistence::deleteBucket(const Bucket&, Context&)
BucketIdListResult
DownPersistence::getModifiedBuckets(BucketSpace) const
{
- return BucketIdListResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return BucketIdListResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h
index 8cdac7aaa1b..2ae0605fdb6 100644
--- a/searchcore/src/apps/proton/downpersistence.h
+++ b/searchcore/src/apps/proton/downpersistence.h
@@ -32,7 +32,7 @@ public:
Result setClusterState(BucketSpace, const ClusterState&) override;
Result setActiveState(const Bucket&, BucketInfo::ActiveState) override;
BucketInfoResult getBucketInfo(const Bucket&) const override;
- Result put(const Bucket&, Timestamp, const DocumentSP&, Context&) override;
+ Result put(const Bucket&, Timestamp, DocumentSP, Context&) override;
RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override;
RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override;
Result removeEntry(const Bucket&, Timestamp, Context&) override;
diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp
index 55d524519b8..e1785e1e48d 100644
--- a/searchcore/src/tests/proton/docsummary/docsummary.cpp
+++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp
@@ -255,8 +255,7 @@ public:
storage::spi::Timestamp ts(0);
DbDocumentId dbdId(lid);
DbDocumentId prevDbdId(0);
- document::Document::SP xdoc(new document::Document(doc));
- PutOperation op(bucketId, ts, xdoc);
+ PutOperation op(bucketId, ts, std::make_shared<document::Document>(doc));
op.setSerialNum(serialNum);
op.setDbDocumentId(dbdId);
op.setPrevDbDocumentId(prevDbdId);
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index 796ec4436fe..18235116d27 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -544,7 +544,7 @@ TEST_F("require that outdated remove is ignored", FeedHandlerFixture)
TEST_F("require that outdated put is ignored", FeedHandlerFixture)
{
DocumentContext doc_context("id:ns:searchdocument::foo", *f.schema.builder);
- auto op =std::make_unique<PutOperation>(doc_context.bucketId, Timestamp(10), doc_context.doc);
+ auto op =std::make_unique<PutOperation>(doc_context.bucketId, Timestamp(10), std::move(doc_context.doc));
static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000));
FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token), std::move(op));
@@ -673,7 +673,7 @@ TEST_F("require that put is rejected if resource limit is reached", FeedHandlerF
f.writeFilter._message = "Attribute resource limit reached";
DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder);
- auto op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc);
+ auto op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), std::move(docCtx.doc));
FeedTokenContext token;
f.handler.performOperation(std::move(token.token), std::move(op));
EXPECT_EQUAL(0, f.feedView.put_count);
@@ -801,7 +801,7 @@ TEST_F("require that put with different document type repo is ok", FeedHandlerFi
TwoFieldsSchemaContext schema;
DocumentContext doc_context("id:ns:searchdocument::foo", *schema.builder);
auto op = std::make_unique<PutOperation>(doc_context.bucketId,
- Timestamp(10), doc_context.doc);
+ Timestamp(10), std::move(doc_context.doc));
FeedTokenContext token_context;
EXPECT_EQUAL(schema.getRepo().get(), op->getDocument()->getRepo());
EXPECT_NOT_EQUAL(f.schema.getRepo().get(), op->getDocument()->getRepo());
diff --git a/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp b/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp
index 5fffd70f11d..53fdcb7757f 100644
--- a/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp
+++ b/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp
@@ -190,7 +190,7 @@ TEST("require that toString() on derived classes are meaningful")
PutOperation().toString());
EXPECT_EQUAL("Put(id::::, BucketId(0x000000000000002a), timestamp=10, dbdId=(subDbId=0, lid=0), "
"prevDbdId=(subDbId=0, lid=0), prevMarkedAsRemoved=false, prevTimestamp=0, serialNum=0)",
- PutOperation(bucket_id1, timestamp, doc).toString());
+ PutOperation(bucket_id1, timestamp, std::move(doc)).toString());
EXPECT_EQUAL("Remove(id::::, BucketId(0x0000000000000000), timestamp=0, dbdId=(subDbId=0, lid=0), "
"prevDbdId=(subDbId=0, lid=0), prevMarkedAsRemoved=false, prevTimestamp=0, serialNum=0)",
diff --git a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp
index 35590cc68f6..0c72d11899a 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp
@@ -15,7 +15,7 @@ using HandlerSnapshot = PersistenceHandlerMap::HandlerSnapshot;
struct DummyPersistenceHandler : public IPersistenceHandler {
using SP = std::shared_ptr<DummyPersistenceHandler>;
void initialize() override {}
- void handlePut(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::Document::SP &) override {}
+ void handlePut(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, DocumentSP) override {}
void handleUpdate(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::DocumentUpdate::SP &) override {}
void handleRemove(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::DocumentId &) override {}
void handleListBuckets(IBucketIdListResultHandler &) override {}
diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
index 19d9d41c3e4..4fb4abcb7c5 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
@@ -9,7 +9,6 @@
#include <vespa/document/update/documentupdate.h>
#include <vespa/persistence/spi/documentselection.h>
#include <vespa/persistence/spi/test.h>
-#include <vespa/persistence/spi/test.h>
#include <vespa/searchcore/proton/persistenceengine/bucket_guard.h>
#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h>
#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h>
@@ -199,7 +198,7 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer {
void initialize() override { initialized = true; }
void handlePut(FeedToken token, const Bucket& bucket,
- Timestamp timestamp, const document::Document::SP& doc) override {
+ Timestamp timestamp, DocumentSP doc) override {
token->setResult(ResultUP(new storage::spi::Result()), false);
handle(token, bucket, timestamp, doc->getId());
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
index b6223dd41ab..e0bc7cb551f 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
@@ -35,4 +35,8 @@ State::fail()
}
}
+OwningState::~OwningState() {
+ ack();
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
index ab3fdea3345..363886b9f31 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
@@ -15,36 +15,53 @@ typedef std::unique_ptr<storage::spi::Result> ResultUP;
* instance of this class is passed to every invokation of the IFeedHandler.
*/
namespace feedtoken {
- class ITransport {
- public:
- virtual ~ITransport() { }
- virtual void send(ResultUP result, bool documentWasFound) = 0;
- };
-
- class State : public search::IDestructorCallback {
- public:
- State(const State &) = delete;
- State & operator = (const State &) = delete;
- State(ITransport & transport);
- ~State() override;
- void fail();
- void setResult(ResultUP result, bool documentWasFound) {
- _documentWasFound = documentWasFound;
- _result = std::move(result);
- }
- const storage::spi::Result &getResult() { return *_result; }
- private:
- void ack();
- ITransport &_transport;
- ResultUP _result;
- bool _documentWasFound;
- std::atomic<bool> _alreadySent;
- };
-
- inline std::shared_ptr<State>
- make(ITransport & latch) {
- return std::make_shared<State>(latch);
+
+class ITransport {
+public:
+ virtual ~ITransport() { }
+ virtual void send(ResultUP result, bool documentWasFound) = 0;
+};
+
+class State : public search::IDestructorCallback {
+public:
+ State(const State &) = delete;
+ State & operator = (const State &) = delete;
+ State(ITransport & transport);
+ ~State() override;
+ void fail();
+ void setResult(ResultUP result, bool documentWasFound) {
+ _documentWasFound = documentWasFound;
+ _result = std::move(result);
}
+ const storage::spi::Result &getResult() { return *_result; }
+protected:
+ void ack();
+private:
+ ITransport &_transport;
+ ResultUP _result;
+ bool _documentWasFound;
+ std::atomic<bool> _alreadySent;
+};
+class OwningState : public State {
+public:
+ OwningState(std::unique_ptr<ITransport> transport)
+ : State(*transport),
+ _owned(std::move(transport))
+ {}
+ ~OwningState() override;
+private:
+ std::unique_ptr<ITransport> _owned;
+};
+
+inline std::shared_ptr<State>
+make(ITransport & latch) {
+ return std::make_shared<State>(latch);
+}
+inline std::shared_ptr<State>
+make(std::unique_ptr<ITransport> transport) {
+ return std::make_shared<OwningState>(std::move(transport));
+}
+
}
using FeedToken = std::shared_ptr<feedtoken::State>;
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp
index f1161c8ebdd..56224889b78 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp
@@ -26,7 +26,7 @@ DocumentOperation::DocumentOperation(Type type)
}
-DocumentOperation::DocumentOperation(Type type, const BucketId &bucketId, const Timestamp &timestamp)
+DocumentOperation::DocumentOperation(Type type, BucketId bucketId, Timestamp timestamp)
: FeedOperation(type),
_bucketId(bucketId),
_timestamp(timestamp),
@@ -38,6 +38,8 @@ DocumentOperation::DocumentOperation(Type type, const BucketId &bucketId, const
{
}
+DocumentOperation::~DocumentOperation() = default;
+
void
DocumentOperation::assertValidBucketId(const document::DocumentId &docId) const
{
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h
index 6847dbfd943..044d44b8276 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h
@@ -22,15 +22,14 @@ protected:
DocumentOperation(Type type);
- DocumentOperation(Type type, const document::BucketId &bucketId,
- const storage::spi::Timestamp &timestamp);
+ DocumentOperation(Type type, document::BucketId bucketId, storage::spi::Timestamp timestamp);
void assertValidBucketId(const document::DocumentId &docId) const;
void assertValidBucketId(const document::GlobalId &docId) const;
vespalib::string docArgsToString() const;
public:
- ~DocumentOperation() override {}
+ ~DocumentOperation() override;
const document::BucketId &getBucketId() const { return _bucketId; }
storage::spi::Timestamp getTimestamp() const { return _timestamp; }
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp
index efac7297c67..dd001348093 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp
@@ -17,16 +17,12 @@ PutOperation::PutOperation()
{ }
-PutOperation::PutOperation(const BucketId &bucketId,
- const Timestamp &timestamp,
- const Document::SP &doc)
- : DocumentOperation(FeedOperation::PUT,
- bucketId,
- timestamp),
- _doc(doc)
+PutOperation::PutOperation(BucketId bucketId, Timestamp timestamp, Document::SP doc)
+ : DocumentOperation(FeedOperation::PUT, bucketId, timestamp),
+ _doc(std::move(doc))
{ }
-PutOperation::~PutOperation() { }
+PutOperation::~PutOperation() = default;
void
PutOperation::serialize(vespalib::nbostream &os) const
@@ -40,8 +36,7 @@ PutOperation::serialize(vespalib::nbostream &os) const
void
-PutOperation::deserialize(vespalib::nbostream &is,
- const DocumentTypeRepo &repo)
+PutOperation::deserialize(vespalib::nbostream &is, const DocumentTypeRepo &repo)
{
DocumentOperation::deserialize(is, repo);
size_t oldSize = is.size();
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h
index 33330692fab..5ca6d755e49 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h
@@ -12,17 +12,16 @@ class PutOperation : public DocumentOperation
public:
PutOperation();
- PutOperation(const document::BucketId &bucketId,
- const storage::spi::Timestamp &timestamp,
- const DocumentSP &doc);
- virtual ~PutOperation();
+ PutOperation(document::BucketId bucketId,
+ storage::spi::Timestamp timestamp,
+ DocumentSP doc);
+ ~PutOperation() override;
const DocumentSP &getDocument() const { return _doc; }
void assertValid() const;
- virtual void serialize(vespalib::nbostream &os) const override;
- virtual void deserialize(vespalib::nbostream &is,
- const document::DocumentTypeRepo &repo) override;
+ void serialize(vespalib::nbostream &os) const override;
+ void deserialize(vespalib::nbostream &is, const document::DocumentTypeRepo &repo) override;
void deserializeDocument(const document::DocumentTypeRepo &repo);
- virtual vespalib::string toString() const override;
+ vespalib::string toString() const override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h
index 82a9c8174fa..c95f51b29dc 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h
@@ -42,7 +42,7 @@ public:
virtual void initialize() = 0;
virtual void handlePut(FeedToken token, const storage::spi::Bucket &bucket,
- storage::spi::Timestamp timestamp, const DocumentSP &doc) = 0;
+ storage::spi::Timestamp timestamp, DocumentSP doc) = 0;
virtual void handleUpdate(FeedToken token, const storage::spi::Bucket &bucket,
storage::spi::Timestamp timestamp, const DocumentUpdateSP &upd) = 0;
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 2ede5d45f7e..48074d491c8 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -64,7 +64,7 @@ public:
if (result.hasError()) {
std::lock_guard<std::mutex> guard(_lock);
if (_result.hasError()) {
- _result = TransportLatch::mergeErrorResults(_result, result);
+ _result = TransportMerger::mergeErrorResults(_result, result);
} else {
_result = result;
}
@@ -319,34 +319,32 @@ PersistenceEngine::getBucketInfo(const Bucket& b) const
}
-Result
-PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::SP& doc, Context&)
+void
+PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::DocumentSP doc, Context &, OperationComplete::UP onComplete)
{
if (!_writeFilter.acceptWriteOperation()) {
IResourceWriteFilter::State state = _writeFilter.getAcceptState();
if (!state.acceptWriteOperation()) {
- return Result(Result::ErrorType::RESOURCE_EXHAUSTED,
+ return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::RESOURCE_EXHAUSTED,
make_string("Put operation rejected for document '%s': '%s'",
- doc->getId().toString().c_str(), state.message().c_str()));
+ doc->getId().toString().c_str(), state.message().c_str())));
}
}
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
DocTypeName docType(doc->getType());
- LOG(spam, "put(%s, %" PRIu64 ", (\"%s\", \"%s\"))", b.toString().c_str(), static_cast<uint64_t>(t.getValue()),
+ LOG(spam, "putAsync(%s, %" PRIu64 ", (\"%s\", \"%s\"))", bucket.toString().c_str(), static_cast<uint64_t>(ts.getValue()),
docType.toString().c_str(), doc->getId().toString().c_str());
if (!doc->getId().hasDocType()) {
- return Result(Result::ErrorType::PERMANENT_ERROR,
- make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str()));
+ return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str())));
}
- IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType);
+ IPersistenceHandler * handler = getHandler(rguard, bucket.getBucketSpace(), docType);
if (!handler) {
- return Result(Result::ErrorType::PERMANENT_ERROR,
- make_string("No handler for document type '%s'", docType.toString().c_str()));
+ return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("No handler for document type '%s'", docType.toString().c_str())));
}
- TransportLatch latch(1);
- handler->handlePut(feedtoken::make(latch), b, t, doc);
- latch.await();
- return latch.getResult();
+ auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete));
+ handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc));
}
PersistenceEngine::RemoveResult
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 5d3be07c532..d28a00b909e 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -38,6 +38,7 @@ private:
using Timestamp = storage::spi::Timestamp;
using TimestampList = storage::spi::TimestampList;
using UpdateResult = storage::spi::UpdateResult;
+ using OperationComplete = storage::spi::OperationComplete;
struct IteratorEntry {
PersistenceHandlerSequence handler_sequence;
@@ -100,7 +101,7 @@ public:
Result setClusterState(BucketSpace bucketSpace, const ClusterState& calc) override;
Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override;
BucketInfoResult getBucketInfo(const Bucket&) const override;
- Result put(const Bucket&, Timestamp, const std::shared_ptr<document::Document>&, Context&) override;
+ void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override;
RemoveResult remove(const Bucket&, Timestamp, const document::DocumentId&, Context&) override;
UpdateResult update(const Bucket&, Timestamp,
const std::shared_ptr<document::DocumentUpdate>&, Context&) override;
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
index 4a5dfb0a2a5..64159841efe 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
@@ -8,10 +8,50 @@ using storage::spi::Result;
namespace proton {
+std::unique_ptr<std::mutex> createOptionalLock(bool needLocking) {
+ return needLocking
+ ? std::make_unique<std::mutex>()
+ : std::unique_ptr<std::mutex>();
+}
+TransportMerger::TransportMerger(bool needLocking)
+ : _result(),
+ _lock(createOptionalLock(needLocking))
+{
+}
+TransportMerger::~TransportMerger() = default;
+
+void
+TransportMerger::mergeResult(ResultUP result, bool documentWasFound) {
+ if (_lock) {
+ std::lock_guard<std::mutex> guard(*_lock);
+ mergeWithLock(std::move(result), documentWasFound);
+ } else {
+ mergeWithLock(std::move(result), documentWasFound);
+ }
+}
+
+void
+TransportMerger::mergeWithLock(ResultUP result, bool documentWasFound) {
+ if (!_result) {
+ _result = std::move(result);
+ } else if (result->hasError()) {
+ _result = std::make_unique<Result>(mergeErrorResults(*_result, *result));
+ } else if (documentWasFound) {
+ _result = std::move(result);
+ }
+ completeIfDone();
+}
+
+Result
+TransportMerger::mergeErrorResults(const Result &lhs, const Result &rhs)
+{
+ Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? lhs : rhs).getErrorCode();
+ return Result(error, make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str()));
+}
+
TransportLatch::TransportLatch(uint32_t cnt)
- : _latch(cnt),
- _lock(),
- _result()
+ : TransportMerger(cnt > 1),
+ _latch(cnt)
{
if (cnt == 0u) {
_result = std::make_unique<Result>();
@@ -23,24 +63,33 @@ TransportLatch::~TransportLatch() = default;
void
TransportLatch::send(ResultUP result, bool documentWasFound)
{
- {
- std::lock_guard<std::mutex> guard(_lock);
- if (!_result) {
- _result = std::move(result);
- } else if (result->hasError()) {
- _result.reset(new Result(mergeErrorResults(*_result, *result)));
- } else if (documentWasFound) {
- _result = std::move(result);
- }
- }
+ mergeResult(std::move(result), documentWasFound);
_latch.countDown();
}
-Result
-TransportLatch::mergeErrorResults(const Result &lhs, const Result &rhs)
+AsyncTranportContext::AsyncTranportContext(uint32_t cnt, OperationComplete::UP onComplete)
+ : TransportMerger(cnt > 1),
+ _countDown(cnt),
+ _onComplete(std::move(onComplete))
{
- Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? lhs : rhs).getErrorCode();
- return Result(error, make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str()));
+ if (cnt == 0u) {
+ _onComplete->onComplete(std::make_unique<Result>());
+ }
+}
+
+void
+AsyncTranportContext::completeIfDone() {
+ _countDown--;
+ if (_countDown == 0) {
+ _onComplete->onComplete(std::make_unique<Result>());
+ }
+}
+AsyncTranportContext::~AsyncTranportContext() = default;
+
+void
+AsyncTranportContext::send(ResultUP result, bool documentWasFound)
+{
+ mergeResult(std::move(result), documentWasFound);
}
} // proton
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
index 2fb55b14fdf..b3b3bc43aa7 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
@@ -13,18 +13,33 @@ namespace proton {
* Implementation of FeedToken::ITransport for handling the async reply for an operation.
* Uses an internal count down latch to keep track the number of outstanding replies.
*/
-class TransportLatch : public feedtoken::ITransport {
-private:
+
+class TransportMerger : public feedtoken::ITransport {
+public:
using Result = storage::spi::Result;
+ static Result mergeErrorResults(const Result &lhs, const Result &rhs);
+protected:
+ TransportMerger(bool needLocking);
+ ~TransportMerger() override;
+ void mergeResult(ResultUP result, bool documentWasFound);
+ virtual void completeIfDone() { } // Called with lock held if necessary on every merge
+ ResultUP _result;
+
+private:
+ void mergeWithLock(ResultUP result, bool documentWasFound);
+ std::unique_ptr<std::mutex> _lock;
+};
+class TransportLatch : public TransportMerger {
+private:
+
using UpdateResult = storage::spi::UpdateResult;
using RemoveResult = storage::spi::RemoveResult;
vespalib::CountDownLatch _latch;
- std::mutex _lock;
- ResultUP _result;
+
public:
TransportLatch(uint32_t cnt);
- ~TransportLatch();
+ ~TransportLatch() override;
void send(ResultUP result, bool documentWasFound) override;
void await() {
_latch.await();
@@ -38,7 +53,25 @@ public:
const RemoveResult &getRemoveResult() const {
return dynamic_cast<const RemoveResult &>(*_result);
}
- static Result mergeErrorResults(const Result &lhs, const Result &rhs);
+
+};
+
+/**
+ * Implementation of FeedToken::ITransport for async handling of the async reply for an operation.
+ * Uses an internal count to keep track the number of the outstanding replies.
+ */
+class AsyncTranportContext : public TransportMerger {
+private:
+ using Result = storage::spi::Result;
+ using OperationComplete = storage::spi::OperationComplete;
+
+ int _countDown;
+ OperationComplete::UP _onComplete;
+ void completeIfDone() override;
+public:
+ AsyncTranportContext(uint32_t cnt, OperationComplete::UP);
+ ~AsyncTranportContext() override;
+ void send(ResultUP result, bool documentWasFound) override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 362de7ee780..4a87ee0009c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -154,7 +154,7 @@ FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &o
auto doc = make_shared<Document>(op.getUpdate()->getType(), op.getUpdate()->getId());
doc->setRepo(*_activeFeedView->getDocumentTypeRepo());
op.getUpdate()->applyTo(*doc);
- PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc);
+ PutOperation putOp(op.getBucketId(), op.getTimestamp(), std::move(doc));
_activeFeedView->preparePut(putOp);
storeOperation(putOp, token);
if (token) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
index 16af4e87795..ba0db2bb77d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
@@ -39,9 +39,9 @@ PersistenceHandlerProxy::initialize()
}
void
-PersistenceHandlerProxy::handlePut(FeedToken token, const Bucket &bucket, Timestamp timestamp, const DocumentSP &doc)
+PersistenceHandlerProxy::handlePut(FeedToken token, const Bucket &bucket, Timestamp timestamp, DocumentSP doc)
{
- auto op = std::make_unique<PutOperation>(bucket.getBucketId().stripUnused(), timestamp, doc);
+ auto op = std::make_unique<PutOperation>(bucket.getBucketId().stripUnused(), timestamp, std::move(doc));
_feedHandler.handleOperation(token, std::move(op));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
index 3e3b3dd6fb6..5a82b46f91c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
@@ -25,7 +25,7 @@ public:
void initialize() override;
void handlePut(FeedToken token, const storage::spi::Bucket &bucket,
- storage::spi::Timestamp timestamp, const DocumentSP &doc) override;
+ storage::spi::Timestamp timestamp, DocumentSP doc) override;
void handleUpdate(FeedToken token, const storage::spi::Bucket &bucket,
storage::spi::Timestamp timestamp, const DocumentUpdateSP &upd) override;
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index fb80c25bfb7..862d1fb758a 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -91,14 +91,12 @@ PersistenceProviderWrapper::getBucketInfo(const spi::Bucket& bucket) const
}
spi::Result
-PersistenceProviderWrapper::put(const spi::Bucket& bucket,
- spi::Timestamp timestamp,
- const document::Document::SP& doc,
- spi::Context& context)
+PersistenceProviderWrapper::put(const spi::Bucket& bucket, spi::Timestamp timestamp,
+ document::Document::SP doc, spi::Context& context)
{
LOG_SPI("put(" << bucket << ", " << timestamp << ", " << doc->getId() << ")");
CHECK_ERROR(spi::Result, FAIL_PUT);
- return _spi.put(bucket, timestamp, doc, context);
+ return _spi.put(bucket, timestamp, std::move(doc), context);
}
spi::RemoveResult
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index 9bd3653e8a1..25365e64bfc 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -88,7 +88,7 @@ public:
spi::PartitionStateListResult getPartitionStates() const override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace, spi::PartitionId) const override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
- spi::Result put(const spi::Bucket&, spi::Timestamp, const spi::DocumentSP&, spi::Context&) override;
+ spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override;
diff --git a/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp b/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp
index 0dd3285e5f3..c10681405e7 100644
--- a/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp
+++ b/storage/src/tests/persistence/diskmoveoperationhandlertest.cpp
@@ -27,13 +27,11 @@ TEST_F(DiskMoveOperationHandlerTest, simple) {
doPutOnDisk(3, 4, spi::Timestamp(1000 + i));
}
- DiskMoveOperationHandler diskMoveHandler(
- getEnv(3),
- getPersistenceProvider());
- BucketDiskMoveCommand move(makeDocumentBucket(document::BucketId(16, 4)), 3, 4);
-
+ DiskMoveOperationHandler diskMoveHandler(getEnv(3),getPersistenceProvider());
+ document::Bucket bucket = makeDocumentBucket(document::BucketId(16, 4));
+ auto move = std::make_shared<BucketDiskMoveCommand>(bucket, 3, 4);
spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- diskMoveHandler.handleBucketDiskMove(move, context);
+ diskMoveHandler.handleBucketDiskMove(*move, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), move));
EXPECT_EQ("BucketId(0x4000000000000004): 10,4",
getBucketStatus(document::BucketId(16,4)));
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index ba344971c3b..e9f878bfe1e 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -46,13 +46,8 @@ public:
_deleteBucketInvocations(0)
{}
- spi::Result put(const spi::Bucket& bucket, spi::Timestamp timestamp,
- const document::Document::SP& doc, spi::Context& context) override
+ spi::Result put(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::Context&) override
{
- (void) bucket;
- (void) timestamp;
- (void) doc;
- (void) context;
_queueBarrier.await();
// message abort stage with active opertion in disk queue
std::this_thread::sleep_for(75ms);
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index dffd4ef1768..035da326d48 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -149,6 +149,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd;
};
+ MessageTracker::UP
+ createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) {
+ return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd));
+ }
+
std::string
doTestSPIException(MergeHandler& handler,
PersistenceProviderWrapper& providerWrapper,
@@ -203,9 +208,9 @@ TEST_F(MergeHandlerTest, merge_bucket_command) {
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(debug, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- cmd.setSourceIndex(1234);
- MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ cmd->setSourceIndex(1234);
+ MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
LOG(debug, "Check state");
ASSERT_EQ(1, messageKeeper()._msgs.size());
@@ -217,7 +222,7 @@ TEST_F(MergeHandlerTest, merge_bucket_command) {
EXPECT_EQ(1, cmd2.getAddress()->getIndex());
EXPECT_EQ(1234, cmd2.getSourceIndex());
- tracker->generateReply(cmd);
+ tracker->generateReply(*cmd);
EXPECT_FALSE(tracker->hasReply());
}
@@ -228,8 +233,8 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain)
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(debug, "Verifying that get bucket diff is sent on");
- api::GetBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp);
- MessageTracker::UP tracker1 = handler.handleGetBucketDiff(cmd, *_context);
+ auto cmd = std::make_shared<api::GetBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
+ MessageTracker::UP tracker1 = handler.handleGetBucketDiff(*cmd, createTracker(cmd, _bucket));
api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP();
if (midChain) {
@@ -277,8 +282,8 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(debug, "Verifying that apply bucket diff is sent on");
- api::ApplyBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp);
- MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(cmd, *_context);
+ auto cmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, _maxTimestamp);
+ MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
api::StorageMessage::SP replySent = std::move(*tracker1).stealReplySP();
if (midChain) {
@@ -324,9 +329,9 @@ TEST_F(MergeHandlerTest, master_message_flow) {
MergeHandler handler(getPersistenceProvider(), getEnv());
LOG(debug, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
LOG(debug, "Check state");
ASSERT_EQ(1, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType());
@@ -425,8 +430,8 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize);
LOG(debug, "Handle a merge bucket command");
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd);
@@ -505,9 +510,8 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize);
applyBucketDiffCmd->getDiff() = applyDiff;
- MergeHandler handler(
- getPersistenceProvider(), getEnv(), maxChunkSize);
- handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);
+ MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize);
+ handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
auto fwdDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
// Should not fill up more than chunk size allows for
@@ -520,8 +524,8 @@ TEST_F(MergeHandlerTest, max_timestamp) {
MergeHandler handler(getPersistenceProvider(), getEnv());
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
auto getCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
@@ -534,8 +538,7 @@ MergeHandlerTest::fillDummyApplyDiff(
std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
{
document::TestDocMan docMan;
- document::Document::SP doc(
- docMan.createRandomDocumentAtLocation(_location));
+ document::Document::SP doc(docMan.createRandomDocumentAtLocation(_location));
std::vector<char> headerBlob;
{
vespalib::nbostream stream;
@@ -638,7 +641,8 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
providerWrapper.clearOperationLog();
try {
- handler.handleApplyBucketDiff(*createDummyApplyDiff(6000), *_context);
+ auto cmd = createDummyApplyDiff(6000);
+ handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket));
FAIL() << "No exception thrown on failing in-place remove";
} catch (const std::runtime_error& e) {
EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos);
@@ -648,15 +652,15 @@ TEST_F(MergeHandlerTest, spi_flush_guard) {
TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
MergeHandler handler(getPersistenceProvider(), getEnv());
// Send merge for unknown bucket
- api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
- MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
+ MessageTracker::UP tracker = handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
EXPECT_TRUE(tracker->getResult().isBucketDisappearance());
}
TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
MergeHandler handler(getPersistenceProvider(), getEnv());
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd);
@@ -682,8 +686,8 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
_nodes.emplace_back(0, false);
_nodes.emplace_back(1, false);
_nodes.emplace_back(2, false);
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd);
@@ -722,7 +726,7 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
applyBucketDiffCmd->getDiff() = applyDiff;
- auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);
+ auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
ASSERT_TRUE(applyBucketDiffReply.get());
@@ -809,10 +813,10 @@ void
MergeHandlerTest::HandleMergeBucketInvoker::invoke(
MergeHandlerTest& test,
MergeHandler& handler,
- spi::Context& context)
+ spi::Context&)
{
- api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
- handler.handleMergeBucket(cmd, context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp);
+ handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
}
TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
@@ -841,17 +845,16 @@ void
MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(
MergeHandlerTest& test,
MergeHandler& handler,
- spi::Context& context)
+ spi::Context& )
{
- api::GetBucketDiffCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
- handler.handleGetBucketDiff(cmd, context);
+ auto cmd = std::make_shared<api::GetBucketDiffCommand>(test._bucket, test._nodes, test._maxTimestamp);
+ handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
MergeHandler handler(providerWrapper, getEnv());
- providerWrapper.setResult(
- spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
+ providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?"));
setUpChain(MIDDLE);
ExpectedExceptionSpec exceptions[] = {
@@ -874,12 +877,11 @@ void
MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(
MergeHandlerTest& test,
MergeHandler& handler,
- spi::Context& context)
+ spi::Context&)
{
++_counter;
- std::shared_ptr<api::ApplyBucketDiffCommand> cmd(
- test.createDummyApplyDiff(100000 * _counter));
- handler.handleApplyBucketDiff(*cmd, context);
+ auto cmd = test.createDummyApplyDiff(100000 * _counter);
+ handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
}
TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
@@ -904,8 +906,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
EXPECT_EQ("", doTestSPIException(handler, providerWrapper, invoker, *it));
// Casual, in-place testing of bug 6752085.
// This will fail if we give NaN to the metric in question.
- EXPECT_TRUE(std::isfinite(getEnv()._metrics
- .mergeAverageDataReceivedNeeded.getLast()));
+ EXPECT_TRUE(std::isfinite(getEnv()._metrics.mergeAverageDataReceivedNeeded.getLast()));
}
}
@@ -913,10 +914,10 @@ void
MergeHandlerTest::HandleGetBucketDiffReplyInvoker::beforeInvoke(
MergeHandlerTest& test,
MergeHandler& handler,
- spi::Context& context)
+ spi::Context&)
{
- api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
- handler.handleMergeBucket(cmd, context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp);
+ handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
_diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>();
}
@@ -974,13 +975,13 @@ void
MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(
MergeHandlerTest& test,
MergeHandler& handler,
- spi::Context& context)
+ spi::Context&)
{
++_counter;
_stub.clear();
if (getChainPos() == FRONT) {
- api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
- handler.handleMergeBucket(cmd, context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(test._bucket, test._nodes, test._maxTimestamp);
+ handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket));
auto diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>();
auto dummyDiff = test.createDummyGetBucketDiff(100000 * _counter, 0x4);
diffCmd->getDiff() = dummyDiff->getDiff();
@@ -995,7 +996,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(
// Pretend last node in chain has data and that it will be fetched when
// chain is unwinded.
auto cmd = test.createDummyApplyDiff(100000 * _counter, 0x4, false);
- handler.handleApplyBucketDiff(*cmd, context);
+ handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket));
_applyCmd = test.fetchSingleMessage<api::ApplyBucketDiffCommand>();
}
}
@@ -1147,13 +1148,13 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
applyBucketDiffCmd->getDiff() = applyDiff;
- auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);
+ auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket));
auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP());
ASSERT_TRUE(applyBucketDiffReply.get());
- api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
- handler.handleMergeBucket(cmd, *_context);
+ auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp);
+ handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket));
auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 25c0a36a7f5..4ac9dfd7765 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -283,7 +283,7 @@ PersistenceTestUtils::doPut(const document::Document::SP& doc,
spi::Context context(defaultLoadType, spi::Priority(0),
spi::Trace::TraceLevel(0));
getPersistenceProvider().createBucket(b, context);
- getPersistenceProvider().put(b, time, doc, context);
+ getPersistenceProvider().put(b, time, std::move(doc), context);
}
spi::UpdateResult
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index e418765ecac..3121bef61e5 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -37,6 +37,23 @@ struct PersistenceTestEnvironment {
class PersistenceTestUtils : public testing::Test {
public:
+ class NoBucketLock : public FileStorHandler::BucketLockInterface
+ {
+ public:
+ NoBucketLock(document::Bucket bucket) : _bucket(bucket) { }
+ const document::Bucket &getBucket() const override {
+ return _bucket;
+ }
+ api::LockingRequirements lockingRequirements() const noexcept override {
+ return api::LockingRequirements::Shared;
+ }
+ static std::shared_ptr<NoBucketLock> make(document::Bucket bucket) {
+ return std::make_shared<NoBucketLock>(bucket);
+ }
+ private:
+ document::Bucket _bucket;
+ };
+
std::unique_ptr<PersistenceTestEnvironment> _env;
PersistenceTestUtils();
diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp
index 98a2be6880d..1ec6a35fb1d 100644
--- a/storage/src/tests/persistence/persistencethread_splittest.cpp
+++ b/storage/src/tests/persistence/persistencethread_splittest.cpp
@@ -201,19 +201,20 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase)
}
document::Document::SP doc(testDocMan.createRandomDocumentAtLocation(
docloc, seed, docSize, docSize));
- spi.put(bucket, spi::Timestamp(1000 + i), doc, context);
+ spi.put(bucket, spi::Timestamp(1000 + i), std::move(doc), context);
}
std::unique_ptr<PersistenceThread> thread(createPersistenceThread(0));
getNode().getStateUpdater().setClusterState(
std::make_shared<lib::ClusterState>("distributor:1 storage:1"));
- api::SplitBucketCommand cmd(makeDocumentBucket(document::BucketId(currentSplitLevel, 1)));
- cmd.setMaxSplitBits(maxBits);
- cmd.setMinSplitBits(minBits);
- cmd.setMinByteSize(maxSize);
- cmd.setMinDocCount(maxCount);
- cmd.setSourceIndex(0);
- MessageTracker::UP result(thread->handleSplitBucket(cmd, context));
+ document::Bucket docBucket = makeDocumentBucket(document::BucketId(currentSplitLevel, 1));
+ auto cmd = std::make_shared<api::SplitBucketCommand>(docBucket);
+ cmd->setMaxSplitBits(maxBits);
+ cmd->setMinSplitBits(minBits);
+ cmd->setMinByteSize(maxSize);
+ cmd->setMinDocCount(maxCount);
+ cmd->setSourceIndex(0);
+ MessageTracker::UP result = thread->handleSplitBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(docBucket), cmd));
api::ReturnCode code(result->getResult());
EXPECT_EQ(error, code);
if (!code.success()) {
@@ -222,8 +223,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase)
auto& reply = dynamic_cast<api::SplitBucketReply&>(result->getReply());
std::set<std::string> expected;
for (uint32_t i=0; i<resultBuckets; ++i) {
- document::BucketId b(resultSplitLevel,
- location | (i == 0 ? 0 : splitMask));
+ document::BucketId b(resultSplitLevel, location | (i == 0 ? 0 : splitMask));
std::ostringstream ost;
ost << b << " - " << b.getUsedBits();
expected.insert(ost.str());
diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp
index 5462b4a5b0a..0d482ebe5b7 100644
--- a/storage/src/tests/persistence/processalltest.cpp
+++ b/storage/src/tests/persistence/processalltest.cpp
@@ -20,10 +20,10 @@ TEST_F(ProcessAllHandlerTest, remove_location) {
doPut(4, spi::Timestamp(1234));
doPut(4, spi::Timestamp(2345));
- api::RemoveLocationCommand removeLocation("id.user == 4", makeDocumentBucket(bucketId));
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("id.user == 4", bucket);
ProcessAllHandler handler(getEnv(), getPersistenceProvider());
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- auto tracker = handler.handleRemoveLocation(removeLocation, context);
+ auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n"
"DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n",
@@ -45,10 +45,9 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
doPut(doc, bucketId, spi::Timestamp(100 + i), 0);
}
- api::RemoveLocationCommand
- removeLocation("testdoctype1.headerval % 2 == 0", makeDocumentBucket(bucketId));
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- auto tracker = handler.handleRemoveLocation(removeLocation, context);
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket);
+ auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n"
"DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n"
@@ -71,12 +70,11 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_ty
document::BucketId bucketId(16, 4);
doPut(4, spi::Timestamp(1234));
- api::RemoveLocationCommand
- removeLocation("unknowndoctype.headerval % 2 == 0", makeDocumentBucket(bucketId));
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("unknowndoctype.headerval % 2 == 0", bucket);
ProcessAllHandler handler(getEnv(), getPersistenceProvider());
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- ASSERT_THROW(handler.handleRemoveLocation(removeLocation, context), std::exception);
+ ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception);
EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n",
dumpBucket(bucketId));
@@ -86,11 +84,11 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_bogus_selectio
document::BucketId bucketId(16, 4);
doPut(4, spi::Timestamp(1234));
- api::RemoveLocationCommand removeLocation("id.bogus != badgers", makeDocumentBucket(bucketId));
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::RemoveLocationCommand>("id.bogus != badgers", bucket);
ProcessAllHandler handler(getEnv(), getPersistenceProvider());
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- ASSERT_THROW(handler.handleRemoveLocation(removeLocation, context), std::exception);
+ ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception);
EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n",
dumpBucket(bucketId));
@@ -107,10 +105,9 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc
doPut(doc, bucketId, spi::Timestamp(100 + i), 0);
}
- api::StatBucketCommand statBucket(makeDocumentBucket(bucketId),
- "testdoctype1.headerval % 2 == 0");
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context);
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "testdoctype1.headerval % 2 == 0");
+ MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
ASSERT_TRUE(tracker->hasReply());
auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply());
@@ -142,9 +139,9 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries)
true);
}
- api::StatBucketCommand statBucket(makeDocumentBucket(bucketId), "true");
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context);
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true");
+ MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
ASSERT_TRUE(tracker->hasReply());
auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply());
@@ -188,9 +185,9 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_
doPut(doc, bucketId, spi::Timestamp(100 + i), 0);
}
- api::StatBucketCommand statBucket(makeDocumentBucket(bucketId), "true");
- spi::Context context(documentapi::LoadType::DEFAULT, 0, 0);
- MessageTracker::UP tracker = handler.handleStatBucket(statBucket, context);
+ document::Bucket bucket = makeDocumentBucket(bucketId);
+ auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true");
+ MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
ASSERT_TRUE(tracker->hasReply());
auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply());
diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp
index 08555fe0627..864ab320527 100644
--- a/storage/src/tests/persistence/testandsettest.cpp
+++ b/storage/src/tests/persistence/testandsettest.cpp
@@ -29,6 +29,7 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
const document::StringFieldValue MATCHING_HEADER{"Some string with woofy dog as a substring"};
const document::StringFieldValue OLD_CONTENT{"Some old content"};
const document::StringFieldValue NEW_CONTENT{"Freshly pressed and squeezed content"};
+ const document::Bucket BUCKET = makeDocumentBucket(BUCKET_ID);
unique_ptr<PersistenceThread> thread;
shared_ptr<document::Document> testDoc;
@@ -39,6 +40,11 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
: context(spi::LoadType(0, "default"), 0, 0)
{}
+ MessageTracker::UP
+ createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) {
+ return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd));
+ }
+
void SetUp() override {
SingleDiskPersistenceTestUtils::SetUp();
@@ -57,7 +63,7 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
SingleDiskPersistenceTestUtils::TearDown();
}
- std::unique_ptr<api::UpdateCommand> conditional_update_test(
+ std::shared_ptr<api::UpdateCommand> conditional_update_test(
bool createIfMissing,
api::Timestamp updateTimestamp);
@@ -82,10 +88,10 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) {
// Conditionally replace document, but fail due to lack of woofy dog
api::Timestamp timestampTwo = 1;
- api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo);
- setTestCondition(putTwo);
+ auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo);
+ setTestCondition(*putTwo);
- ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(),
+ ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
}
@@ -102,10 +108,10 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) {
// Conditionally replace document with updated version, succeed in doing so
api::Timestamp timestampTwo = 1;
- api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo);
- setTestCondition(putTwo);
+ auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo);
+ setTestCondition(*putTwo);
- ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -122,10 +128,10 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) {
// Conditionally remove document, fail in doing so
api::Timestamp timestampTwo = 1;
- api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo);
- setTestCondition(remove);
+ auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo);
+ setTestCondition(*remove);
- ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(),
+ ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -142,18 +148,17 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) {
// Conditionally remove document, succeed in doing so
api::Timestamp timestampTwo = 1;
- api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo);
- setTestCondition(remove);
+ auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo);
+ setTestCondition(*remove);
- ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY),
dumpBucket(BUCKET_ID));
}
-std::unique_ptr<api::UpdateCommand> TestAndSetTest::conditional_update_test(
- bool createIfMissing,
- api::Timestamp updateTimestamp)
+std::shared_ptr<api::UpdateCommand>
+TestAndSetTest::conditional_update_test(bool createIfMissing, api::Timestamp updateTimestamp)
{
auto docUpdate = std::make_shared<document::DocumentUpdate>(_env->_testDocMan.getTypeRepo(), testDoc->getType(), testDocId);
auto fieldUpdate = document::FieldUpdate(testDoc->getField("content"));
@@ -161,7 +166,7 @@ std::unique_ptr<api::UpdateCommand> TestAndSetTest::conditional_update_test(
docUpdate->addUpdate(fieldUpdate);
docUpdate->setCreateIfNonExistent(createIfMissing);
- auto updateUp = std::make_unique<api::UpdateCommand>(makeDocumentBucket(BUCKET_ID), docUpdate, updateTimestamp);
+ auto updateUp = std::make_unique<api::UpdateCommand>(BUCKET, docUpdate, updateTimestamp);
setTestCondition(*updateUp);
return updateUp;
}
@@ -172,7 +177,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) {
putTestDocument(false, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(),
+ ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -185,7 +190,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) {
putTestDocument(true, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -197,7 +202,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(false, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(),
+ ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -206,7 +211,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(true, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID));
assertTestDocumentFoundAndMatchesContent(NEW_CONTENT);
}
@@ -215,10 +220,10 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) {
// Conditionally replace nonexisting document
// Fail early since document selection is invalid
api::Timestamp timestamp = 0;
- api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp);
- put.setCondition(documentapi::TestAndSetCondition("bjarne"));
+ auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
+ put->setCondition(documentapi::TestAndSetCondition("bjarne"));
- ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
+ ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -226,11 +231,11 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) {
// Conditionally replace nonexisting document
// Fail since no document exists to match with test and set
api::Timestamp timestamp = 0;
- api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp);
- setTestCondition(put);
- thread->handlePut(put, context);
+ auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
+ setTestCondition(*put);
+ thread->handlePut(*put, createTracker(put, BUCKET));
- ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(),
+ ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -253,8 +258,8 @@ TestAndSetTest::createTestDocument()
document::Document::SP TestAndSetTest::retrieveTestDocument()
{
- api::GetCommand get(makeDocumentBucket(BUCKET_ID), testDocId, "[all]");
- auto tracker = thread->handleGet(get, context);
+ auto get = std::make_shared<api::GetCommand>(BUCKET, testDocId, "[all]");
+ auto tracker = thread->handleGet(*get, createTracker(get, BUCKET));
assert(tracker->getResult() == api::ReturnCode::Result::OK);
auto & reply = static_cast<api::GetReply &>(tracker->getReply());
@@ -273,8 +278,8 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta
testDoc->setValue(testDoc->getField("hstringval"), MATCHING_HEADER);
}
- api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp);
- thread->handlePut(put, context);
+ auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
+ thread->handlePut(*put, createTracker(put, BUCKET));
}
void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value)
diff --git a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp
index e906cfc624f..18877bdf8f7 100644
--- a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp
+++ b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp
@@ -7,20 +7,16 @@ LOG_SETUP(".persistence.diskmoveoperationhandler");
namespace storage {
-DiskMoveOperationHandler::DiskMoveOperationHandler(PersistenceUtil& env,
- spi::PersistenceProvider& provider)
+DiskMoveOperationHandler::DiskMoveOperationHandler(PersistenceUtil& env, spi::PersistenceProvider& provider)
: _env(env),
_provider(provider)
{
}
MessageTracker::UP
-DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd,
- spi::Context& context)
+DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd, MessageTracker::UP tracker)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.movedBuckets,
- _env._component.getClock()));
+ tracker->setMetric(_env._metrics.movedBuckets);
document::Bucket bucket(cmd.getBucket());
uint32_t targetDisk(cmd.getDstDisk());
@@ -46,13 +42,10 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd,
deviceIndex, targetDisk);
spi::Bucket from(bucket, spi::PartitionId(deviceIndex));
- spi::Bucket to(bucket, spi::PartitionId(targetDisk));
- spi::Result result(
- _provider.move(from, spi::PartitionId(targetDisk), context));
+ spi::Result result(_provider.move(from, spi::PartitionId(targetDisk), tracker->context()));
if (result.hasError()) {
- tracker->fail(api::ReturnCode::INTERNAL_FAILURE,
- result.getErrorMessage());
+ tracker->fail(api::ReturnCode::INTERNAL_FAILURE, result.getErrorMessage());
return tracker;
}
@@ -82,12 +75,7 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd,
}
// Answer message, setting extra info such as filesize
- tracker->setReply(std::shared_ptr<BucketDiskMoveReply>(
- new BucketDiskMoveReply(
- cmd,
- bInfo,
- sourceFileSize,
- sourceFileSize)));
+ tracker->setReply(std::make_shared<BucketDiskMoveReply>(cmd, bInfo, sourceFileSize, sourceFileSize));
return tracker;
}
diff --git a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h
index f0c4bbef66a..9e8d33fc802 100644
--- a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h
+++ b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.h
@@ -12,8 +12,7 @@ public:
DiskMoveOperationHandler(PersistenceUtil&,
spi::PersistenceProvider& provider);
- MessageTracker::UP handleBucketDiskMove(BucketDiskMoveCommand&,
- spi::Context&);
+ MessageTracker::UP handleBucketDiskMove(BucketDiskMoveCommand&, MessageTracker::UP tracker);
private:
PersistenceUtil& _env;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 4cb687bb753..3483b15dd0e 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -537,9 +537,10 @@ MergeHandler::applyDiffEntry(const spi::Bucket& bucket,
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
// Regular put entry
Document::SP doc(deserializeDiffDocument(e, repo));
- checkResult(_spi.put(bucket, timestamp, doc, context),
+ DocumentId docId = doc->getId();
+ checkResult(_spi.put(bucket, timestamp, std::move(doc), context),
bucket,
- doc->getId(),
+ docId,
"put");
} else {
DocumentId docId(e._docName);
@@ -903,12 +904,9 @@ public:
};
MessageTracker::UP
-MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd,
- spi::Context& context)
+MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP tracker)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.mergeBuckets,
- _env._component.getClock()));
+ tracker->setMetric(_env._metrics.mergeBuckets);
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
LOG(debug, "MergeBucket(%s) with max timestamp %" PRIu64 ".",
@@ -949,44 +947,33 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd,
tracker->fail(ReturnCode::BUSY, err);
return tracker;
}
- checkResult(_spi.createBucket(bucket, context), bucket, "create bucket");
+ checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
- MergeStatus::SP s = MergeStatus::SP(new MergeStatus(
+ auto s = std::make_shared<MergeStatus>(
_env._component.getClock(), cmd.getLoadType(),
- cmd.getPriority(), cmd.getTrace().getLevel()));
+ cmd.getPriority(), cmd.getTrace().getLevel());
_env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
s->nodeList = cmd.getNodes();
s->maxTimestamp = Timestamp(cmd.getMaxTimestamp());
s->timeout = cmd.getTimeout();
s->startTime = framework::MilliSecTimer(_env._component.getClock());
- std::shared_ptr<api::GetBucketDiffCommand> cmd2(
- new api::GetBucketDiffCommand(bucket.getBucket(),
- s->nodeList,
- s->maxTimestamp.getTime()));
- if (!buildBucketInfoList(bucket,
- cmd.getLoadType(),
- s->maxTimestamp,
- 0,
- cmd2->getDiff(),
- context))
- {
+ auto cmd2 = std::make_shared<api::GetBucketDiffCommand>(bucket.getBucket(), s->nodeList, s->maxTimestamp.getTime());
+ if (!buildBucketInfoList(bucket, cmd.getLoadType(), s->maxTimestamp, 0, cmd2->getDiff(), tracker->context())) {
LOG(debug, "Bucket non-existing in db. Failing merge.");
tracker->fail(ReturnCode::BUCKET_DELETED,
"Bucket not found in buildBucketInfo step");
return tracker;
}
- _env._metrics.mergeMetadataReadLatency.addValue(
- s->startTime.getElapsedTimeAsDouble());
+ _env._metrics.mergeMetadataReadLatency.addValue(s->startTime.getElapsedTimeAsDouble());
LOG(spam, "Sending GetBucketDiff %" PRIu64 " for %s to next node %u "
"with diff of %u entries.",
cmd2->getMsgId(),
bucket.toString().c_str(),
s->nodeList[1].index,
uint32_t(cmd2->getDiff().size()));
- cmd2->setAddress(createAddress(_env._component.getClusterName(),
- s->nodeList[1].index));
+ cmd2->setAddress(createAddress(_env._component.getClusterName(), s->nodeList[1].index));
cmd2->setPriority(s->context.getPriority());
cmd2->setTimeout(s->timeout);
cmd2->setSourceIndex(cmd.getSourceIndex());
@@ -1129,15 +1116,12 @@ namespace {
}
MessageTracker::UP
-MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd,
- spi::Context& context)
+MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker)
{
- MessageTracker::UP tracker(new MessageTracker(
- _env._metrics.getBucketDiff,
- _env._component.getClock()));
+ tracker->setMetric(_env._metrics.getBucketDiff);
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str());
- checkResult(_spi.createBucket(bucket, context), bucket, "create bucket");
+ checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
tracker->fail(ReturnCode::BUSY,
@@ -1151,7 +1135,7 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd,
framework::MilliSecTimer startTime(_env._component.getClock());
if (!buildBucketInfoList(bucket, cmd.getLoadType(),
Timestamp(cmd.getMaxTimestamp()),
- index, local, context))
+ index, local, tracker->context()))
{
LOG(debug, "Bucket non-existing in db. Failing merge.");
tracker->fail(ReturnCode::BUCKET_DELETED,
@@ -1186,31 +1170,26 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd,
cmd.getMsgId(), bucket.toString().c_str(),
cmd.getNodes()[index - 1].index, final.size(), local.size());
- api::GetBucketDiffReply* reply = new api::GetBucketDiffReply(cmd);
- tracker->setReply(api::StorageReply::SP(reply));
+ auto reply = std::make_shared<api::GetBucketDiffReply>(cmd);
reply->getDiff().swap(final);
+ tracker->setReply(std::move(reply));
} else {
// When not the last node in merge chain, we must save reply, and
// send command on.
MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
- MergeStatus::SP s(new MergeStatus(_env._component.getClock(),
+ auto s = std::make_shared<MergeStatus>(_env._component.getClock(),
cmd.getLoadType(), cmd.getPriority(),
- cmd.getTrace().getLevel()));
+ cmd.getTrace().getLevel());
_env._fileStorHandler.addMergeStatus(bucket.getBucket(), s);
- s->pendingGetDiff =
- api::GetBucketDiffReply::SP(new api::GetBucketDiffReply(cmd));
+ s->pendingGetDiff = std::make_shared<api::GetBucketDiffReply>(cmd);
s->pendingGetDiff->setPriority(cmd.getPriority());
- LOG(spam, "Sending GetBucketDiff for %s on to node %d, "
- "added %zu new entries to diff.",
+ LOG(spam, "Sending GetBucketDiff for %s on to node %d, added %zu new entries to diff.",
bucket.toString().c_str(), cmd.getNodes()[index + 1].index,
local.size() - remote.size());
- std::shared_ptr<api::GetBucketDiffCommand> cmd2(
- new api::GetBucketDiffCommand(
- bucket.getBucket(), cmd.getNodes(), cmd.getMaxTimestamp()));
- cmd2->setAddress(createAddress(_env._component.getClusterName(),
- cmd.getNodes()[index + 1].index));
+ auto cmd2 = std::make_shared<api::GetBucketDiffCommand>(bucket.getBucket(), cmd.getNodes(), cmd.getMaxTimestamp());
+ cmd2->setAddress(createAddress(_env._component.getClusterName(), cmd.getNodes()[index + 1].index));
cmd2->getDiff().swap(local);
cmd2->setPriority(cmd.getPriority());
cmd2->setTimeout(cmd.getTimeout());
@@ -1330,10 +1309,9 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply,
}
MessageTracker::UP
-MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd,
- spi::Context& context)
+MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.applyBucketDiff, _env._component.getClock());
+ tracker->setMetric(_env._metrics.applyBucketDiff);
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
LOG(debug, "%s", cmd.toString().c_str());
@@ -1348,10 +1326,8 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd,
bool lastInChain = index + 1u >= cmd.getNodes().size();
if (applyDiffNeedLocalData(cmd.getDiff(), index, !lastInChain)) {
framework::MilliSecTimer startTime(_env._component.getClock());
- fetchLocalData(bucket, cmd.getLoadType(), cmd.getDiff(), index,
- context);
- _env._metrics.mergeDataReadLatency.addValue(
- startTime.getElapsedTimeAsDouble());
+ fetchLocalData(bucket, cmd.getLoadType(), cmd.getDiff(), index, tracker->context());
+ _env._metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble());
} else {
LOG(spam, "Merge(%s): Moving %zu entries, didn't need "
"local data on node %u (%u).",
@@ -1363,7 +1339,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd,
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
framework::MilliSecTimer startTime(_env._component.getClock());
api::BucketInfo info(applyDiffLocally(bucket, cmd.getLoadType(),
- cmd.getDiff(), index, context));
+ cmd.getDiff(), index, tracker->context()));
_env._metrics.mergeDataWriteLatency.addValue(
startTime.getElapsedTimeAsDouble());
} else {
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 4ea69bd0fdf..7052258ec03 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -55,13 +55,10 @@ public:
uint8_t nodeIndex,
spi::Context& context);
- MessageTracker::UP handleMergeBucket(api::MergeBucketCommand&,
- spi::Context&);
- MessageTracker::UP handleGetBucketDiff(api::GetBucketDiffCommand&,
- spi::Context&);
+ MessageTracker::UP handleMergeBucket(api::MergeBucketCommand&, MessageTracker::UP);
+ MessageTracker::UP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTracker::UP);
void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&);
- MessageTracker::UP handleApplyBucketDiff(api::ApplyBucketDiffCommand&,
- spi::Context&);
+ MessageTracker::UP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTracker::UP);
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&);
private:
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 422e19a492e..5905d93cc83 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -69,7 +69,7 @@ PersistenceThread::getBucket(const DocumentId& id, const document::Bucket &bucke
bool
PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tracker)
{
- uint32_t code = _env.convertErrorCode(response);
+ uint32_t code = PersistenceUtil::convertErrorCode(response);
if (code != 0) {
tracker.fail(code, response.getErrorMessage());
@@ -80,11 +80,13 @@ PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tr
}
-bool PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) {
+bool
+PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) {
return cmd.getCondition().isPresent();
}
-bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
+bool
+PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
spi::Context & context, bool missingDocumentImpliesMatch) {
try {
TestAndSetHelper helper(*this, cmd, missingDocumentImpliesMatch);
@@ -104,35 +106,35 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd,
}
MessageTracker::UP
-PersistenceThread::handlePut(api::PutCommand& cmd, spi::Context & context)
+PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP tracker)
{
auto& metrics = _env._metrics.put[cmd.getLoadType()];
- auto tracker = std::make_unique<MessageTracker>(metrics, _env._component.getClock());
+ tracker->setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) {
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) {
return tracker;
}
spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), context);
+ spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker->context());
checkForError(response, *tracker);
return tracker;
}
MessageTracker::UP
-PersistenceThread::handleRemove(api::RemoveCommand& cmd, spi::Context & context)
+PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker)
{
auto& metrics = _env._metrics.remove[cmd.getLoadType()];
- auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock());
+ tracker->setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) {
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) {
return tracker;
}
spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), context);
+ spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker->context());
if (checkForError(response, *tracker)) {
tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
}
@@ -143,18 +145,18 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, spi::Context & context)
}
MessageTracker::UP
-PersistenceThread::handleUpdate(api::UpdateCommand& cmd, spi::Context & context)
+PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker)
{
auto& metrics = _env._metrics.update[cmd.getLoadType()];
- auto tracker = std::make_unique<MessageTracker>(metrics, _env._component.getClock());
+ tracker->setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context, cmd.getUpdate()->getCreateIfNonExistent())) {
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context(), cmd.getUpdate()->getCreateIfNonExistent())) {
return tracker;
}
spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), context);
+ spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), tracker->context());
if (checkForError(response, *tracker)) {
auto reply = std::make_shared<api::UpdateReply>(cmd);
reply->setOldTimestamp(response.getExistingTimestamp());
@@ -176,17 +178,16 @@ spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency co
}
MessageTracker::UP
-PersistenceThread::handleGet(api::GetCommand& cmd, spi::Context & context)
+PersistenceThread::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker)
{
auto& metrics = _env._metrics.get[cmd.getLoadType()];
- auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock());
+ tracker->setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- document::FieldSetRepo repo;
- document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet());
- context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency()));
+ document::FieldSet::UP fieldSet = document::FieldSetRepo::parse(*_env._component.getTypeRepo(), cmd.getFieldSet());
+ tracker->context().setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency()));
spi::GetResult result =
- _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), context);
+ _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), tracker->context());
if (checkForError(result, *tracker)) {
if (!result.hasDocument()) {
@@ -199,9 +200,9 @@ PersistenceThread::handleGet(api::GetCommand& cmd, spi::Context & context)
}
MessageTracker::UP
-PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
+PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.repairs,_env._component.getClock());
+ tracker->setMetric(_env._metrics.repairs);
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(),
(cmd.verifyBody() ? "Verifying body" : "Not verifying body"));
@@ -225,28 +226,28 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleRevert(api::RevertCommand& cmd, spi::Context & context)
+PersistenceThread::handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock());
+ tracker->setMetric(_env._metrics.revert[cmd.getLoadType()]);
spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens();
for (const api::Timestamp & token : tokens) {
- spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), context);
+ spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), tracker->context());
}
return tracker;
}
MessageTracker::UP
-PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context)
+PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock());
+ tracker->setMetric(_env._metrics.createBuckets);
LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId());
}
spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- _spi.createBucket(spiBucket, context);
+ _spi.createBucket(spiBucket, tracker->context());
if (cmd.getActive()) {
_spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE);
}
@@ -295,9 +296,9 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, con
}
MessageTracker::UP
-PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context)
+PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock());
+ tracker->setMetric(_env._metrics.deleteBuckets);
LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()");
if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
@@ -308,7 +309,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Contex
if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
return tracker;
}
- _spi.deleteBucket(bucket, context);
+ _spi.deleteBucket(bucket, tracker->context());
StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
{
StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
@@ -333,12 +334,12 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Contex
}
MessageTracker::UP
-PersistenceThread::handleGetIter(GetIterCommand& cmd, spi::Context & context)
+PersistenceThread::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock());
- spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), context));
+ tracker->setMetric(_env._metrics.visit[cmd.getLoadType()]);
+ spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), tracker->context()));
if (checkForError(result, *tracker)) {
- GetIterReply::SP reply(new GetIterReply(cmd));
+ auto reply = std::make_shared<GetIterReply>(cmd);
reply->getEntries() = result.steal_entries();
_env._metrics.visit[cmd.getLoadType()].
documentsPerIterate.addValue(reply->getEntries().size());
@@ -351,9 +352,9 @@ PersistenceThread::handleGetIter(GetIterCommand& cmd, spi::Context & context)
}
MessageTracker::UP
-PersistenceThread::handleReadBucketList(ReadBucketList& cmd)
+PersistenceThread::handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketList,_env._component.getClock());
+ tracker->setMetric(_env._metrics.readBucketList);
spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace(), cmd.getPartition()));
if (checkForError(result, *tracker)) {
@@ -366,23 +367,22 @@ PersistenceThread::handleReadBucketList(ReadBucketList& cmd)
}
MessageTracker::UP
-PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd)
+PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketInfo,_env._component.getClock());
+ tracker->setMetric(_env._metrics.readBucketInfo);
_env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket()));
return tracker;
}
MessageTracker::UP
-PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context)
+PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock());
- document::FieldSetRepo repo;
- document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields());
- context.setReadConsistency(cmd.getReadConsistency());
+ tracker->setMetric(_env._metrics.createIterator);
+ document::FieldSet::UP fieldSet = document::FieldSetRepo::parse(*_env._component.getTypeRepo(), cmd.getFields());
+ tracker->context().setReadConsistency(cmd.getReadConsistency());
spi::CreateIteratorResult result(_spi.createIterator(
spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)),
- *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), context));
+ *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), tracker->context()));
if (checkForError(result, *tracker)) {
tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId())));
}
@@ -390,9 +390,9 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, spi::Context
}
MessageTracker::UP
-PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context)
+PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.splitBuckets,_env._component.getClock());
+ tracker->setMetric(_env._metrics.splitBuckets);
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
// Calculate the various bucket ids involved.
@@ -411,7 +411,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context
SplitBitDetector::Result targetInfo;
if (_env._config.enableMultibitSplitOptimalization) {
targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(),
- context, cmd.getMinDocCount(), cmd.getMinByteSize());
+ tracker->context(), cmd.getMinDocCount(), cmd.getMinByteSize());
}
if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) {
document::BucketId src(cmd.getBucketId());
@@ -451,9 +451,9 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context
}
#endif
spi::Result result = _spi.split(spiBucket, spi::Bucket(target1, spi::PartitionId(lock1.disk)),
- spi::Bucket(target2, spi::PartitionId(lock2.disk)), context);
+ spi::Bucket(target2, spi::PartitionId(lock2.disk)), tracker->context());
if (result.hasError()) {
- tracker->fail(_env.convertErrorCode(result), result.getErrorMessage());
+ tracker->fail(PersistenceUtil::convertErrorCode(result), result.getErrorMessage());
return tracker;
}
// After split we need to take all bucket db locks to update them.
@@ -509,7 +509,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context
spi::PartitionId(targets[i].second.diskIndex)));
LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it",
createTarget.toString().c_str());
- _spi.createBucket(createTarget, context);
+ _spi.createBucket(createTarget, tracker->context());
}
splitReply.getSplitInfo().emplace_back(targets[i].second.bucket.getBucketId(),
targets[i].first->getBucketInfo());
@@ -529,7 +529,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context
}
bool
-PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const
+PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker)
{
if (cmd.getSourceBuckets().size() != 2) {
tracker.fail(ReturnCode::ILLEGAL_PARAMETERS,
@@ -554,9 +554,9 @@ PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, Messa
}
MessageTracker::UP
-PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context)
+PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.joinBuckets,_env._component.getClock());
+ tracker->setMetric(_env._metrics.joinBuckets);
if (!validateJoinCommand(cmd, *tracker)) {
return tracker;
}
@@ -603,7 +603,7 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context
_spi.join(spi::Bucket(firstBucket, spi::PartitionId(lock1.disk)),
spi::Bucket(secondBucket, spi::PartitionId(lock2.disk)),
spi::Bucket(destBucket, spi::PartitionId(_env._partition)),
- context);
+ tracker->context());
if (!checkForError(result, *tracker)) {
return tracker;
}
@@ -634,9 +634,9 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context
}
MessageTracker::UP
-PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd)
+PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.setBucketStates,_env._component.getClock());
+ tracker->setMetric(_env._metrics.setBucketStates);
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
@@ -665,9 +665,9 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context)
+PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.internalJoin,_env._component.getClock());
+ tracker->setMetric(_env._metrics.internalJoin);
document::Bucket destBucket = cmd.getBucket();
{
// Create empty bucket for target.
@@ -682,7 +682,7 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi:
_spi.join(spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())),
spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())),
spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToKeep())),
- context);
+ tracker->context());
if (checkForError(result, *tracker)) {
tracker->setReply(std::make_shared<InternalBucketJoinReply>(cmd, _env.getBucketInfo(cmd.getBucket())));
}
@@ -690,9 +690,9 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi:
}
MessageTracker::UP
-PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd)
+PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(_env._metrics.recheckBucketInfo, _env._component.getClock());
+ tracker->setMetric(_env._metrics.recheckBucketInfo);
document::Bucket bucket(cmd.getBucket());
api::BucketInfo info(_env.getBucketInfo(bucket));
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
@@ -720,58 +720,58 @@ PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, spi::Context & context)
+PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker)
{
switch (msg.getType().getId()) {
case api::MessageType::GET_ID:
- return handleGet(static_cast<api::GetCommand&>(msg), context);
+ return handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker));
case api::MessageType::PUT_ID:
- return handlePut(static_cast<api::PutCommand&>(msg), context);
+ return handlePut(static_cast<api::PutCommand&>(msg), std::move(tracker));
case api::MessageType::REMOVE_ID:
- return handleRemove(static_cast<api::RemoveCommand&>(msg), context);
+ return handleRemove(static_cast<api::RemoveCommand&>(msg), std::move(tracker));
case api::MessageType::UPDATE_ID:
- return handleUpdate(static_cast<api::UpdateCommand&>(msg), context);
+ return handleUpdate(static_cast<api::UpdateCommand&>(msg), std::move(tracker));
case api::MessageType::REVERT_ID:
- return handleRevert(static_cast<api::RevertCommand&>(msg), context);
+ return handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
case api::MessageType::CREATEBUCKET_ID:
- return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), context);
+ return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
- return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), context);
+ return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
- return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), context);
+ return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker));
case api::MessageType::SPLITBUCKET_ID:
- return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), context);
+ return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), std::move(tracker));
// Depends on iterators
case api::MessageType::STATBUCKET_ID:
- return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), context);
+ return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), std::move(tracker));
case api::MessageType::REMOVELOCATION_ID:
- return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), context);
+ return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), std::move(tracker));
case api::MessageType::MERGEBUCKET_ID:
- return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), context);
+ return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), std::move(tracker));
case api::MessageType::GETBUCKETDIFF_ID:
- return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), context);
+ return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), std::move(tracker));
case api::MessageType::APPLYBUCKETDIFF_ID:
- return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), context);
+ return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker));
case api::MessageType::SETBUCKETSTATE_ID:
- return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg));
+ return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
case api::MessageType::INTERNAL_ID:
switch(static_cast<api::InternalCommand&>(msg).getType()) {
case GetIterCommand::ID:
- return handleGetIter(static_cast<GetIterCommand&>(msg), context);
+ return handleGetIter(static_cast<GetIterCommand&>(msg), std::move(tracker));
case CreateIteratorCommand::ID:
- return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), context);
+ return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), std::move(tracker));
case ReadBucketList::ID:
- return handleReadBucketList(static_cast<ReadBucketList&>(msg));
+ return handleReadBucketList(static_cast<ReadBucketList&>(msg), std::move(tracker));
case ReadBucketInfo::ID:
- return handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg));
+ return handleReadBucketInfo(static_cast<ReadBucketInfo&>(msg), std::move(tracker));
case RepairBucketCommand::ID:
- return handleRepairBucket(static_cast<RepairBucketCommand&>(msg));
+ return handleRepairBucket(static_cast<RepairBucketCommand&>(msg), std::move(tracker));
case BucketDiskMoveCommand::ID:
- return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), context);
+ return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), std::move(tracker));
case InternalBucketJoinCommand::ID:
- return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), context);
+ return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), std::move(tracker));
case RecheckBucketInfoCommand::ID:
- return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg));
+ return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg), std::move(tracker));
default:
LOG(warning, "Persistence thread received unhandled internal command %s", msg.toString().c_str());
break;
@@ -782,21 +782,6 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, spi::Conte
return MessageTracker::UP();
}
-MessageTracker::UP
-PersistenceThread::handleCommand(api::StorageCommand& msg)
-{
- spi::Context context(msg.getLoadType(), msg.getPriority(), msg.getTrace().getLevel());
- MessageTracker::UP mtracker(handleCommandSplitByType(msg, context));
- if (mtracker && ! context.getTrace().getRoot().isEmpty()) {
- if (mtracker->hasReply()) {
- mtracker->getReply().getTrace().getRoot().addChild(context.getTrace().getRoot());
- } else {
- msg.getTrace().getRoot().addChild(context.getTrace().getRoot());
- }
- }
- return mtracker;
-}
-
void
PersistenceThread::handleReply(api::StorageReply& reply)
{
@@ -813,7 +798,7 @@ PersistenceThread::handleReply(api::StorageReply& reply)
}
MessageTracker::UP
-PersistenceThread::processMessage(api::StorageMessage& msg)
+PersistenceThread::processMessage(api::StorageMessage& msg, MessageTracker::UP tracker)
{
MBUS_TRACE(msg.getTrace(), 5, "PersistenceThread: Processing message in persistence layer");
@@ -829,13 +814,12 @@ PersistenceThread::processMessage(api::StorageMessage& msg)
}
} else {
api::StorageCommand& initiatingCommand = static_cast<api::StorageCommand&>(msg);
-
try {
int64_t startTime(_component->getClock().getTimeInMillis().getTime());
LOG(debug, "Handling command: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
- auto tracker(handleCommand(initiatingCommand));
+ tracker = handleCommandSplitByType(initiatingCommand, std::move(tracker));
if (!tracker) {
LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str());
} else {
@@ -867,47 +851,18 @@ PersistenceThread::processMessage(api::StorageMessage& msg)
}
}
- return MessageTracker::UP();
-}
-
-namespace {
-
-
-bool isBatchable(api::MessageType::Id id)
-{
- return (id == api::MessageType::PUT_ID ||
- id == api::MessageType::REMOVE_ID ||
- id == api::MessageType::UPDATE_ID ||
- id == api::MessageType::REVERT_ID);
-}
-
-bool hasBucketInfo(api::MessageType::Id id)
-{
- return (isBatchable(id) ||
- (id == api::MessageType::REMOVELOCATION_ID ||
- id == api::MessageType::JOINBUCKETS_ID));
-}
-
+ return tracker;
}
void
-PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage & lock) {
- std::vector<MessageTracker::UP> trackers;
- document::Bucket bucket = lock.first->getBucket();
-
+PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage lock) {
LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get());
api::StorageMessage & msg(*lock.second);
- std::unique_ptr<MessageTracker> tracker = processMessage(msg);
- if (tracker && tracker->hasReply()) {
- if (hasBucketInfo(msg.getType().getId())) {
- if (tracker->getReply().getResult().success()) {
- _env.setBucketInfo(*tracker, bucket);
- }
- }
- LOG(spam, "Sending reply up: %s %" PRIu64,
- tracker->getReply().toString().c_str(), tracker->getReply().getMsgId());
- _env._fileStorHandler.sendReply(std::move(*tracker).stealReplySP());
+ auto tracker = std::make_unique<MessageTracker>(_env, std::move(lock.first), std::move(lock.second));
+ tracker = processMessage(msg, std::move(tracker));
+ if (tracker) {
+ tracker->sendReply();
}
}
@@ -922,7 +877,7 @@ PersistenceThread::run(framework::ThreadHandle& thread)
FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _stripeId));
if (lock.first) {
- processLockedMessage(lock);
+ processLockedMessage(std::move(lock));
}
vespalib::MonitorGuard flushMonitorGuard(_flushMonitor);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 56414835b7b..a3c8099f228 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -28,23 +28,23 @@ public:
void flush() override;
framework::Thread& getThread() override { return *_thread; }
- MessageTracker::UP handlePut(api::PutCommand& cmd, spi::Context & context);
- MessageTracker::UP handleRemove(api::RemoveCommand& cmd, spi::Context & context);
- MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, spi::Context & context);
- MessageTracker::UP handleGet(api::GetCommand& cmd, spi::Context & context);
- MessageTracker::UP handleRevert(api::RevertCommand& cmd, spi::Context & context);
- MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context);
- MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context);
- MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context);
- MessageTracker::UP handleGetIter(GetIterCommand& cmd, spi::Context & context);
- MessageTracker::UP handleReadBucketList(ReadBucketList& cmd);
- MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd);
- MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context);
- MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd);
- MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context);
- MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context);
- MessageTracker::UP handleRepairBucket(RepairBucketCommand& cmd);
- MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd);
+ MessageTracker::UP handlePut(api::PutCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleGet(api::GetCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleRevert(api::RevertCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleReadBucketList(ReadBucketList& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleRepairBucket(RepairBucketCommand& cmd, MessageTracker::UP tracker);
+ MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker);
private:
uint32_t _stripeId;
@@ -67,23 +67,22 @@ private:
* an appropriate error and returns false iff the command does not validate
* OK. Returns true and does not touch the tracker otherwise.
*/
- bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const;
+ static bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker);
// Message handling functions
- MessageTracker::UP handleCommand(api::StorageCommand&);
- MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, spi::Context & context);
+ MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker);
void handleReply(api::StorageReply&);
- MessageTracker::UP processMessage(api::StorageMessage& msg);
- void processLockedMessage(FileStorHandler::LockedMessage & lock);
+ MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker);
+ void processLockedMessage(FileStorHandler::LockedMessage lock);
// Thread main loop
void run(framework::ThreadHandle&) override;
- bool checkForError(const spi::Result& response, MessageTracker& tracker);
+ static bool checkForError(const spi::Result& response, MessageTracker& tracker);
spi::Bucket getBucket(const DocumentId& id, const document::Bucket &bucket) const;
friend class TestAndSetHelper;
- bool tasConditionExists(const api::TestAndSetCommand & cmd);
+ static bool tasConditionExists(const api::TestAndSetCommand & cmd);
bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
spi::Context & context, bool missingDocumentImpliesMatch = false);
};
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 9c49dc96750..53679a1a364 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -14,22 +14,69 @@ namespace {
ost << "PersistenceUtil(" << p << ")";
return ost.str();
}
+
+ bool isBatchable(api::MessageType::Id id)
+ {
+ return (id == api::MessageType::PUT_ID ||
+ id == api::MessageType::REMOVE_ID ||
+ id == api::MessageType::UPDATE_ID ||
+ id == api::MessageType::REVERT_ID);
+ }
+
+ bool hasBucketInfo(api::MessageType::Id id)
+ {
+ return (isBatchable(id) ||
+ (id == api::MessageType::REMOVELOCATION_ID ||
+ id == api::MessageType::JOINBUCKETS_ID));
+ }
+
}
-MessageTracker::MessageTracker(FileStorThreadMetrics::Op& metric,
- framework::Clock& clock)
+MessageTracker::MessageTracker(PersistenceUtil & env,
+ FileStorHandler::BucketLockInterface::SP bucketLock,
+ api::StorageMessage::SP msg)
: _sendReply(true),
- _metric(metric),
+ _updateBucketInfo(hasBucketInfo(msg->getType().getId())),
+ _bucketLock(std::move(bucketLock)),
+ _msg(std::move(msg)),
+ _context(_msg->getLoadType(), _msg->getPriority(), _msg->getTrace().getLevel()),
+ _env(env),
+ _metric(nullptr),
_result(api::ReturnCode::OK),
- _timer(clock)
-{
- _metric.count.inc();
+ _timer(_env._component.getClock())
+{ }
+
+void
+MessageTracker::setMetric(FileStorThreadMetrics::Op& metric) {
+ metric.count.inc();
+ _metric = &metric;
}
MessageTracker::~MessageTracker()
{
if (_reply.get() && _reply->getResult().success()) {
- _metric.latency.addValue(_timer.getElapsedTimeAsDouble());
+ _metric->latency.addValue(_timer.getElapsedTimeAsDouble());
+ }
+}
+
+void
+MessageTracker::sendReply() {
+ if (hasReply()) {
+ if ( ! _context.getTrace().getRoot().isEmpty()) {
+ getReply().getTrace().getRoot().addChild(_context.getTrace().getRoot());
+ }
+ if (_updateBucketInfo) {
+ if (getReply().getResult().success()) {
+ _env.setBucketInfo(*this, _bucketLock->getBucket());
+ }
+ }
+ LOG(spam, "Sending reply up: %s %" PRIu64,
+ getReply().toString().c_str(), getReply().getMsgId());
+ _env._fileStorHandler.sendReply(std::move(_reply));
+ } else {
+ if ( ! _context.getTrace().getRoot().isEmpty()) {
+ _msg->getTrace().getRoot().addChild(_context.getTrace().getRoot());
+ }
}
}
@@ -53,7 +100,7 @@ MessageTracker::generateReply(api::StorageCommand& cmd)
}
if (!_reply->getResult().success()) {
- _metric.failed.inc();
+ _metric->failed.inc();
LOGBP(debug, "Failed to handle command %s: %s",
cmd.toString().c_str(),
_result.toString().c_str());
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index e8e5f947814..c6cb943f0b9 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -13,14 +13,18 @@
namespace storage {
+class PersistenceUtil;
+
class MessageTracker : protected Types {
public:
typedef std::unique_ptr<MessageTracker> UP;
- MessageTracker(FileStorThreadMetrics::Op& metric, framework::Clock& clock);
+ MessageTracker(PersistenceUtil & env, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
~MessageTracker();
+ void setMetric(FileStorThreadMetrics::Op& metric);
+
/**
* Called by operation handlers to set reply if they need to send a
* non-default reply. They should call this function as soon as they create
@@ -57,23 +61,32 @@ public:
api::ReturnCode getResult() const { return _result; }
+ spi::Context & context() { return _context; }
+
+ void sendReply();
+
private:
- bool _sendReply;
- FileStorThreadMetrics::Op& _metric;
- api::StorageReply::SP _reply;
- api::ReturnCode _result;
- framework::MilliSecTimer _timer;
+ bool _sendReply;
+ bool _updateBucketInfo;
+ FileStorHandler::BucketLockInterface::SP _bucketLock;
+ api::StorageMessage::SP _msg;
+ spi::Context _context;
+ PersistenceUtil &_env;
+ FileStorThreadMetrics::Op *_metric;
+ api::StorageReply::SP _reply;
+ api::ReturnCode _result;
+ framework::MilliSecTimer _timer;
};
struct PersistenceUtil {
vespa::config::content::StorFilestorConfig _config;
- ServiceLayerComponentRegister& _compReg;
- ServiceLayerComponent _component;
- FileStorHandler& _fileStorHandler;
- uint16_t _partition;
- uint16_t _nodeIndex;
- FileStorThreadMetrics& _metrics;
- const document::BucketIdFactory& _bucketFactory;
+ ServiceLayerComponentRegister &_compReg;
+ ServiceLayerComponent _component;
+ FileStorHandler &_fileStorHandler;
+ uint16_t _partition;
+ uint16_t _nodeIndex;
+ FileStorThreadMetrics &_metrics;
+ const document::BucketIdFactory &_bucketFactory;
const std::shared_ptr<const document::DocumentTypeRepo> _repo;
spi::PersistenceProvider& _spi;
diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp
index f37c6723933..4829bdf4581 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.cpp
+++ b/storage/src/vespa/storage/persistence/processallhandler.cpp
@@ -80,25 +80,18 @@ public:
}
MessageTracker::UP
-ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd,
- spi::Context& context)
+ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(
- _env._metrics.removeLocation[cmd.getLoadType()],
- _env._component.getClock());
+ tracker->setMetric(_env._metrics.removeLocation[cmd.getLoadType()]);
LOG(debug, "RemoveLocation(%s): using selection '%s'",
cmd.getBucketId().toString().c_str(),
cmd.getDocumentSelection().c_str());
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- UnrevertableRemoveEntryProcessor processor(_spi, bucket, context);
- BucketProcessor::iterateAll(_spi,
- bucket,
- cmd.getDocumentSelection(),
- processor,
- spi::NEWEST_DOCUMENT_ONLY,
- context);
+ UnrevertableRemoveEntryProcessor processor(_spi, bucket, tracker->context());
+ BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
+ processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context());
tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed));
@@ -106,12 +99,9 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd,
}
MessageTracker::UP
-ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd,
- spi::Context& context)
+ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, MessageTracker::UP tracker)
{
- auto tracker = std::make_unique<MessageTracker>(
- _env._metrics.statBucket[cmd.getLoadType()],
- _env._component.getClock());
+ tracker->setMetric(_env._metrics.statBucket[cmd.getLoadType()]);
std::ostringstream ost;
ost << "Persistence bucket " << cmd.getBucketId()
@@ -119,15 +109,10 @@ ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd,
spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
StatEntryProcessor processor(ost);
- BucketProcessor::iterateAll(_spi,
- bucket,
- cmd.getDocumentSelection(),
- processor,
- spi::ALL_VERSIONS,
- context);
-
- api::StatBucketReply::UP reply(new api::StatBucketReply(cmd, ost.str()));
- tracker->setReply(api::StorageReply::SP(reply.release()));
+ BucketProcessor::iterateAll(_spi, bucket, cmd.getDocumentSelection(),
+ processor, spi::ALL_VERSIONS,tracker->context());
+
+ tracker->setReply(std::make_shared<api::StatBucketReply>(cmd, ost.str()));
return tracker;
}
diff --git a/storage/src/vespa/storage/persistence/processallhandler.h b/storage/src/vespa/storage/persistence/processallhandler.h
index 37b46ffc728..87c3c63b8fe 100644
--- a/storage/src/vespa/storage/persistence/processallhandler.h
+++ b/storage/src/vespa/storage/persistence/processallhandler.h
@@ -8,11 +8,7 @@
#include <vespa/storageapi/message/stat.h>
#include <vespa/persistence/spi/persistenceprovider.h>
-namespace document {
-namespace select {
-class Node;
-}
-}
+namespace document::select { class Node; }
namespace storage {
@@ -20,9 +16,8 @@ class ProcessAllHandler : public Types {
public:
ProcessAllHandler(PersistenceUtil&, spi::PersistenceProvider&);
- MessageTracker::UP handleRemoveLocation(api::RemoveLocationCommand&,
- spi::Context&);
- MessageTracker::UP handleStatBucket(api::StatBucketCommand&, spi::Context&);
+ MessageTracker::UP handleRemoveLocation(api::RemoveLocationCommand&, MessageTracker::UP tracker);
+ MessageTracker::UP handleStatBucket(api::StatBucketCommand&, MessageTracker::UP tracker);
protected:
PersistenceUtil& _env;
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 76c420e76e6..ed3de5c7873 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -74,12 +74,9 @@ ProviderErrorWrapper::getBucketInfo(const spi::Bucket& bucket) const
}
spi::Result
-ProviderErrorWrapper::put(const spi::Bucket& bucket,
- spi::Timestamp ts,
- const spi::DocumentSP& doc,
- spi::Context& context)
+ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::DocumentSP doc, spi::Context& context)
{
- return checkResult(_impl.put(bucket, ts, doc, context));
+ return checkResult(_impl.put(bucket, ts, std::move(doc), context));
}
spi::RemoveResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 292eb004223..61664419c69 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -47,7 +47,7 @@ public:
spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override;
spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
- spi::Result put(const spi::Bucket&, spi::Timestamp, const spi::DocumentSP&, spi::Context&) override;
+ spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override;