summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-30 15:33:30 +0200
committerGitHub <noreply@github.com>2020-04-30 15:33:30 +0200
commit48ff6fe2efd6901796a9b8a0ceb8161232bcea15 (patch)
tree23fe33f72f246bb19b938acd0b719e5515dada9d /searchcore
parent6e35291a4a5d190e0c09e627e99a5af09b00e33b (diff)
parent704b6635543a6e0b1489f6371de865383575c6e3 (diff)
Merge pull request #13113 from vespa-engine/balder/make-put-async
- Add async interface to put
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/proton/downpersistence.cpp48
-rw-r--r--searchcore/src/apps/proton/downpersistence.h2
-rw-r--r--searchcore/src/tests/proton/docsummary/docsummary.cpp3
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp6
-rw-r--r--searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp2
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp2
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.h86
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h15
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp28
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp83
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h47
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h2
21 files changed, 228 insertions, 138 deletions
diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp
index d4ec9cee395..511d0e3fee8 100644
--- a/searchcore/src/apps/proton/downpersistence.cpp
+++ b/searchcore/src/apps/proton/downpersistence.cpp
@@ -19,9 +19,7 @@ DownPersistence::DownPersistence(const vespalib::string &downReason)
{
}
-DownPersistence::~DownPersistence()
-{
-}
+DownPersistence::~DownPersistence() = default;
Result
DownPersistence::initialize()
@@ -40,8 +38,7 @@ DownPersistence::getPartitionStates() const
BucketIdListResult
DownPersistence::listBuckets(BucketSpace, PartitionId) const
{
- return BucketIdListResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return BucketIdListResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
Result
@@ -59,30 +56,25 @@ DownPersistence:: setActiveState(const Bucket&, BucketInfo::ActiveState)
BucketInfoResult
DownPersistence:: getBucketInfo(const Bucket&) const
{
- return BucketInfoResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return BucketInfoResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
Result
-DownPersistence::put(const Bucket&, Timestamp, const Document::SP&, Context&)
+DownPersistence::put(const Bucket&, Timestamp, Document::SP, Context&)
{
return errorResult;
}
RemoveResult
-DownPersistence:: remove(const Bucket&, Timestamp,
- const DocumentId&, Context&)
+DownPersistence:: remove(const Bucket&, Timestamp, const DocumentId&, Context&)
{
- return RemoveResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return RemoveResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
RemoveResult
-DownPersistence::removeIfFound(const Bucket&, Timestamp,
- const DocumentId&, Context&)
+DownPersistence::removeIfFound(const Bucket&, Timestamp,const DocumentId&, Context&)
{
- return RemoveResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return RemoveResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
Result
@@ -91,35 +83,28 @@ DownPersistence::removeEntry(const Bucket&, Timestamp, Context&)
return errorResult;
}
-UpdateResult DownPersistence::update(const Bucket&, Timestamp,
- const DocumentUpdate::SP&, Context&)
+UpdateResult DownPersistence::update(const Bucket&, Timestamp, const DocumentUpdate::SP&, Context&)
{
- return UpdateResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return UpdateResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
GetResult
-DownPersistence::get(const Bucket&, const document::FieldSet&,
- const DocumentId&, Context&) const
+DownPersistence::get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const
{
- return GetResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return GetResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
CreateIteratorResult
DownPersistence::createIterator(const Bucket&, const document::FieldSet&,
- const Selection&, IncludedVersions,
- Context&)
+ const Selection&, IncludedVersions, Context&)
{
- return CreateIteratorResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return CreateIteratorResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
IterateResult
DownPersistence::iterate(IteratorId, uint64_t, Context&) const
{
- return IterateResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return IterateResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
Result
@@ -144,8 +129,7 @@ DownPersistence::deleteBucket(const Bucket&, Context&)
BucketIdListResult
DownPersistence::getModifiedBuckets(BucketSpace) const
{
- return BucketIdListResult(errorResult.getErrorCode(),
- errorResult.getErrorMessage());
+ return BucketIdListResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h
index 8cdac7aaa1b..2ae0605fdb6 100644
--- a/searchcore/src/apps/proton/downpersistence.h
+++ b/searchcore/src/apps/proton/downpersistence.h
@@ -32,7 +32,7 @@ public:
Result setClusterState(BucketSpace, const ClusterState&) override;
Result setActiveState(const Bucket&, BucketInfo::ActiveState) override;
BucketInfoResult getBucketInfo(const Bucket&) const override;
- Result put(const Bucket&, Timestamp, const DocumentSP&, Context&) override;
+ Result put(const Bucket&, Timestamp, DocumentSP, Context&) override;
RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override;
RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override;
Result removeEntry(const Bucket&, Timestamp, Context&) override;
diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp
index 55d524519b8..e1785e1e48d 100644
--- a/searchcore/src/tests/proton/docsummary/docsummary.cpp
+++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp
@@ -255,8 +255,7 @@ public:
storage::spi::Timestamp ts(0);
DbDocumentId dbdId(lid);
DbDocumentId prevDbdId(0);
- document::Document::SP xdoc(new document::Document(doc));
- PutOperation op(bucketId, ts, xdoc);
+ PutOperation op(bucketId, ts, std::make_shared<document::Document>(doc));
op.setSerialNum(serialNum);
op.setDbDocumentId(dbdId);
op.setPrevDbDocumentId(prevDbdId);
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index 796ec4436fe..18235116d27 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -544,7 +544,7 @@ TEST_F("require that outdated remove is ignored", FeedHandlerFixture)
TEST_F("require that outdated put is ignored", FeedHandlerFixture)
{
DocumentContext doc_context("id:ns:searchdocument::foo", *f.schema.builder);
- auto op =std::make_unique<PutOperation>(doc_context.bucketId, Timestamp(10), doc_context.doc);
+ auto op =std::make_unique<PutOperation>(doc_context.bucketId, Timestamp(10), std::move(doc_context.doc));
static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000));
FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token), std::move(op));
@@ -673,7 +673,7 @@ TEST_F("require that put is rejected if resource limit is reached", FeedHandlerF
f.writeFilter._message = "Attribute resource limit reached";
DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder);
- auto op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc);
+ auto op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), std::move(docCtx.doc));
FeedTokenContext token;
f.handler.performOperation(std::move(token.token), std::move(op));
EXPECT_EQUAL(0, f.feedView.put_count);
@@ -801,7 +801,7 @@ TEST_F("require that put with different document type repo is ok", FeedHandlerFi
TwoFieldsSchemaContext schema;
DocumentContext doc_context("id:ns:searchdocument::foo", *schema.builder);
auto op = std::make_unique<PutOperation>(doc_context.bucketId,
- Timestamp(10), doc_context.doc);
+ Timestamp(10), std::move(doc_context.doc));
FeedTokenContext token_context;
EXPECT_EQUAL(schema.getRepo().get(), op->getDocument()->getRepo());
EXPECT_NOT_EQUAL(f.schema.getRepo().get(), op->getDocument()->getRepo());
diff --git a/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp b/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp
index 5fffd70f11d..53fdcb7757f 100644
--- a/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp
+++ b/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp
@@ -190,7 +190,7 @@ TEST("require that toString() on derived classes are meaningful")
PutOperation().toString());
EXPECT_EQUAL("Put(id::::, BucketId(0x000000000000002a), timestamp=10, dbdId=(subDbId=0, lid=0), "
"prevDbdId=(subDbId=0, lid=0), prevMarkedAsRemoved=false, prevTimestamp=0, serialNum=0)",
- PutOperation(bucket_id1, timestamp, doc).toString());
+ PutOperation(bucket_id1, timestamp, std::move(doc)).toString());
EXPECT_EQUAL("Remove(id::::, BucketId(0x0000000000000000), timestamp=0, dbdId=(subDbId=0, lid=0), "
"prevDbdId=(subDbId=0, lid=0), prevMarkedAsRemoved=false, prevTimestamp=0, serialNum=0)",
diff --git a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp
index 35590cc68f6..0c72d11899a 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp
@@ -15,7 +15,7 @@ using HandlerSnapshot = PersistenceHandlerMap::HandlerSnapshot;
struct DummyPersistenceHandler : public IPersistenceHandler {
using SP = std::shared_ptr<DummyPersistenceHandler>;
void initialize() override {}
- void handlePut(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::Document::SP &) override {}
+ void handlePut(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, DocumentSP) override {}
void handleUpdate(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::DocumentUpdate::SP &) override {}
void handleRemove(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::DocumentId &) override {}
void handleListBuckets(IBucketIdListResultHandler &) override {}
diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
index 19d9d41c3e4..4fb4abcb7c5 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
@@ -9,7 +9,6 @@
#include <vespa/document/update/documentupdate.h>
#include <vespa/persistence/spi/documentselection.h>
#include <vespa/persistence/spi/test.h>
-#include <vespa/persistence/spi/test.h>
#include <vespa/searchcore/proton/persistenceengine/bucket_guard.h>
#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h>
#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h>
@@ -199,7 +198,7 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer {
void initialize() override { initialized = true; }
void handlePut(FeedToken token, const Bucket& bucket,
- Timestamp timestamp, const document::Document::SP& doc) override {
+ Timestamp timestamp, DocumentSP doc) override {
token->setResult(ResultUP(new storage::spi::Result()), false);
handle(token, bucket, timestamp, doc->getId());
}
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
index b6223dd41ab..e0bc7cb551f 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
@@ -35,4 +35,8 @@ State::fail()
}
}
+OwningState::~OwningState() {
+ ack();
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
index ab3fdea3345..cf6c1f5a7e9 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
@@ -9,42 +9,68 @@ namespace proton {
typedef std::unique_ptr<storage::spi::Result> ResultUP;
+namespace feedtoken {
+
/**
* This class is used by the FeedEngine to encapsulate the necessary information
* for an IFeedHandler to perform an async reply to an operation. A unique
* instance of this class is passed to every invokation of the IFeedHandler.
*/
-namespace feedtoken {
- class ITransport {
- public:
- virtual ~ITransport() { }
- virtual void send(ResultUP result, bool documentWasFound) = 0;
- };
-
- class State : public search::IDestructorCallback {
- public:
- State(const State &) = delete;
- State & operator = (const State &) = delete;
- State(ITransport & transport);
- ~State() override;
- void fail();
- void setResult(ResultUP result, bool documentWasFound) {
- _documentWasFound = documentWasFound;
- _result = std::move(result);
- }
- const storage::spi::Result &getResult() { return *_result; }
- private:
- void ack();
- ITransport &_transport;
- ResultUP _result;
- bool _documentWasFound;
- std::atomic<bool> _alreadySent;
- };
-
- inline std::shared_ptr<State>
- make(ITransport & latch) {
- return std::make_shared<State>(latch);
+class ITransport {
+public:
+ virtual ~ITransport() { }
+ virtual void send(ResultUP result, bool documentWasFound) = 0;
+};
+
+/**
+ * This holds the result of the feed operation until it is either failed or acked.
+ * Guarantees that the result is propagated back to the invoker via ITransport interface.
+ */
+class State : public search::IDestructorCallback {
+public:
+ State(const State &) = delete;
+ State & operator = (const State &) = delete;
+ State(ITransport & transport);
+ ~State() override;
+ void fail();
+ void setResult(ResultUP result, bool documentWasFound) {
+ _documentWasFound = documentWasFound;
+ _result = std::move(result);
}
+ const storage::spi::Result &getResult() { return *_result; }
+protected:
+ void ack();
+private:
+ ITransport &_transport;
+ ResultUP _result;
+ bool _documentWasFound;
+ std::atomic<bool> _alreadySent;
+};
+
+/**
+ * This takes ownership ov the transport object, so that it can be used fully asynchronous
+ * without invoker needing to hold any state.
+ */
+class OwningState : public State {
+public:
+ OwningState(std::unique_ptr<ITransport> transport)
+ : State(*transport),
+ _owned(std::move(transport))
+ {}
+ ~OwningState() override;
+private:
+ std::unique_ptr<ITransport> _owned;
+};
+
+inline std::shared_ptr<State>
+make(ITransport & latch) {
+ return std::make_shared<State>(latch);
+}
+inline std::shared_ptr<State>
+make(std::unique_ptr<ITransport> transport) {
+ return std::make_shared<OwningState>(std::move(transport));
+}
+
}
using FeedToken = std::shared_ptr<feedtoken::State>;
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp
index f1161c8ebdd..56224889b78 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp
@@ -26,7 +26,7 @@ DocumentOperation::DocumentOperation(Type type)
}
-DocumentOperation::DocumentOperation(Type type, const BucketId &bucketId, const Timestamp &timestamp)
+DocumentOperation::DocumentOperation(Type type, BucketId bucketId, Timestamp timestamp)
: FeedOperation(type),
_bucketId(bucketId),
_timestamp(timestamp),
@@ -38,6 +38,8 @@ DocumentOperation::DocumentOperation(Type type, const BucketId &bucketId, const
{
}
+DocumentOperation::~DocumentOperation() = default;
+
void
DocumentOperation::assertValidBucketId(const document::DocumentId &docId) const
{
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h
index 6847dbfd943..044d44b8276 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h
@@ -22,15 +22,14 @@ protected:
DocumentOperation(Type type);
- DocumentOperation(Type type, const document::BucketId &bucketId,
- const storage::spi::Timestamp &timestamp);
+ DocumentOperation(Type type, document::BucketId bucketId, storage::spi::Timestamp timestamp);
void assertValidBucketId(const document::DocumentId &docId) const;
void assertValidBucketId(const document::GlobalId &docId) const;
vespalib::string docArgsToString() const;
public:
- ~DocumentOperation() override {}
+ ~DocumentOperation() override;
const document::BucketId &getBucketId() const { return _bucketId; }
storage::spi::Timestamp getTimestamp() const { return _timestamp; }
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp
index efac7297c67..dd001348093 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp
@@ -17,16 +17,12 @@ PutOperation::PutOperation()
{ }
-PutOperation::PutOperation(const BucketId &bucketId,
- const Timestamp &timestamp,
- const Document::SP &doc)
- : DocumentOperation(FeedOperation::PUT,
- bucketId,
- timestamp),
- _doc(doc)
+PutOperation::PutOperation(BucketId bucketId, Timestamp timestamp, Document::SP doc)
+ : DocumentOperation(FeedOperation::PUT, bucketId, timestamp),
+ _doc(std::move(doc))
{ }
-PutOperation::~PutOperation() { }
+PutOperation::~PutOperation() = default;
void
PutOperation::serialize(vespalib::nbostream &os) const
@@ -40,8 +36,7 @@ PutOperation::serialize(vespalib::nbostream &os) const
void
-PutOperation::deserialize(vespalib::nbostream &is,
- const DocumentTypeRepo &repo)
+PutOperation::deserialize(vespalib::nbostream &is, const DocumentTypeRepo &repo)
{
DocumentOperation::deserialize(is, repo);
size_t oldSize = is.size();
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h
index 33330692fab..5ca6d755e49 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h
@@ -12,17 +12,16 @@ class PutOperation : public DocumentOperation
public:
PutOperation();
- PutOperation(const document::BucketId &bucketId,
- const storage::spi::Timestamp &timestamp,
- const DocumentSP &doc);
- virtual ~PutOperation();
+ PutOperation(document::BucketId bucketId,
+ storage::spi::Timestamp timestamp,
+ DocumentSP doc);
+ ~PutOperation() override;
const DocumentSP &getDocument() const { return _doc; }
void assertValid() const;
- virtual void serialize(vespalib::nbostream &os) const override;
- virtual void deserialize(vespalib::nbostream &is,
- const document::DocumentTypeRepo &repo) override;
+ void serialize(vespalib::nbostream &os) const override;
+ void deserialize(vespalib::nbostream &is, const document::DocumentTypeRepo &repo) override;
void deserializeDocument(const document::DocumentTypeRepo &repo);
- virtual vespalib::string toString() const override;
+ vespalib::string toString() const override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h
index 82a9c8174fa..c95f51b29dc 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h
@@ -42,7 +42,7 @@ public:
virtual void initialize() = 0;
virtual void handlePut(FeedToken token, const storage::spi::Bucket &bucket,
- storage::spi::Timestamp timestamp, const DocumentSP &doc) = 0;
+ storage::spi::Timestamp timestamp, DocumentSP doc) = 0;
virtual void handleUpdate(FeedToken token, const storage::spi::Bucket &bucket,
storage::spi::Timestamp timestamp, const DocumentUpdateSP &upd) = 0;
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 2ede5d45f7e..48074d491c8 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -64,7 +64,7 @@ public:
if (result.hasError()) {
std::lock_guard<std::mutex> guard(_lock);
if (_result.hasError()) {
- _result = TransportLatch::mergeErrorResults(_result, result);
+ _result = TransportMerger::mergeErrorResults(_result, result);
} else {
_result = result;
}
@@ -319,34 +319,32 @@ PersistenceEngine::getBucketInfo(const Bucket& b) const
}
-Result
-PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::SP& doc, Context&)
+void
+PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::DocumentSP doc, Context &, OperationComplete::UP onComplete)
{
if (!_writeFilter.acceptWriteOperation()) {
IResourceWriteFilter::State state = _writeFilter.getAcceptState();
if (!state.acceptWriteOperation()) {
- return Result(Result::ErrorType::RESOURCE_EXHAUSTED,
+ return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::RESOURCE_EXHAUSTED,
make_string("Put operation rejected for document '%s': '%s'",
- doc->getId().toString().c_str(), state.message().c_str()));
+ doc->getId().toString().c_str(), state.message().c_str())));
}
}
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
DocTypeName docType(doc->getType());
- LOG(spam, "put(%s, %" PRIu64 ", (\"%s\", \"%s\"))", b.toString().c_str(), static_cast<uint64_t>(t.getValue()),
+ LOG(spam, "putAsync(%s, %" PRIu64 ", (\"%s\", \"%s\"))", bucket.toString().c_str(), static_cast<uint64_t>(ts.getValue()),
docType.toString().c_str(), doc->getId().toString().c_str());
if (!doc->getId().hasDocType()) {
- return Result(Result::ErrorType::PERMANENT_ERROR,
- make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str()));
+ return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str())));
}
- IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType);
+ IPersistenceHandler * handler = getHandler(rguard, bucket.getBucketSpace(), docType);
if (!handler) {
- return Result(Result::ErrorType::PERMANENT_ERROR,
- make_string("No handler for document type '%s'", docType.toString().c_str()));
+ return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("No handler for document type '%s'", docType.toString().c_str())));
}
- TransportLatch latch(1);
- handler->handlePut(feedtoken::make(latch), b, t, doc);
- latch.await();
- return latch.getResult();
+ auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete));
+ handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc));
}
PersistenceEngine::RemoveResult
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 5d3be07c532..d28a00b909e 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -38,6 +38,7 @@ private:
using Timestamp = storage::spi::Timestamp;
using TimestampList = storage::spi::TimestampList;
using UpdateResult = storage::spi::UpdateResult;
+ using OperationComplete = storage::spi::OperationComplete;
struct IteratorEntry {
PersistenceHandlerSequence handler_sequence;
@@ -100,7 +101,7 @@ public:
Result setClusterState(BucketSpace bucketSpace, const ClusterState& calc) override;
Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override;
BucketInfoResult getBucketInfo(const Bucket&) const override;
- Result put(const Bucket&, Timestamp, const std::shared_ptr<document::Document>&, Context&) override;
+ void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override;
RemoveResult remove(const Bucket&, Timestamp, const document::DocumentId&, Context&) override;
UpdateResult update(const Bucket&, Timestamp,
const std::shared_ptr<document::DocumentUpdate>&, Context&) override;
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
index 4a5dfb0a2a5..64159841efe 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
@@ -8,10 +8,50 @@ using storage::spi::Result;
namespace proton {
+std::unique_ptr<std::mutex> createOptionalLock(bool needLocking) {
+ return needLocking
+ ? std::make_unique<std::mutex>()
+ : std::unique_ptr<std::mutex>();
+}
+TransportMerger::TransportMerger(bool needLocking)
+ : _result(),
+ _lock(createOptionalLock(needLocking))
+{
+}
+TransportMerger::~TransportMerger() = default;
+
+void
+TransportMerger::mergeResult(ResultUP result, bool documentWasFound) {
+ if (_lock) {
+ std::lock_guard<std::mutex> guard(*_lock);
+ mergeWithLock(std::move(result), documentWasFound);
+ } else {
+ mergeWithLock(std::move(result), documentWasFound);
+ }
+}
+
+void
+TransportMerger::mergeWithLock(ResultUP result, bool documentWasFound) {
+ if (!_result) {
+ _result = std::move(result);
+ } else if (result->hasError()) {
+ _result = std::make_unique<Result>(mergeErrorResults(*_result, *result));
+ } else if (documentWasFound) {
+ _result = std::move(result);
+ }
+ completeIfDone();
+}
+
+Result
+TransportMerger::mergeErrorResults(const Result &lhs, const Result &rhs)
+{
+ Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? lhs : rhs).getErrorCode();
+ return Result(error, make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str()));
+}
+
TransportLatch::TransportLatch(uint32_t cnt)
- : _latch(cnt),
- _lock(),
- _result()
+ : TransportMerger(cnt > 1),
+ _latch(cnt)
{
if (cnt == 0u) {
_result = std::make_unique<Result>();
@@ -23,24 +63,33 @@ TransportLatch::~TransportLatch() = default;
void
TransportLatch::send(ResultUP result, bool documentWasFound)
{
- {
- std::lock_guard<std::mutex> guard(_lock);
- if (!_result) {
- _result = std::move(result);
- } else if (result->hasError()) {
- _result.reset(new Result(mergeErrorResults(*_result, *result)));
- } else if (documentWasFound) {
- _result = std::move(result);
- }
- }
+ mergeResult(std::move(result), documentWasFound);
_latch.countDown();
}
-Result
-TransportLatch::mergeErrorResults(const Result &lhs, const Result &rhs)
+AsyncTranportContext::AsyncTranportContext(uint32_t cnt, OperationComplete::UP onComplete)
+ : TransportMerger(cnt > 1),
+ _countDown(cnt),
+ _onComplete(std::move(onComplete))
{
- Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? lhs : rhs).getErrorCode();
- return Result(error, make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str()));
+ if (cnt == 0u) {
+ _onComplete->onComplete(std::make_unique<Result>());
+ }
+}
+
+void
+AsyncTranportContext::completeIfDone() {
+ _countDown--;
+ if (_countDown == 0) {
+ _onComplete->onComplete(std::make_unique<Result>());
+ }
+}
+AsyncTranportContext::~AsyncTranportContext() = default;
+
+void
+AsyncTranportContext::send(ResultUP result, bool documentWasFound)
+{
+ mergeResult(std::move(result), documentWasFound);
}
} // proton
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
index 2fb55b14fdf..da4e19d3584 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
@@ -10,21 +10,38 @@
namespace proton {
/**
+ * Base implementation for merging results from multiple sources.
+ */
+
+class TransportMerger : public feedtoken::ITransport {
+public:
+ using Result = storage::spi::Result;
+ static Result mergeErrorResults(const Result &lhs, const Result &rhs);
+protected:
+ TransportMerger(bool needLocking);
+ ~TransportMerger() override;
+ void mergeResult(ResultUP result, bool documentWasFound);
+ virtual void completeIfDone() { } // Called with lock held if necessary on every merge
+ ResultUP _result;
+
+private:
+ void mergeWithLock(ResultUP result, bool documentWasFound);
+ std::unique_ptr<std::mutex> _lock;
+};
+
+/**
* Implementation of FeedToken::ITransport for handling the async reply for an operation.
* Uses an internal count down latch to keep track the number of outstanding replies.
*/
-class TransportLatch : public feedtoken::ITransport {
+class TransportLatch : public TransportMerger {
private:
- using Result = storage::spi::Result;
using UpdateResult = storage::spi::UpdateResult;
using RemoveResult = storage::spi::RemoveResult;
vespalib::CountDownLatch _latch;
- std::mutex _lock;
- ResultUP _result;
public:
TransportLatch(uint32_t cnt);
- ~TransportLatch();
+ ~TransportLatch() override;
void send(ResultUP result, bool documentWasFound) override;
void await() {
_latch.await();
@@ -38,7 +55,25 @@ public:
const RemoveResult &getRemoveResult() const {
return dynamic_cast<const RemoveResult &>(*_result);
}
- static Result mergeErrorResults(const Result &lhs, const Result &rhs);
+
+};
+
+/**
+ * Implementation of FeedToken::ITransport for async handling of the async reply for an operation.
+ * Uses an internal count to keep track the number of the outstanding replies.
+ */
+class AsyncTranportContext : public TransportMerger {
+private:
+ using Result = storage::spi::Result;
+ using OperationComplete = storage::spi::OperationComplete;
+
+ int _countDown;
+ OperationComplete::UP _onComplete;
+ void completeIfDone() override;
+public:
+ AsyncTranportContext(uint32_t cnt, OperationComplete::UP);
+ ~AsyncTranportContext() override;
+ void send(ResultUP result, bool documentWasFound) override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index 362de7ee780..4a87ee0009c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -154,7 +154,7 @@ FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &o
auto doc = make_shared<Document>(op.getUpdate()->getType(), op.getUpdate()->getId());
doc->setRepo(*_activeFeedView->getDocumentTypeRepo());
op.getUpdate()->applyTo(*doc);
- PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc);
+ PutOperation putOp(op.getBucketId(), op.getTimestamp(), std::move(doc));
_activeFeedView->preparePut(putOp);
storeOperation(putOp, token);
if (token) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
index 16af4e87795..ba0db2bb77d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
@@ -39,9 +39,9 @@ PersistenceHandlerProxy::initialize()
}
void
-PersistenceHandlerProxy::handlePut(FeedToken token, const Bucket &bucket, Timestamp timestamp, const DocumentSP &doc)
+PersistenceHandlerProxy::handlePut(FeedToken token, const Bucket &bucket, Timestamp timestamp, DocumentSP doc)
{
- auto op = std::make_unique<PutOperation>(bucket.getBucketId().stripUnused(), timestamp, doc);
+ auto op = std::make_unique<PutOperation>(bucket.getBucketId().stripUnused(), timestamp, std::move(doc));
_feedHandler.handleOperation(token, std::move(op));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
index 3e3b3dd6fb6..5a82b46f91c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
@@ -25,7 +25,7 @@ public:
void initialize() override;
void handlePut(FeedToken token, const storage::spi::Bucket &bucket,
- storage::spi::Timestamp timestamp, const DocumentSP &doc) override;
+ storage::spi::Timestamp timestamp, DocumentSP doc) override;
void handleUpdate(FeedToken token, const storage::spi::Bucket &bucket,
storage::spi::Timestamp timestamp, const DocumentUpdateSP &upd) override;