summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-05-05 14:06:33 +0200
committerGitHub <noreply@github.com>2020-05-05 14:06:33 +0200
commit02a7dbc6c9dfd39c96eb9d72df92f43592ada2bf (patch)
treeb3a3518fafac5736a20df51ea7e37b3fceeef648
parent4c479b47f3d615dc2c533b3c830c822e964b7a25 (diff)
parent11c7ca4c164bec4c0afbc2fec70b61ff8755ea7d (diff)
Merge pull request #13153 from vespa-engine/balder/async-put-and-remove
Balder/async put and remove
-rw-r--r--configdefinitions/src/vespa/stor-filestor.def5
-rw-r--r--metrics/src/vespa/metrics/metricmanager.cpp2
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp39
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h1
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp40
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h7
-rw-r--r--persistence/src/vespa/persistence/spi/operationcomplete.h7
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.cpp64
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.h19
-rw-r--r--searchcore/src/apps/proton/downpersistence.cpp2
-rw-r--r--searchcore/src/apps/proton/downpersistence.h2
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp58
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h5
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp4
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h5
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp8
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp5
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp27
-rw-r--r--storage/src/tests/persistence/persistencetestutils.h37
-rw-r--r--storage/src/tests/persistence/persistencethread_splittest.cpp2
-rw-r--r--storage/src/tests/persistence/processalltest.cpp14
-rw-r--r--storage/src/tests/persistence/testandsettest.cpp34
-rw-r--r--storage/src/vespa/storage/bucketdb/storagebucketinfo.h4
-rw-r--r--storage/src/vespa/storage/common/storagelink.cpp39
-rw-r--r--storage/src/vespa/storage/common/storagelink.h32
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp16
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp216
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h9
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp41
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h13
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp104
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h11
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp39
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h30
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp2
37 files changed, 628 insertions, 349 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index 7816ee74fde..80ddc0931f4 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -26,6 +26,11 @@ disk_operation_timeout int default=0 restart
## Number of threads to use for each mountpoint.
num_threads int default=8 restart
+## Number of threads for response processing and delivery
+## 0 will give legacy sync behavior.
+## Negative number will choose a good number based on # cores.
+num_response_threads int default=0
+
## When merging, if we find more than this number of documents that exist on all
## of the same copies, send a separate apply bucket diff with these entries
## to an optimized merge chain that guarantuees minimum data transfer.
diff --git a/metrics/src/vespa/metrics/metricmanager.cpp b/metrics/src/vespa/metrics/metricmanager.cpp
index c330bf04d56..04f7568bfba 100644
--- a/metrics/src/vespa/metrics/metricmanager.cpp
+++ b/metrics/src/vespa/metrics/metricmanager.cpp
@@ -766,9 +766,9 @@ namespace {
void
MetricManager::run()
{
+ vespalib::MonitorGuard sync(_waiter);
while (!stopping()) {
time_t currentTime = _timer->getTime();
- vespalib::MonitorGuard sync(_waiter);
time_t next = tick(sync, currentTime);
if (currentTime < next) {
sync.wait((next - currentTime) * 1000);
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 984bbcd845f..5720a0ba662 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -4,6 +4,7 @@
#include <vespa/document/select/parser.h>
#include <vespa/document/base/documentid.h>
#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/vespalib/util/crc.h>
#include <vespa/document/fieldset/fieldsetrepo.h>
@@ -30,7 +31,7 @@ BucketContent::BucketContent()
_outdatedInfo(true),
_active(false)
{ }
-BucketContent::~BucketContent() { }
+BucketContent::~BucketContent() = default;
uint32_t
BucketContent::computeEntryChecksum(const BucketEntry& e) const
@@ -306,11 +307,10 @@ DummyPersistence::DummyPersistence(
_clusterState()
{}
-DummyPersistence::~DummyPersistence() {}
+DummyPersistence::~DummyPersistence() = default;
document::select::Node::UP
-DummyPersistence::parseDocumentSelection(const string& documentSelection,
- bool allowLeaf)
+DummyPersistence::parseDocumentSelection(const string& documentSelection, bool allowLeaf)
{
document::select::Node::UP ret;
try {
@@ -464,6 +464,37 @@ DummyPersistence::put(const Bucket& b, Timestamp t, Document::SP doc, Context&)
return Result();
}
+UpdateResult
+DummyPersistence::update(const Bucket& bucket, Timestamp ts, DocumentUpdateSP upd, Context& context)
+{
+ GetResult getResult = get(bucket, document::AllFields(), upd->getId(), context);
+
+ if (getResult.hasError()) {
+ return UpdateResult(getResult.getErrorCode(), getResult.getErrorMessage());
+ }
+
+ auto docToUpdate = getResult.getDocumentPtr();
+ Timestamp updatedTs = getResult.getTimestamp();
+ if (!docToUpdate) {
+ if (!upd->getCreateIfNonExistent()) {
+ return UpdateResult();
+ } else {
+ docToUpdate = std::make_shared<document::Document>(upd->getType(), upd->getId());
+ updatedTs = ts;
+ }
+ }
+
+ upd->applyTo(*docToUpdate);
+
+ Result putResult = put(bucket, ts, std::move(docToUpdate), context);
+
+ if (putResult.hasError()) {
+ return UpdateResult(putResult.getErrorCode(), putResult.getErrorMessage());
+ }
+
+ return UpdateResult(updatedTs);
+}
+
RemoveResult
DummyPersistence::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&)
{
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 27ed95bd6ee..94c2e1cd9a4 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -156,6 +156,7 @@ public:
Result put(const Bucket&, Timestamp, DocumentSP, Context&) override;
GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override;
RemoveResult remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) override;
+ UpdateResult update(const Bucket&, Timestamp, DocumentUpdateSP, Context&) override;
CreateIteratorResult createIterator(const Bucket&,
const document::FieldSet& fs,
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
index e7abe137b89..45c2c7901f9 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
@@ -8,39 +8,6 @@
namespace storage::spi {
-UpdateResult
-AbstractPersistenceProvider::update(const Bucket& bucket, Timestamp ts,
- const DocumentUpdate::SP& upd, Context& context)
-{
- GetResult getResult = get(bucket, document::AllFields(), upd->getId(), context);
-
- if (getResult.hasError()) {
- return UpdateResult(getResult.getErrorCode(), getResult.getErrorMessage());
- }
-
- auto docToUpdate = getResult.getDocumentPtr();
- Timestamp updatedTs = getResult.getTimestamp();
- if (!docToUpdate) {
- if (!upd->getCreateIfNonExistent()) {
- return UpdateResult();
- } else {
- docToUpdate = std::make_shared<document::Document>(upd->getType(), upd->getId());
- updatedTs = ts;
- }
- }
-
- upd->applyTo(*docToUpdate);
-
- Result putResult = put(bucket, ts, std::move(docToUpdate), context);
-
- if (putResult.hasError()) {
- return UpdateResult(putResult.getErrorCode(),
- putResult.getErrorMessage());
- }
-
- return UpdateResult(updatedTs);
-}
-
RemoveResult
AbstractPersistenceProvider::removeIfFound(const Bucket& b, Timestamp timestamp,
const DocumentId& id, Context& context)
@@ -48,6 +15,13 @@ AbstractPersistenceProvider::removeIfFound(const Bucket& b, Timestamp timestamp,
return remove(b, timestamp, id, context);
}
+void
+AbstractPersistenceProvider::removeIfFoundAsync(const Bucket& b, Timestamp timestamp,
+ const DocumentId& id, Context& context, OperationComplete::UP onComplete)
+{
+ removeAsync(b, timestamp, id, context, std::move(onComplete));
+}
+
BucketIdListResult
AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const
{
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index e346fdaa3cb..813050222a9 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -20,12 +20,6 @@ public:
Result initialize() override { return Result(); };
/**
- * Updates the document by calling get(), updating the document,
- * then calling put() on the result.
- */
- UpdateResult update(const Bucket&, Timestamp, const DocumentUpdateSP&, Context&) override;
-
- /**
* Default impl empty.
*/
Result createBucket(const Bucket&, Context&) override { return Result(); }
@@ -39,6 +33,7 @@ public:
* Default impl is remove().
*/
RemoveResult removeIfFound(const Bucket&, Timestamp, const DocumentId&, Context&) override;
+ void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override;
/**
* Default impl empty.
diff --git a/persistence/src/vespa/persistence/spi/operationcomplete.h b/persistence/src/vespa/persistence/spi/operationcomplete.h
index fa386e274f2..18a3c250e24 100644
--- a/persistence/src/vespa/persistence/spi/operationcomplete.h
+++ b/persistence/src/vespa/persistence/spi/operationcomplete.h
@@ -8,6 +8,12 @@ namespace storage::spi {
class Result;
+class ResultHandler {
+public:
+ virtual ~ResultHandler() = default;
+ virtual void handle(const Result &) const = 0;
+};
+
/**
* This is the callback interface when using the async operations
* in the persistence provider.
@@ -18,6 +24,7 @@ public:
using UP = std::unique_ptr<OperationComplete>;
virtual ~OperationComplete() = default;
virtual void onComplete(std::unique_ptr<Result> result) = 0;
+ virtual void addResultHandler(const ResultHandler * resultHandler) = 0;
};
} \ No newline at end of file
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index c60ac615644..38fcd2fd072 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -2,6 +2,7 @@
#include "persistenceprovider.h"
#include <future>
+#include <cassert>
namespace storage::spi {
@@ -9,14 +10,20 @@ PersistenceProvider::~PersistenceProvider() = default;
class CatchResult : public OperationComplete {
public:
+ CatchResult() : _promisedResult(), _resulthandler(nullptr) {}
std::future<Result::UP> future_result() {
- return promisedResult.get_future();
+ return _promisedResult.get_future();
}
void onComplete(Result::UP result) override {
- promisedResult.set_value(std::move(result));
+ _promisedResult.set_value(std::move(result));
+ }
+ void addResultHandler(const ResultHandler * resultHandler) override {
+ assert(_resulthandler == nullptr);
+ _resulthandler = resultHandler;
}
private:
- std::promise<Result::UP> promisedResult;
+ std::promise<Result::UP> _promisedResult;
+ const ResultHandler *_resulthandler;
};
Result
@@ -29,9 +36,58 @@ PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP d
void
PersistenceProvider::putAsync(const Bucket &bucket, Timestamp timestamp, DocumentSP doc, Context &context,
- OperationComplete::UP onComplete) {
+ OperationComplete::UP onComplete)
+{
Result result = put(bucket, timestamp, std::move(doc), context);
onComplete->onComplete(std::make_unique<Result>(result));
}
+RemoveResult
+PersistenceProvider::remove(const Bucket& bucket, Timestamp timestamp, const DocumentId & docId, Context& context) {
+ auto catcher = std::make_unique<CatchResult>();
+ auto future = catcher->future_result();
+ removeAsync(bucket, timestamp, docId, context, std::move(catcher));
+ return dynamic_cast<const RemoveResult &>(*future.get());
+}
+
+void
+PersistenceProvider::removeAsync(const Bucket &bucket, Timestamp timestamp, const DocumentId & docId, Context &context,
+ OperationComplete::UP onComplete)
+{
+ RemoveResult result = remove(bucket, timestamp, docId, context);
+ onComplete->onComplete(std::make_unique<RemoveResult>(result));
+}
+
+RemoveResult
+PersistenceProvider::removeIfFound(const Bucket& bucket, Timestamp timestamp, const DocumentId & docId, Context& context) {
+ auto catcher = std::make_unique<CatchResult>();
+ auto future = catcher->future_result();
+ removeIfFoundAsync(bucket, timestamp, docId, context, std::move(catcher));
+ return dynamic_cast<const RemoveResult &>(*future.get());
+}
+
+void
+PersistenceProvider::removeIfFoundAsync(const Bucket &bucket, Timestamp timestamp, const DocumentId & docId, Context &context,
+ OperationComplete::UP onComplete)
+{
+ RemoveResult result = removeIfFound(bucket, timestamp, docId, context);
+ onComplete->onComplete(std::make_unique<RemoveResult>(result));
+}
+
+UpdateResult
+PersistenceProvider::update(const Bucket& bucket, Timestamp timestamp, DocumentUpdateSP upd, Context& context) {
+ auto catcher = std::make_unique<CatchResult>();
+ auto future = catcher->future_result();
+ updateAsync(bucket, timestamp, std::move(upd), context, std::move(catcher));
+ return dynamic_cast<const UpdateResult &>(*future.get());
+}
+
+void
+PersistenceProvider::updateAsync(const Bucket &bucket, Timestamp timestamp, DocumentUpdateSP upd, Context &context,
+ OperationComplete::UP onComplete)
+{
+ UpdateResult result = update(bucket, timestamp, std::move(upd), context);
+ onComplete->onComplete(std::make_unique<UpdateResult>(result));
+}
+
}
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 338fa6e03b0..2bb91c776d0 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -109,6 +109,8 @@ struct PersistenceProvider
/**
* Store the given document at the given microsecond time.
+ * An implementation must always implement atleast put or putAsync.
+ * If not an eternal recursion will occur.
*/
virtual Result put(const Bucket&, Timestamp, DocumentSP, Context&);
virtual void putAsync(const Bucket &, Timestamp , DocumentSP , Context &, OperationComplete::UP );
@@ -166,10 +168,15 @@ struct PersistenceProvider
* For such a provider, iterating with removes and all versions should
* semantically be the same thing and yield the same results.
*
+ * An implementation must always implement atleast remove or removeAsync.
+ * If not an eternal recursion will occur.
+ *
* @param timestamp The timestamp for the new bucket entry.
* @param id The ID to remove
*/
- virtual RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) = 0;
+ virtual RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&);
+ virtual void removeAsync(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&, OperationComplete::UP);
+
/**
* @see remove()
* <p/>
@@ -180,11 +187,14 @@ struct PersistenceProvider
* resend removes. It is legal to still store a remove entry, but note that
* you will then be prone to user patterns mentioned above to fill up your
* buckets.
+ * An implementation must always implement atleast removeIfFound or removeIfFoundAsync.
+ * If not an eternal recursion will occur.
* <p/>
* @param timestamp The timestamp for the new bucket entry.
* @param id The ID to remove
*/
- virtual RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) = 0;
+ virtual RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&);
+ virtual void removeIfFoundAsync(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&, OperationComplete::UP);
/**
* Remove any trace of the entry with the given timestamp. (Be it a document
@@ -197,11 +207,14 @@ struct PersistenceProvider
/**
* Partially modifies a document referenced by the document update.
+ * An implementation must always implement atleast update or updateAsync.
+ * If not an eternal recursion will occur.
*
* @param timestamp The timestamp to use for the new update entry.
* @param update The document update to apply to the stored document.
*/
- virtual UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) = 0;
+ virtual UpdateResult update(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&);
+ virtual void updateAsync(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&, OperationComplete::UP);
/**
* Retrieves the latest version of the document specified by the
diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp
index 8d8665b645e..aa87c383c33 100644
--- a/searchcore/src/apps/proton/downpersistence.cpp
+++ b/searchcore/src/apps/proton/downpersistence.cpp
@@ -83,7 +83,7 @@ DownPersistence::removeEntry(const Bucket&, Timestamp, Context&)
return errorResult;
}
-UpdateResult DownPersistence::update(const Bucket&, Timestamp, const DocumentUpdate::SP&, Context&)
+UpdateResult DownPersistence::update(const Bucket&, Timestamp, DocumentUpdate::SP, Context&)
{
return UpdateResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h
index 2716b0ee3b4..10e3d9c1ad7 100644
--- a/searchcore/src/apps/proton/downpersistence.h
+++ b/searchcore/src/apps/proton/downpersistence.h
@@ -36,7 +36,7 @@ public:
RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override;
RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override;
Result removeEntry(const Bucket&, Timestamp, Context&) override;
- UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) override;
+ UpdateResult update(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&) override;
GetResult get(const Bucket&, const document::FieldSet& fieldSet, const DocumentId& id, Context&) const override;
CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fieldSet,
diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
index 4fb4abcb7c5..2927457a6a5 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
@@ -458,12 +458,12 @@ TEST_F("require that puts are routed to handler", SimpleFixture)
storage::spi::LoadType loadType(0, "default");
Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0));
f.engine.put(bucket1, tstamp1, doc1, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
f.engine.put(bucket1, tstamp1, doc2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"),
f.engine.put(bucket1, tstamp1, doc3, context));
@@ -490,14 +490,14 @@ TEST_F("require that updates are routed to handler", SimpleFixture)
Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0));
f.hset.handler1.setExistingTimestamp(tstamp2);
UpdateResult ur = f.engine.update(bucket1, tstamp1, upd1, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
EXPECT_EQUAL(tstamp2, ur.getExistingTimestamp());
f.hset.handler2.setExistingTimestamp(tstamp3);
ur = f.engine.update(bucket1, tstamp1, upd2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_EQUAL(tstamp3, ur.getExistingTimestamp());
EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"),
@@ -533,31 +533,31 @@ TEST_F("require that removes are routed to handlers", SimpleFixture)
storage::spi::LoadType loadType(0, "default");
Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0));
RemoveResult rr = f.engine.remove(bucket1, tstamp1, docId3, context);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
EXPECT_FALSE(rr.wasFound());
EXPECT_TRUE(rr.hasError());
EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), rr);
f.hset.handler1.setExistingTimestamp(tstamp2);
rr = f.engine.remove(bucket1, tstamp1, docId1, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
EXPECT_TRUE(rr.wasFound());
EXPECT_FALSE(rr.hasError());
f.hset.handler1.setExistingTimestamp(tstamp0);
f.hset.handler2.setExistingTimestamp(tstamp3);
rr = f.engine.remove(bucket1, tstamp1, docId2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_TRUE(rr.wasFound());
EXPECT_FALSE(rr.hasError());
f.hset.handler2.setExistingTimestamp(tstamp0);
rr = f.engine.remove(bucket1, tstamp1, docId2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_FALSE(rr.wasFound());
EXPECT_FALSE(rr.hasError());
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 8344996e298..91ccac6fce1 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -347,57 +347,55 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do
handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc));
}
-PersistenceEngine::RemoveResult
-PersistenceEngine::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&)
+void
+PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& did, Context&, OperationComplete::UP onComplete)
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
LOG(spam, "remove(%s, %" PRIu64 ", \"%s\")", b.toString().c_str(),
static_cast<uint64_t>(t.getValue()), did.toString().c_str());
if (!did.hasDocType()) {
- return RemoveResult(Result::ErrorType::PERMANENT_ERROR,
- make_string("Old id scheme not supported in elastic mode (%s)", did.toString().c_str()));
+ return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("Old id scheme not supported in elastic mode (%s)", did.toString().c_str())));
}
DocTypeName docType(did.getDocType());
IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType);
if (!handler) {
- return RemoveResult(Result::ErrorType::PERMANENT_ERROR,
- make_string("No handler for document type '%s'", docType.toString().c_str()));
+ return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("No handler for document type '%s'", docType.toString().c_str())));
}
- TransportLatch latch(1);
- handler->handleRemove(feedtoken::make(latch), b, t, did);
- latch.await();
- return latch.getRemoveResult();
+ auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete));
+ handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, did);
}
-PersistenceEngine::UpdateResult
-PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP& upd, Context&)
+void
+PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP upd, Context&, OperationComplete::UP onComplete)
{
if (!_writeFilter.acceptWriteOperation()) {
IResourceWriteFilter::State state = _writeFilter.getAcceptState();
if (!state.acceptWriteOperation()) {
- return UpdateResult(Result::ErrorType::RESOURCE_EXHAUSTED,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::RESOURCE_EXHAUSTED,
make_string("Update operation rejected for document '%s': '%s'",
- upd->getId().toString().c_str(), state.message().c_str()));
+ upd->getId().toString().c_str(), state.message().c_str())));
}
}
try {
upd->eagerDeserialize();
} catch (document::FieldNotFoundException & e) {
- return UpdateResult(Result::ErrorType::TRANSIENT_ERROR,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
make_string("Update operation rejected for document '%s' of type '%s': 'Field not found'",
- upd->getId().toString().c_str(), upd->getType().getName().c_str()));
+ upd->getId().toString().c_str(), upd->getType().getName().c_str())));
} catch (document::DocumentTypeNotFoundException & e) {
- return UpdateResult(Result::ErrorType::TRANSIENT_ERROR,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
make_string("Update operation rejected for document '%s' of type '%s'.",
- upd->getId().toString().c_str(), e.getDocumentTypeName().c_str()));
+ upd->getId().toString().c_str(), e.getDocumentTypeName().c_str())));
} catch (document::WrongTensorTypeException &e) {
- return UpdateResult(Result::ErrorType::TRANSIENT_ERROR,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
make_string("Update operation rejected for document '%s' of type '%s': 'Wrong tensor type: %s'",
upd->getId().toString().c_str(),
upd->getType().getName().c_str(),
- e.getMessage().c_str()));
+ e.getMessage().c_str())));
}
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
DocTypeName docType(upd->getType());
@@ -405,24 +403,20 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP
b.toString().c_str(), static_cast<uint64_t>(t.getValue()), docType.toString().c_str(),
upd->getId().toString().c_str(), (upd->getCreateIfNonExistent() ? "true" : "false"));
if (!upd->getId().hasDocType()) {
- return UpdateResult(Result::ErrorType::PERMANENT_ERROR,
- make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str()));
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str())));
}
if (upd->getId().getDocType() != docType.getName()) {
- return UpdateResult(Result::ErrorType::PERMANENT_ERROR,
- make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str()));
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str())));
}
IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType);
- if (handler) {
- TransportLatch latch(1);
- LOG(debug, "update = %s", upd->toXml().c_str());
- handler->handleUpdate(feedtoken::make(latch), b, t, upd);
- latch.await();
- return latch.getUpdateResult();
- } else {
- return UpdateResult(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()));
+ if (handler == nullptr) {
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str())));
}
+ auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete));
+ handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd));
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 43239e95a96..230f8c411aa 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -102,9 +102,8 @@ public:
Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override;
BucketInfoResult getBucketInfo(const Bucket&) const override;
void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override;
- RemoveResult remove(const Bucket&, Timestamp, const document::DocumentId&, Context&) override;
- UpdateResult update(const Bucket&, Timestamp,
- const std::shared_ptr<document::DocumentUpdate>&, Context&) override;
+ void removeAsync(const Bucket&, Timestamp, const document::DocumentId&, Context&, OperationComplete::UP) override;
+ void updateAsync(const Bucket&, Timestamp, storage::spi::DocumentUpdateSP, Context&, OperationComplete::UP) override;
GetResult get(const Bucket&, const document::FieldSet&, const document::DocumentId&, Context&) const override;
CreateIteratorResult createIterator(const Bucket&, const document::FieldSet&, const Selection&,
IncludedVersions, Context&) override;
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index 862d1fb758a..13a254e7a76 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -124,12 +124,12 @@ PersistenceProviderWrapper::removeIfFound(const spi::Bucket& bucket,
spi::UpdateResult
PersistenceProviderWrapper::update(const spi::Bucket& bucket,
spi::Timestamp timestamp,
- const document::DocumentUpdate::SP& upd,
+ document::DocumentUpdate::SP upd,
spi::Context& context)
{
LOG_SPI("update(" << bucket << ", " << timestamp << ", " << upd->getId() << ")");
CHECK_ERROR(spi::UpdateResult, FAIL_UPDATE);
- return _spi.update(bucket, timestamp, upd, context);
+ return _spi.update(bucket, timestamp, std::move(upd), context);
}
spi::GetResult
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index 25365e64bfc..9ca08becabe 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -91,9 +91,8 @@ public:
spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
- spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override;
- spi::GetResult get(const spi::Bucket&, const document::FieldSet&,
- const spi::DocumentId&, spi::Context&) const override ;
+ spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override;
+ spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override;
spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&,
spi::IncludedVersions versions, spi::Context&) override;
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index fe966d0bbb2..bf0083311fb 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -201,12 +201,8 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config,
uint16_t deviceIndex)
{
(void) config;
- std::unique_ptr<DiskThread> disk;
- disk.reset(new PersistenceThread(
- node.getComponentRegister(), config.getConfigId(), provider,
- filestorHandler, metrics,
- deviceIndex));
- return disk;
+ return std::make_unique<PersistenceThread>(nullptr,node.getComponentRegister(), config.getConfigId(),
+ provider, filestorHandler, metrics, deviceIndex);
}
namespace {
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 035da326d48..262906a4baf 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -149,11 +149,6 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd;
};
- MessageTracker::UP
- createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) {
- return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd));
- }
-
std::string
doTestSPIException(MergeHandler& handler,
PersistenceProviderWrapper& providerWrapper,
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 4ac9dfd7765..cf8bf71708d 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -12,6 +12,8 @@
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
+#include <thread>
using document::DocumentType;
using storage::spi::test::makeSpiBucket;
@@ -54,9 +56,9 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const
{
_node.setupDummyPersistence();
_metrics.initDiskMetrics(numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1, 1);
- _handler.reset(new FileStorHandler(_messageKeeper, _metrics,
+ _handler = std::make_unique<FileStorHandler>(_messageKeeper, _metrics,
_node.getPersistenceProvider().getPartitionStates().getList(),
- _node.getComponentRegister()));
+ _node.getComponentRegister());
for (uint32_t i = 0; i < numDisks; i++) {
_diskEnvs.push_back(
std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler,
@@ -64,6 +66,13 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const
}
}
+PersistenceTestEnvironment::~PersistenceTestEnvironment() {
+ _handler->close();
+ while (!_handler->closed(0)) {
+ std::this_thread::sleep_for(1ms);
+ }
+}
+
PersistenceTestUtils::PersistenceTestUtils() = default;
PersistenceTestUtils::~PersistenceTestUtils() = default;
@@ -74,15 +83,21 @@ PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) {
void
PersistenceTestUtils::setupDisks(uint32_t numDisks) {
- _env.reset(new PersistenceTestEnvironment(DiskCount(numDisks), "todo-make-unique-persistencetestutils"));
+ _env = std::make_unique<PersistenceTestEnvironment>(DiskCount(numDisks), "todo-make-unique-persistencetestutils");
+ setupExecutor(numDisks);
+}
+
+void
+PersistenceTestUtils::setupExecutor(uint32_t numThreads) {
+ _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE);
}
std::unique_ptr<PersistenceThread>
PersistenceTestUtils::createPersistenceThread(uint32_t disk)
{
- return std::make_unique<PersistenceThread>(_env->_node.getComponentRegister(), _env->_config.getConfigId(),
- getPersistenceProvider(), getEnv()._fileStorHandler,
- getEnv()._metrics, disk);
+ return std::make_unique<PersistenceThread>(_sequenceTaskExecutor.get(), _env->_node.getComponentRegister(),
+ _env->_config.getConfigId(),getPersistenceProvider(),
+ getEnv()._fileStorHandler, getEnv()._metrics, disk);
}
document::Document::SP
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index 3121bef61e5..cdd08d42565 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -10,6 +10,7 @@
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/persistence/dummyimpl/dummypersistence.h>
+#include <vespa/storage/storageserver/communicationmanager.h>
#include <vespa/document/base/testdocman.h>
#include <vespa/vespalib/gtest/gtest.h>
@@ -24,6 +25,7 @@ struct MessageKeeper : public MessageSender {
struct PersistenceTestEnvironment {
PersistenceTestEnvironment(DiskCount numDisks, const std::string & rootOfRoot);
+ ~PersistenceTestEnvironment();
document::TestDocMan _testDocMan;
vdstestlib::DirConfig _config;
@@ -54,7 +56,22 @@ public:
document::Bucket _bucket;
};
+ struct ReplySender : public MessageSender {
+ void sendCommand(const std::shared_ptr<api::StorageCommand> &) override {
+ abort();
+ }
+
+ void sendReply(const std::shared_ptr<api::StorageReply> & ptr) override {
+ queue.enqueue(std::move(ptr));
+ }
+
+ Queue queue;
+ };
+
std::unique_ptr<PersistenceTestEnvironment> _env;
+ std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequenceTaskExecutor;
+ ReplySender _replySender;
+
PersistenceTestUtils();
virtual ~PersistenceTestUtils();
@@ -67,8 +84,13 @@ public:
uint32_t maxSize = 128);
void setupDisks(uint32_t disks);
+ void setupExecutor(uint32_t numThreads);
void TearDown() override {
+ if (_sequenceTaskExecutor) {
+ _sequenceTaskExecutor->sync();
+ _sequenceTaskExecutor.reset();
+ }
_env.reset();
}
@@ -90,6 +112,21 @@ public:
spi::PersistenceProvider& getPersistenceProvider();
+ MessageTracker::UP
+ createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) {
+ return MessageTracker::createForTesting(getEnv(), _replySender, NoBucketLock::make(bucket), std::move(cmd));
+ }
+
+ api::ReturnCode
+ fetchResult(const MessageTracker::UP & tracker) {
+ if (tracker) {
+ return tracker->getResult();
+ }
+ std::shared_ptr<api::StorageMessage> msg;
+ _replySender.queue.getNext(msg, 60000);
+ return dynamic_cast<api::StorageReply &>(*msg).getResult();
+ }
+
/**
Performs a put to the given disk.
Returns the document that was inserted.
diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp
index 1ec6a35fb1d..3d7fc70db6a 100644
--- a/storage/src/tests/persistence/persistencethread_splittest.cpp
+++ b/storage/src/tests/persistence/persistencethread_splittest.cpp
@@ -214,7 +214,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase)
cmd->setMinByteSize(maxSize);
cmd->setMinDocCount(maxCount);
cmd->setSourceIndex(0);
- MessageTracker::UP result = thread->handleSplitBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(docBucket), cmd));
+ MessageTracker::UP result = thread->handleSplitBucket(*cmd, createTracker(cmd, docBucket));
api::ReturnCode code(result->getResult());
EXPECT_EQ(error, code);
if (!code.success()) {
diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp
index 0d482ebe5b7..5174b733334 100644
--- a/storage/src/tests/persistence/processalltest.cpp
+++ b/storage/src/tests/persistence/processalltest.cpp
@@ -23,7 +23,7 @@ TEST_F(ProcessAllHandlerTest, remove_location) {
document::Bucket bucket = makeDocumentBucket(bucketId);
auto cmd = std::make_shared<api::RemoveLocationCommand>("id.user == 4", bucket);
ProcessAllHandler handler(getEnv(), getPersistenceProvider());
- auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
+ auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket));
EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n"
"DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n",
@@ -47,7 +47,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
document::Bucket bucket = makeDocumentBucket(bucketId);
auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket);
- auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
+ auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket));
EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n"
"DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n"
@@ -74,7 +74,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_ty
auto cmd = std::make_shared<api::RemoveLocationCommand>("unknowndoctype.headerval % 2 == 0", bucket);
ProcessAllHandler handler(getEnv(), getPersistenceProvider());
- ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception);
+ ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception);
EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n",
dumpBucket(bucketId));
@@ -88,7 +88,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_bogus_selectio
auto cmd = std::make_shared<api::RemoveLocationCommand>("id.bogus != badgers", bucket);
ProcessAllHandler handler(getEnv(), getPersistenceProvider());
- ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception);
+ ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception);
EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n",
dumpBucket(bucketId));
@@ -107,7 +107,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc
document::Bucket bucket = makeDocumentBucket(bucketId);
auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "testdoctype1.headerval % 2 == 0");
- MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
+ MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket));
ASSERT_TRUE(tracker->hasReply());
auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply());
@@ -141,7 +141,7 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries)
document::Bucket bucket = makeDocumentBucket(bucketId);
auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true");
- MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
+ MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket));
ASSERT_TRUE(tracker->hasReply());
auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply());
@@ -187,7 +187,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_
document::Bucket bucket = makeDocumentBucket(bucketId);
auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true");
- MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd));
+ MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket));
ASSERT_TRUE(tracker->hasReply());
auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply());
diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp
index 864ab320527..578a90081c7 100644
--- a/storage/src/tests/persistence/testandsettest.cpp
+++ b/storage/src/tests/persistence/testandsettest.cpp
@@ -40,18 +40,11 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
: context(spi::LoadType(0, "default"), 0, 0)
{}
- MessageTracker::UP
- createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) {
- return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd));
- }
-
void SetUp() override {
SingleDiskPersistenceTestUtils::SetUp();
createBucket(BUCKET_ID);
- getPersistenceProvider().createBucket(
- makeSpiBucket(BUCKET_ID),
- context);
+ getPersistenceProvider().createBucket(makeSpiBucket(BUCKET_ID),context);
thread = createPersistenceThread(0);
testDoc = createTestDocument();
@@ -59,7 +52,8 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
}
void TearDown() override {
- thread.reset(nullptr);
+ thread->flush();
+ thread.reset();
SingleDiskPersistenceTestUtils::TearDown();
}
@@ -91,7 +85,7 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) {
auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo);
setTestCondition(*putTwo);
- ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(),
+ ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
}
@@ -111,7 +105,7 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) {
auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo);
setTestCondition(*putTwo);
- ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -131,7 +125,7 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) {
auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo);
setTestCondition(*remove);
- ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(),
+ ASSERT_EQ(fetchResult(thread->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -151,7 +145,7 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) {
auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo);
setTestCondition(*remove);
- ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY),
dumpBucket(BUCKET_ID));
@@ -177,7 +171,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) {
putTestDocument(false, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(),
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -190,7 +184,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) {
putTestDocument(true, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -202,7 +196,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(false, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(),
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -211,7 +205,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(true, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID));
assertTestDocumentFoundAndMatchesContent(NEW_CONTENT);
}
@@ -223,7 +217,7 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) {
auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
put->setCondition(documentapi::TestAndSetCondition("bjarne"));
- ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
+ ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -235,7 +229,7 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) {
setTestCondition(*put);
thread->handlePut(*put, createTracker(put, BUCKET));
- ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(),
+ ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -279,7 +273,7 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta
}
auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp);
- thread->handlePut(*put, createTracker(put, BUCKET));
+ fetchResult(thread->handlePut(*put, createTracker(put, BUCKET)));
}
void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value)
diff --git a/storage/src/vespa/storage/bucketdb/storagebucketinfo.h b/storage/src/vespa/storage/bucketdb/storagebucketinfo.h
index c5257d59a3e..2a3d9aa8f91 100644
--- a/storage/src/vespa/storage/bucketdb/storagebucketinfo.h
+++ b/storage/src/vespa/storage/bucketdb/storagebucketinfo.h
@@ -4,8 +4,7 @@
#include <vespa/storageapi/buckets/bucketinfo.h>
-namespace storage {
-namespace bucketdb {
+namespace storage::bucketdb {
struct StorageBucketInfo {
api::BucketInfo info;
@@ -34,4 +33,3 @@ struct StorageBucketInfo {
std::ostream& operator<<(std::ostream& out, const StorageBucketInfo& info);
}
-}
diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp
index 431c90b27f2..d4615ee2df5 100644
--- a/storage/src/vespa/storage/common/storagelink.cpp
+++ b/storage/src/vespa/storage/common/storagelink.cpp
@@ -240,4 +240,43 @@ std::ostream& operator<<(std::ostream& out, StorageLink& link) {
return out;
}
+Queue::Queue() = default;
+Queue::~Queue() = default;
+
+bool Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) {
+ vespalib::MonitorGuard sync(_queueMonitor);
+ bool first = true;
+ while (true) { // Max twice
+ if (!_queue.empty()) {
+ LOG(spam, "Picking message from queue");
+ msg = std::move(_queue.front());
+ _queue.pop();
+ return true;
+ }
+ if (timeout == 0 || !first) {
+ return false;
+ }
+ sync.wait(timeout);
+ first = false;
+ }
+
+ return false;
+}
+
+void Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) {
+ vespalib::MonitorGuard sync(_queueMonitor);
+ _queue.emplace(std::move(msg));
+ sync.unsafeSignalUnlock();
+}
+
+void Queue::signal() {
+ vespalib::MonitorGuard sync(_queueMonitor);
+ sync.unsafeSignalUnlock();
+}
+
+size_t Queue::size() const {
+ vespalib::MonitorGuard sync(_queueMonitor);
+ return _queue.size();
+}
+
}
diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h
index f1f679a1e26..d6df7bf8afb 100644
--- a/storage/src/vespa/storage/common/storagelink.h
+++ b/storage/src/vespa/storage/common/storagelink.h
@@ -23,7 +23,9 @@
#include <vespa/storageapi/messageapi/messagehandler.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/document/util/printable.h>
+#include <vespa/vespalib/util/sync.h>
#include <atomic>
+#include <queue>
namespace storage {
@@ -182,6 +184,36 @@ private:
friend struct StorageLinkTest;
};
+class Queue {
+private:
+ using QueueType = std::queue<std::shared_ptr<api::StorageMessage>>;
+ QueueType _queue;
+ vespalib::Monitor _queueMonitor;
+
+public:
+ Queue();
+ ~Queue();
+
+ /**
+ * Returns the next event from the event queue
+ * @param msg The next event
+ * @param timeout Millisecs to wait if the queue is empty
+ * (0 = don't wait, -1 = forever)
+ * @return true or false if the queue was empty.
+ */
+ bool getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout);
+
+ /**
+ * Enqueue msg in FIFO order.
+ */
+ void enqueue(std::shared_ptr<api::StorageMessage> msg);
+
+ /** Signal queue monitor. */
+ void signal();
+
+ size_t size() const;
+};
+
std::ostream& operator<<(std::ostream& out, StorageLink& link);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index db3d865faca..eec8ac3e327 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -15,6 +15,7 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".persistence.filestor.manager");
@@ -88,6 +89,13 @@ FileStorManager::print(std::ostream& out, bool verbose, const std::string& inden
out << "FileStorManager";
}
+namespace {
+
+uint32_t computeNumResponseThreads(int configured) {
+ return (configured < 0) ? std::max(1u, std::thread::hardware_concurrency()/4) : configured;
+}
+
+}
/**
* If live configuration, assuming storageserver makes sure no messages are
* incoming during reconfiguration
@@ -109,12 +117,16 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
_metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads);
_filestorHandler = std::make_unique<FileStorHandler>(numThreads, numStripes, *this, *_metrics, _partitions, _compReg);
+ uint32_t numResposeThreads = computeNumResponseThreads(_config->numResponseThreads);
+ if (numResposeThreads > 0) {
+ _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResposeThreads, 10000, vespalib::Executor::OptimizeFor::ADAPTIVE);
+ }
for (uint32_t i=0; i<_component.getDiskCount(); ++i) {
if (_partitions[i].isUp()) {
LOG(spam, "Setting up disk %u", i);
for (uint32_t j = 0; j < numThreads; j++) {
- _disks[i].push_back(std::make_shared<PersistenceThread>(_compReg, _configUri, *_provider, *_filestorHandler,
- *_metrics->disks[i]->threads[j], i));
+ _disks[i].push_back(std::make_shared<PersistenceThread>(_sequencedExecutor.get(), _compReg, _configUri, *_provider,
+ *_filestorHandler, *_metrics->disks[i]->threads[j], i));
}
} else {
_filestorHandler->disable(i);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 433b9ddbd39..452b83bb794 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -12,6 +12,7 @@
#include "filestormetrics.h"
#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/vespalib/util/sync.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/document/bucket/bucketid.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/storage/bucketdb/storbucketdb.h>
@@ -67,6 +68,7 @@ class FileStorManager : public StorageLinkQueued,
bool _failDiskOnError;
std::shared_ptr<FileStorMetrics> _metrics;
std::unique_ptr<FileStorHandler> _filestorHandler;
+ std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequencedExecutor;
mutable vespalib::Monitor _threadMonitor; // Notify to stop sleeping
bool _closed;
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 8eac55445f9..c53367c16f6 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -10,13 +10,87 @@
#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".persistence.thread");
namespace storage {
-PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
+namespace {
+
+class ResultTask : public vespalib::Executor::Task {
+public:
+ ResultTask() : _result(), _resultHandler(nullptr) { }
+ void setResult(spi::Result::UP result) {
+ _result = std::move(result);
+ }
+ void addResultHandler(const spi::ResultHandler * resultHandler) {
+ // Only handles a signal handler now,
+ // Can be extended if necessary later on
+ assert(_resultHandler == nullptr);
+ _resultHandler = resultHandler;
+ }
+ void handle(const spi::Result &result ) {
+ if (_resultHandler != nullptr) {
+ _resultHandler->handle(result);
+ }
+ }
+protected:
+ spi::Result::UP _result;
+private:
+ const spi::ResultHandler * _resultHandler;
+};
+
+template<class FunctionType>
+class LambdaResultTask : public ResultTask {
+public:
+ LambdaResultTask(FunctionType && func)
+ : _func(std::move(func))
+ {}
+ ~LambdaResultTask() override {}
+ void run() override {
+ handle(*_result);
+ _func(std::move(_result));
+ }
+private:
+ FunctionType _func;
+};
+
+template <class FunctionType>
+std::unique_ptr<ResultTask>
+makeResultTask(FunctionType &&function)
+{
+ return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>>
+ (std::forward<FunctionType>(function));
+}
+
+class ResultTaskOperationDone : public spi::OperationComplete {
+public:
+ ResultTaskOperationDone(vespalib::ISequencedTaskExecutor & executor, document::BucketId bucketId,
+ std::unique_ptr<ResultTask> task)
+ : _executor(executor),
+ _task(std::move(task)),
+ _executorId(executor.getExecutorId(bucketId.getId()))
+ {
+ }
+ void onComplete(spi::Result::UP result) override {
+ _task->setResult(std::move(result));
+ _executor.executeTask(_executorId, std::move(_task));
+ }
+ void addResultHandler(const spi::ResultHandler * resultHandler) override {
+ _task->addResultHandler(resultHandler);
+ }
+private:
+ vespalib::ISequencedTaskExecutor & _executor;
+ std::unique_ptr<ResultTask> _task;
+ vespalib::ISequencedTaskExecutor::ExecutorId _executorId;
+};
+
+}
+
+PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequencedExecutor,
+ ServiceLayerComponentRegister& compReg,
const config::ConfigUri & configUri,
spi::PersistenceProvider& provider,
FileStorHandler& filestorHandler,
@@ -24,7 +98,7 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
uint16_t deviceIndex)
: _stripeId(filestorHandler.getNextStripeId(deviceIndex)),
_env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider),
- _warnOnSlowOperations(5000),
+ _sequencedExecutor(sequencedExecutor),
_spi(provider),
_processAllHandler(_env, provider),
_mergeHandler(_spi, _env),
@@ -91,63 +165,106 @@ PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, Messa
}
MessageTracker::UP
-PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP tracker)
+PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP)
{
+ MessageTracker & tracker = *trackerUP;
auto& metrics = _env._metrics.put[cmd.getLoadType()];
- tracker->setMetric(metrics);
+ tracker.setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) {
- return tracker;
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) {
+ return trackerUP;
}
- spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker->context());
- tracker->checkForError(response);
- return tracker;
+ spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
+ if (_sequencedExecutor == nullptr) {
+ spi::Result response = _spi.put(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()),tracker.context());
+ tracker.checkForError(response);
+ } else {
+ auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) {
+ tracker->checkForError(*response);
+ tracker->sendReply();
+ });
+ _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ }
+ return trackerUP;
}
MessageTracker::UP
-PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker)
+PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trackerUP)
{
+ MessageTracker & tracker = *trackerUP;
auto& metrics = _env._metrics.remove[cmd.getLoadType()];
- tracker->setMetric(metrics);
+ tracker.setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) {
- return tracker;
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) {
+ return trackerUP;
}
- spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker->context());
- if (tracker->checkForError(response)) {
- tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
- }
- if (!response.wasFound()) {
- _env._metrics.remove[cmd.getLoadType()].notFound.inc();
+ spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
+ if (_sequencedExecutor == nullptr) {
+ spi::RemoveResult response = _spi.removeIfFound(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(),tracker.context());
+ if (tracker.checkForError(response)) {
+ tracker.setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
+ }
+ if (!response.wasFound()) {
+ metrics.notFound.inc();
+ }
+ } else {
+ // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker
+ auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
+ const spi::RemoveResult & response = dynamic_cast<const spi::RemoveResult &>(*responseUP);
+ if (tracker->checkForError(response)) {
+ tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
+ }
+ if (!response.wasFound()) {
+ metrics.notFound.inc();
+ }
+ tracker->sendReply();
+ });
+ _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
}
- return tracker;
+ return trackerUP;
}
MessageTracker::UP
-PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker)
+PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP)
{
+ MessageTracker & tracker = *trackerUP;
auto& metrics = _env._metrics.update[cmd.getLoadType()];
- tracker->setMetric(metrics);
+ tracker.setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context(), cmd.getUpdate()->getCreateIfNonExistent())) {
- return tracker;
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context(), cmd.getUpdate()->getCreateIfNonExistent())) {
+ return trackerUP;
}
-
- spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), tracker->context());
- if (tracker->checkForError(response)) {
- auto reply = std::make_shared<api::UpdateReply>(cmd);
- reply->setOldTimestamp(response.getExistingTimestamp());
- tracker->setReply(std::move(reply));
+
+ spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
+ if (_sequencedExecutor == nullptr) {
+ spi::UpdateResult response = _spi.update(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()),tracker.context());
+ if (tracker.checkForError(response)) {
+ auto reply = std::make_shared<api::UpdateReply>(cmd);
+ reply->setOldTimestamp(response.getExistingTimestamp());
+ tracker.setReply(std::move(reply));
+ }
+ } else {
+ // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker
+ auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
+ const spi::UpdateResult & response = dynamic_cast<const spi::UpdateResult &>(*responseUP);
+ if (tracker->checkForError(response)) {
+ auto reply = std::make_shared<api::UpdateReply>(cmd);
+ reply->setOldTimestamp(response.getExistingTimestamp());
+ tracker->setReply(std::move(reply));
+ }
+ tracker->sendReply();
+ });
+ _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
}
- return tracker;
+ return trackerUP;
}
namespace {
@@ -770,37 +887,12 @@ PersistenceThread::processMessage(api::StorageMessage& msg, MessageTracker::UP t
} else {
auto & initiatingCommand = static_cast<api::StorageCommand&>(msg);
try {
- int64_t startTime(_component->getClock().getTimeInMillis().getTime());
-
LOG(debug, "Handling command: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
- tracker = handleCommandSplitByType(initiatingCommand, std::move(tracker));
- if (!tracker) {
- LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str());
- } else {
- tracker->generateReply(initiatingCommand);
- if ((tracker->hasReply()
- && tracker->getReply().getResult().failed())
- || tracker->getResult().failed())
- {
- _env._metrics.failedOperations.inc();
- }
- }
-
- int64_t stopTime(_component->getClock().getTimeInMillis().getTime());
- if (stopTime - startTime >= _warnOnSlowOperations) {
- LOGBT(warning, msg.getType().toString(),
- "Slow processing of message %s on disk %u. Processing time: %" PRId64 " ms (>=%d ms)",
- msg.toString().c_str(), _env._partition, stopTime - startTime, _warnOnSlowOperations);
- } else {
- LOGBT(spam, msg.getType().toString(), "Processing time of message %s on disk %u: %" PRId64 " ms",
- msg.toString(true).c_str(), _env._partition, stopTime - startTime);
- }
-
- return tracker;
+ return handleCommandSplitByType(initiatingCommand, std::move(tracker));
} catch (std::exception& e) {
LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
- api::StorageReply::SP reply(initiatingCommand.makeReply().release());
+ api::StorageReply::SP reply(initiatingCommand.makeReply());
reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what()));
_env._fileStorHandler.sendReply(reply);
}
@@ -814,7 +906,7 @@ PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage lock) {
LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get());
api::StorageMessage & msg(*lock.second);
- auto tracker = std::make_unique<MessageTracker>(_env, std::move(lock.first), std::move(lock.second));
+ auto tracker = std::make_unique<MessageTracker>(_env, _env._fileStorHandler, std::move(lock.first), std::move(lock.second));
tracker = processMessage(msg, std::move(tracker));
if (tracker) {
tracker->sendReply();
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index fcecc963658..32387bc6826 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -10,6 +10,7 @@
#include <vespa/storage/common/bucketmessages.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storage/common/statusmessages.h>
+#include <vespa/vespalib/util/isequencedtaskexecutor.h>
namespace storage {
@@ -19,9 +20,9 @@ class TestAndSetHelper;
class PersistenceThread final : public DiskThread, public Types
{
public:
- PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri,
- spi::PersistenceProvider& provider, FileStorHandler& filestorHandler,
- FileStorThreadMetrics& metrics, uint16_t deviceIndex);
+ PersistenceThread(vespalib::ISequencedTaskExecutor *, ServiceLayerComponentRegister&,
+ const config::ConfigUri & configUri, spi::PersistenceProvider& provider,
+ FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, uint16_t deviceIndex);
~PersistenceThread() override;
/** Waits for current operation to be finished. */
@@ -48,7 +49,7 @@ public:
private:
uint32_t _stripeId;
PersistenceUtil _env;
- uint32_t _warnOnSlowOperations;
+ vespalib::ISequencedTaskExecutor * _sequencedExecutor;
spi::PersistenceProvider& _spi;
ProcessAllHandler _processAllHandler;
MergeHandler _mergeHandler;
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 9095b351f64..72d6a1bb8d3 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -29,23 +29,37 @@ namespace {
(id == api::MessageType::REMOVELOCATION_ID ||
id == api::MessageType::JOINBUCKETS_ID));
}
-
+ const vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s;
}
MessageTracker::MessageTracker(PersistenceUtil & env,
+ MessageSender & replySender,
+ FileStorHandler::BucketLockInterface::SP bucketLock,
+ api::StorageMessage::SP msg)
+ : MessageTracker(env, replySender, true, std::move(bucketLock), std::move(msg))
+{}
+MessageTracker::MessageTracker(PersistenceUtil & env,
+ MessageSender & replySender,
+ bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock,
api::StorageMessage::SP msg)
: _sendReply(true),
- _updateBucketInfo(hasBucketInfo(msg->getType().getId())),
+ _updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())),
_bucketLock(std::move(bucketLock)),
_msg(std::move(msg)),
_context(_msg->getLoadType(), _msg->getPriority(), _msg->getTrace().getLevel()),
_env(env),
+ _replySender(replySender),
_metric(nullptr),
_result(api::ReturnCode::OK),
_timer(_env._component.getClock())
{ }
+MessageTracker::UP
+MessageTracker::createForTesting(PersistenceUtil &env, MessageSender &replySender, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) {
+ return MessageTracker::UP(new MessageTracker(env, replySender, false, std::move(bucketLock), std::move(msg)));
+}
+
void
MessageTracker::setMetric(FileStorThreadMetrics::Op& metric) {
metric.count.inc();
@@ -56,6 +70,21 @@ MessageTracker::~MessageTracker() = default;
void
MessageTracker::sendReply() {
+ if ( ! _msg->getType().isReply()) {
+ generateReply(static_cast<api::StorageCommand &>(*_msg));
+ }
+ if ((hasReply() && getReply().getResult().failed()) || getResult().failed()) {
+ _env._metrics.failedOperations.inc();
+ }
+ vespalib::duration duration = vespalib::from_s(_timer.getElapsedTimeAsDouble()/1000.0);
+ if (duration >= WARN_ON_SLOW_OPERATIONS) {
+ LOGBT(warning, _msg->getType().toString(),
+ "Slow processing of message %s on disk %u. Processing time: %1.1f s (>=%1.1f s)",
+ _msg->toString().c_str(), _env._partition, vespalib::to_s(duration), vespalib::to_s(WARN_ON_SLOW_OPERATIONS));
+ } else {
+ LOGBT(spam, _msg->getType().toString(), "Processing time of message %s on disk %u: %1.1f s",
+ _msg->toString(true).c_str(), _env._partition, vespalib::to_s(duration));
+ }
if (hasReply()) {
if ( ! _context.getTrace().getRoot().isEmpty()) {
getReply().getTrace().getRoot().addChild(_context.getTrace().getRoot());
@@ -70,7 +99,7 @@ MessageTracker::sendReply() {
}
LOG(spam, "Sending reply up: %s %" PRIu64,
getReply().toString().c_str(), getReply().getMsgId());
- _env._fileStorHandler.sendReply(std::move(_reply));
+ _replySender.sendReply(std::move(_reply));
} else {
if ( ! _context.getTrace().getRoot().isEmpty()) {
_msg->getTrace().getRoot().addChild(_context.getTrace().getRoot());
@@ -105,8 +134,8 @@ MessageTracker::generateReply(api::StorageCommand& cmd)
return;
}
- if (!_reply.get()) {
- _reply.reset(cmd.makeReply().release());
+ if (!_reply) {
+ _reply = cmd.makeReply();
_reply->setResult(_result);
}
@@ -138,7 +167,7 @@ PersistenceUtil::PersistenceUtil(
{
}
-PersistenceUtil::~PersistenceUtil() { }
+PersistenceUtil::~PersistenceUtil() = default;
void
PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i)
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 126fa6ca17a..a57ef186b46 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -19,7 +19,8 @@ class MessageTracker : protected Types {
public:
typedef std::unique_ptr<MessageTracker> UP;
- MessageTracker(PersistenceUtil & env, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
+ MessageTracker(PersistenceUtil & env, MessageSender & replySender,
+ FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
~MessageTracker();
@@ -62,18 +63,28 @@ public:
api::ReturnCode getResult() const { return _result; }
spi::Context & context() { return _context; }
+ document::BucketId getBucketId() const {
+ return _bucketLock->getBucket().getBucketId();
+ }
void sendReply();
bool checkForError(const spi::Result& response);
+ static MessageTracker::UP
+ createForTesting(PersistenceUtil & env, MessageSender & replySender,
+ FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
+
private:
+ MessageTracker(PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
+ FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg);
bool _sendReply;
bool _updateBucketInfo;
FileStorHandler::BucketLockInterface::SP _bucketLock;
api::StorageMessage::SP _msg;
spi::Context _context;
PersistenceUtil &_env;
+ MessageSender &_replySender;
FileStorThreadMetrics::Op *_metric;
api::StorageReply::SP _reply;
api::ReturnCode _result;
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index a86edd359fc..a5564282d17 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -9,12 +9,17 @@ template <typename ResultType>
ResultType
ProviderErrorWrapper::checkResult(ResultType&& result) const
{
+ handle(result);
+ return std::forward<ResultType>(result);
+}
+
+void
+ProviderErrorWrapper::handle(const spi::Result & result) const {
if (result.getErrorCode() == spi::Result::ErrorType::FATAL_ERROR) {
trigger_shutdown_listeners(result.getErrorMessage());
} else if (result.getErrorCode() == spi::Result::ErrorType::RESOURCE_EXHAUSTED) {
trigger_resource_exhaustion_listeners(result.getErrorMessage());
}
- return std::forward<ResultType>(result);
}
void ProviderErrorWrapper::trigger_shutdown_listeners(vespalib::stringref reason) const {
@@ -61,8 +66,7 @@ ProviderErrorWrapper::setClusterState(BucketSpace bucketSpace, const spi::Cluste
}
spi::Result
-ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket,
- spi::BucketInfo::ActiveState newState)
+ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState)
{
return checkResult(_impl.setActiveState(bucket, newState));
}
@@ -80,76 +84,59 @@ ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::Doc
}
spi::RemoveResult
-ProviderErrorWrapper::remove(const spi::Bucket& bucket,
- spi::Timestamp ts,
- const document::DocumentId& docId,
- spi::Context& context)
+ProviderErrorWrapper::remove(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context)
{
return checkResult(_impl.remove(bucket, ts, docId, context));
}
spi::RemoveResult
-ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket,
- spi::Timestamp ts,
- const document::DocumentId& docId,
- spi::Context& context)
+ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, spi::Timestamp ts,
+ const document::DocumentId& docId, spi::Context& context)
{
return checkResult(_impl.removeIfFound(bucket, ts, docId, context));
}
spi::UpdateResult
-ProviderErrorWrapper::update(const spi::Bucket& bucket,
- spi::Timestamp ts,
- const spi::DocumentUpdateSP& docUpdate,
- spi::Context& context)
+ProviderErrorWrapper::update(const spi::Bucket& bucket, spi::Timestamp ts,
+ spi::DocumentUpdateSP docUpdate, spi::Context& context)
{
- return checkResult(_impl.update(bucket, ts, docUpdate, context));
+ return checkResult(_impl.update(bucket, ts, std::move(docUpdate), context));
}
spi::GetResult
-ProviderErrorWrapper::get(const spi::Bucket& bucket,
- const document::FieldSet& fieldSet,
- const document::DocumentId& docId,
- spi::Context& context) const
+ProviderErrorWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet,
+ const document::DocumentId& docId, spi::Context& context) const
{
return checkResult(_impl.get(bucket, fieldSet, docId, context));
}
spi::CreateIteratorResult
-ProviderErrorWrapper::createIterator(const spi::Bucket& bucket,
- const document::FieldSet& fieldSet,
- const spi::Selection& selection,
- spi::IncludedVersions versions,
- spi::Context& context)
+ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, const document::FieldSet& fieldSet,
+ const spi::Selection& selection, spi::IncludedVersions versions, spi::Context& context)
{
return checkResult(_impl.createIterator(bucket, fieldSet, selection, versions, context));
}
spi::IterateResult
-ProviderErrorWrapper::iterate(spi::IteratorId iteratorId,
- uint64_t maxByteSize,
- spi::Context& context) const
+ProviderErrorWrapper::iterate(spi::IteratorId iteratorId, uint64_t maxByteSize, spi::Context& context) const
{
return checkResult(_impl.iterate(iteratorId, maxByteSize, context));
}
spi::Result
-ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId,
- spi::Context& context)
+ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context& context)
{
return checkResult(_impl.destroyIterator(iteratorId, context));
}
spi::Result
-ProviderErrorWrapper::createBucket(const spi::Bucket& bucket,
- spi::Context& context)
+ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context)
{
return checkResult(_impl.createBucket(bucket, context));
}
spi::Result
-ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket,
- spi::Context& context)
+ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context)
{
return checkResult(_impl.deleteBucket(bucket, context));
}
@@ -161,34 +148,61 @@ ProviderErrorWrapper::getModifiedBuckets(BucketSpace bucketSpace) const
}
spi::Result
-ProviderErrorWrapper::split(const spi::Bucket& source,
- const spi::Bucket& target1,
- const spi::Bucket& target2,
- spi::Context& context)
+ProviderErrorWrapper::split(const spi::Bucket& source, const spi::Bucket& target1,
+ const spi::Bucket& target2, spi::Context& context)
{
return checkResult(_impl.split(source, target1, target2, context));
}
spi::Result
-ProviderErrorWrapper::join(const spi::Bucket& source1,
- const spi::Bucket& source2,
- const spi::Bucket& target, spi::Context& context)
+ProviderErrorWrapper::join(const spi::Bucket& source1, const spi::Bucket& source2,
+ const spi::Bucket& target, spi::Context& context)
{
return checkResult(_impl.join(source1, source2, target, context));
}
spi::Result
-ProviderErrorWrapper::move(const spi::Bucket& source,
- spi::PartitionId target, spi::Context& context)
+ProviderErrorWrapper::move(const spi::Bucket& source, spi::PartitionId target, spi::Context& context)
{
return checkResult(_impl.move(source, target, context));
}
spi::Result
-ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket,
- spi::Timestamp ts, spi::Context& context)
+ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts, spi::Context& context)
{
return checkResult(_impl.removeEntry(bucket, ts, context));
}
+void
+ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentSP doc,
+ spi::Context &context, spi::OperationComplete::UP onComplete)
+{
+ onComplete->addResultHandler(this);
+ _impl.putAsync(bucket, ts, std::move(doc), context, std::move(onComplete));
+}
+
+void
+ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId,
+ spi::Context & context, spi::OperationComplete::UP onComplete)
+{
+ onComplete->addResultHandler(this);
+ _impl.removeAsync(bucket, ts, docId, context, std::move(onComplete));
+}
+
+void
+ProviderErrorWrapper::removeIfFoundAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId,
+ spi::Context & context, spi::OperationComplete::UP onComplete)
+{
+ onComplete->addResultHandler(this);
+ _impl.removeIfFoundAsync(bucket, ts, docId, context, std::move(onComplete));
+}
+
+void
+ProviderErrorWrapper::updateAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentUpdateSP upd,
+ spi::Context &context, spi::OperationComplete::UP onComplete)
+{
+ onComplete->addResultHandler(this);
+ _impl.updateAsync(bucket, ts, std::move(upd), context, std::move(onComplete));
+}
+
} // ns storage
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 23da566afee..602877e0b02 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -33,7 +33,7 @@ public:
}
};
-class ProviderErrorWrapper : public spi::PersistenceProvider {
+class ProviderErrorWrapper : public spi::PersistenceProvider, public spi::ResultHandler {
public:
explicit ProviderErrorWrapper(spi::PersistenceProvider& impl)
: _impl(impl),
@@ -50,7 +50,7 @@ public:
spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
- spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override;
+ spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override;
spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override;
spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&,
spi::IncludedVersions versions, spi::Context&) override;
@@ -72,9 +72,16 @@ public:
}
void register_error_listener(std::shared_ptr<ProviderErrorListener> listener);
+
+ void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override;
+ void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
+
private:
template <typename ResultType>
ResultType checkResult(ResultType&& result) const;
+ void handle(const spi::Result &) const override;
void trigger_shutdown_listeners(vespalib::stringref reason) const;
void trigger_resource_exhaustion_listeners(vespalib::stringref reason) const;
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index fa2b0cda018..a0b75b7602d 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -28,45 +28,6 @@ using document::FixedBucketSpaces;
namespace storage {
-Queue::Queue() = default;
-Queue::~Queue() = default;
-
-bool Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) {
- vespalib::MonitorGuard sync(_queueMonitor);
- bool first = true;
- while (true) { // Max twice
- if (!_queue.empty()) {
- LOG(spam, "Picking message from queue");
- msg = std::move(_queue.front());
- _queue.pop();
- return true;
- }
- if (timeout == 0 || !first) {
- return false;
- }
- sync.wait(timeout);
- first = false;
- }
-
- return false;
-}
-
-void Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) {
- vespalib::MonitorGuard sync(_queueMonitor);
- _queue.emplace(std::move(msg));
- sync.unsafeSignalUnlock();
-}
-
-void Queue::signal() {
- vespalib::MonitorGuard sync(_queueMonitor);
- sync.unsafeSignalUnlock();
-}
-
-size_t Queue::size() const {
- vespalib::MonitorGuard sync(_queueMonitor);
- return _queue.size();
-}
-
StorageTransportContext::StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg)
: _docAPIMsg(std::move(msg))
{ }
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index c08ad214768..1d64c8a8911 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -44,36 +44,6 @@ class VisitorThread;
class FNetListener;
class RPCRequestWrapper;
-class Queue {
-private:
- using QueueType = std::queue<std::shared_ptr<api::StorageMessage>>;
- QueueType _queue;
- vespalib::Monitor _queueMonitor;
-
-public:
- Queue();
- ~Queue();
-
- /**
- * Returns the next event from the event queue
- * @param msg The next event
- * @param timeout Millisecs to wait if the queue is empty
- * (0 = don't wait, -1 = forever)
- * @return true or false if the queue was empty.
- */
- bool getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout);
-
- /**
- * Enqueue msg in FIFO order.
- */
- void enqueue(std::shared_ptr<api::StorageMessage> msg);
-
- /** Signal queue monitor. */
- void signal();
-
- size_t size() const;
-};
-
class StorageTransportContext : public api::TransportContext {
public:
StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg);
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp
index 5efd638ec26..2e90e1ae3ee 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp
@@ -23,6 +23,6 @@ TestComponentRegister::TestComponentRegister(ComponentRegisterImpl::UP compReg)
// register status pages without a server
}
-TestComponentRegister::~TestComponentRegister() {}
+TestComponentRegister::~TestComponentRegister() = default;
}