diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-05-05 14:06:33 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-05 14:06:33 +0200 |
commit | 02a7dbc6c9dfd39c96eb9d72df92f43592ada2bf (patch) | |
tree | b3a3518fafac5736a20df51ea7e37b3fceeef648 | |
parent | 4c479b47f3d615dc2c533b3c830c822e964b7a25 (diff) | |
parent | 11c7ca4c164bec4c0afbc2fec70b61ff8755ea7d (diff) |
Merge pull request #13153 from vespa-engine/balder/async-put-and-remove
Balder/async put and remove
37 files changed, 628 insertions, 349 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index 7816ee74fde..80ddc0931f4 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -26,6 +26,11 @@ disk_operation_timeout int default=0 restart ## Number of threads to use for each mountpoint. num_threads int default=8 restart +## Number of threads for response processing and delivery +## 0 will give legacy sync behavior. +## Negative number will choose a good number based on # cores. +num_response_threads int default=0 + ## When merging, if we find more than this number of documents that exist on all ## of the same copies, send a separate apply bucket diff with these entries ## to an optimized merge chain that guarantuees minimum data transfer. diff --git a/metrics/src/vespa/metrics/metricmanager.cpp b/metrics/src/vespa/metrics/metricmanager.cpp index c330bf04d56..04f7568bfba 100644 --- a/metrics/src/vespa/metrics/metricmanager.cpp +++ b/metrics/src/vespa/metrics/metricmanager.cpp @@ -766,9 +766,9 @@ namespace { void MetricManager::run() { + vespalib::MonitorGuard sync(_waiter); while (!stopping()) { time_t currentTime = _timer->getTime(); - vespalib::MonitorGuard sync(_waiter); time_t next = tick(sync, currentTime); if (currentTime < next) { sync.wait((next - currentTime) * 1000); diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 984bbcd845f..5720a0ba662 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -4,6 +4,7 @@ #include <vespa/document/select/parser.h> #include <vespa/document/base/documentid.h> #include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/vespalib/util/crc.h> #include <vespa/document/fieldset/fieldsetrepo.h> @@ -30,7 +31,7 @@ BucketContent::BucketContent() _outdatedInfo(true), _active(false) { } -BucketContent::~BucketContent() { } +BucketContent::~BucketContent() = default; uint32_t BucketContent::computeEntryChecksum(const BucketEntry& e) const @@ -306,11 +307,10 @@ DummyPersistence::DummyPersistence( _clusterState() {} -DummyPersistence::~DummyPersistence() {} +DummyPersistence::~DummyPersistence() = default; document::select::Node::UP -DummyPersistence::parseDocumentSelection(const string& documentSelection, - bool allowLeaf) +DummyPersistence::parseDocumentSelection(const string& documentSelection, bool allowLeaf) { document::select::Node::UP ret; try { @@ -464,6 +464,37 @@ DummyPersistence::put(const Bucket& b, Timestamp t, Document::SP doc, Context&) return Result(); } +UpdateResult +DummyPersistence::update(const Bucket& bucket, Timestamp ts, DocumentUpdateSP upd, Context& context) +{ + GetResult getResult = get(bucket, document::AllFields(), upd->getId(), context); + + if (getResult.hasError()) { + return UpdateResult(getResult.getErrorCode(), getResult.getErrorMessage()); + } + + auto docToUpdate = getResult.getDocumentPtr(); + Timestamp updatedTs = getResult.getTimestamp(); + if (!docToUpdate) { + if (!upd->getCreateIfNonExistent()) { + return UpdateResult(); + } else { + docToUpdate = std::make_shared<document::Document>(upd->getType(), upd->getId()); + updatedTs = ts; + } + } + + upd->applyTo(*docToUpdate); + + Result putResult = put(bucket, ts, std::move(docToUpdate), context); + + if (putResult.hasError()) { + return UpdateResult(putResult.getErrorCode(), putResult.getErrorMessage()); + } + + return UpdateResult(updatedTs); +} + RemoveResult DummyPersistence::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) { diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 27ed95bd6ee..94c2e1cd9a4 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -156,6 +156,7 @@ public: Result put(const Bucket&, Timestamp, DocumentSP, Context&) override; GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override; RemoveResult remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) override; + UpdateResult update(const Bucket&, Timestamp, DocumentUpdateSP, Context&) override; CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fs, diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index e7abe137b89..45c2c7901f9 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -8,39 +8,6 @@ namespace storage::spi { -UpdateResult -AbstractPersistenceProvider::update(const Bucket& bucket, Timestamp ts, - const DocumentUpdate::SP& upd, Context& context) -{ - GetResult getResult = get(bucket, document::AllFields(), upd->getId(), context); - - if (getResult.hasError()) { - return UpdateResult(getResult.getErrorCode(), getResult.getErrorMessage()); - } - - auto docToUpdate = getResult.getDocumentPtr(); - Timestamp updatedTs = getResult.getTimestamp(); - if (!docToUpdate) { - if (!upd->getCreateIfNonExistent()) { - return UpdateResult(); - } else { - docToUpdate = std::make_shared<document::Document>(upd->getType(), upd->getId()); - updatedTs = ts; - } - } - - upd->applyTo(*docToUpdate); - - Result putResult = put(bucket, ts, std::move(docToUpdate), context); - - if (putResult.hasError()) { - return UpdateResult(putResult.getErrorCode(), - putResult.getErrorMessage()); - } - - return UpdateResult(updatedTs); -} - RemoveResult AbstractPersistenceProvider::removeIfFound(const Bucket& b, Timestamp timestamp, const DocumentId& id, Context& context) @@ -48,6 +15,13 @@ AbstractPersistenceProvider::removeIfFound(const Bucket& b, Timestamp timestamp, return remove(b, timestamp, id, context); } +void +AbstractPersistenceProvider::removeIfFoundAsync(const Bucket& b, Timestamp timestamp, + const DocumentId& id, Context& context, OperationComplete::UP onComplete) +{ + removeAsync(b, timestamp, id, context, std::move(onComplete)); +} + BucketIdListResult AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const { diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index e346fdaa3cb..813050222a9 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -20,12 +20,6 @@ public: Result initialize() override { return Result(); }; /** - * Updates the document by calling get(), updating the document, - * then calling put() on the result. - */ - UpdateResult update(const Bucket&, Timestamp, const DocumentUpdateSP&, Context&) override; - - /** * Default impl empty. */ Result createBucket(const Bucket&, Context&) override { return Result(); } @@ -39,6 +33,7 @@ public: * Default impl is remove(). */ RemoveResult removeIfFound(const Bucket&, Timestamp, const DocumentId&, Context&) override; + void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override; /** * Default impl empty. diff --git a/persistence/src/vespa/persistence/spi/operationcomplete.h b/persistence/src/vespa/persistence/spi/operationcomplete.h index fa386e274f2..18a3c250e24 100644 --- a/persistence/src/vespa/persistence/spi/operationcomplete.h +++ b/persistence/src/vespa/persistence/spi/operationcomplete.h @@ -8,6 +8,12 @@ namespace storage::spi { class Result; +class ResultHandler { +public: + virtual ~ResultHandler() = default; + virtual void handle(const Result &) const = 0; +}; + /** * This is the callback interface when using the async operations * in the persistence provider. @@ -18,6 +24,7 @@ public: using UP = std::unique_ptr<OperationComplete>; virtual ~OperationComplete() = default; virtual void onComplete(std::unique_ptr<Result> result) = 0; + virtual void addResultHandler(const ResultHandler * resultHandler) = 0; }; }
\ No newline at end of file diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index c60ac615644..38fcd2fd072 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -2,6 +2,7 @@ #include "persistenceprovider.h" #include <future> +#include <cassert> namespace storage::spi { @@ -9,14 +10,20 @@ PersistenceProvider::~PersistenceProvider() = default; class CatchResult : public OperationComplete { public: + CatchResult() : _promisedResult(), _resulthandler(nullptr) {} std::future<Result::UP> future_result() { - return promisedResult.get_future(); + return _promisedResult.get_future(); } void onComplete(Result::UP result) override { - promisedResult.set_value(std::move(result)); + _promisedResult.set_value(std::move(result)); + } + void addResultHandler(const ResultHandler * resultHandler) override { + assert(_resulthandler == nullptr); + _resulthandler = resultHandler; } private: - std::promise<Result::UP> promisedResult; + std::promise<Result::UP> _promisedResult; + const ResultHandler *_resulthandler; }; Result @@ -29,9 +36,58 @@ PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP d void PersistenceProvider::putAsync(const Bucket &bucket, Timestamp timestamp, DocumentSP doc, Context &context, - OperationComplete::UP onComplete) { + OperationComplete::UP onComplete) +{ Result result = put(bucket, timestamp, std::move(doc), context); onComplete->onComplete(std::make_unique<Result>(result)); } +RemoveResult +PersistenceProvider::remove(const Bucket& bucket, Timestamp timestamp, const DocumentId & docId, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + removeAsync(bucket, timestamp, docId, context, std::move(catcher)); + return dynamic_cast<const RemoveResult &>(*future.get()); +} + +void +PersistenceProvider::removeAsync(const Bucket &bucket, Timestamp timestamp, const DocumentId & docId, Context &context, + OperationComplete::UP onComplete) +{ + RemoveResult result = remove(bucket, timestamp, docId, context); + onComplete->onComplete(std::make_unique<RemoveResult>(result)); +} + +RemoveResult +PersistenceProvider::removeIfFound(const Bucket& bucket, Timestamp timestamp, const DocumentId & docId, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + removeIfFoundAsync(bucket, timestamp, docId, context, std::move(catcher)); + return dynamic_cast<const RemoveResult &>(*future.get()); +} + +void +PersistenceProvider::removeIfFoundAsync(const Bucket &bucket, Timestamp timestamp, const DocumentId & docId, Context &context, + OperationComplete::UP onComplete) +{ + RemoveResult result = removeIfFound(bucket, timestamp, docId, context); + onComplete->onComplete(std::make_unique<RemoveResult>(result)); +} + +UpdateResult +PersistenceProvider::update(const Bucket& bucket, Timestamp timestamp, DocumentUpdateSP upd, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + updateAsync(bucket, timestamp, std::move(upd), context, std::move(catcher)); + return dynamic_cast<const UpdateResult &>(*future.get()); +} + +void +PersistenceProvider::updateAsync(const Bucket &bucket, Timestamp timestamp, DocumentUpdateSP upd, Context &context, + OperationComplete::UP onComplete) +{ + UpdateResult result = update(bucket, timestamp, std::move(upd), context); + onComplete->onComplete(std::make_unique<UpdateResult>(result)); +} + } diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 338fa6e03b0..2bb91c776d0 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -109,6 +109,8 @@ struct PersistenceProvider /** * Store the given document at the given microsecond time. + * An implementation must always implement atleast put or putAsync. + * If not an eternal recursion will occur. */ virtual Result put(const Bucket&, Timestamp, DocumentSP, Context&); virtual void putAsync(const Bucket &, Timestamp , DocumentSP , Context &, OperationComplete::UP ); @@ -166,10 +168,15 @@ struct PersistenceProvider * For such a provider, iterating with removes and all versions should * semantically be the same thing and yield the same results. * + * An implementation must always implement atleast remove or removeAsync. + * If not an eternal recursion will occur. + * * @param timestamp The timestamp for the new bucket entry. * @param id The ID to remove */ - virtual RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) = 0; + virtual RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&); + virtual void removeAsync(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&, OperationComplete::UP); + /** * @see remove() * <p/> @@ -180,11 +187,14 @@ struct PersistenceProvider * resend removes. It is legal to still store a remove entry, but note that * you will then be prone to user patterns mentioned above to fill up your * buckets. + * An implementation must always implement atleast removeIfFound or removeIfFoundAsync. + * If not an eternal recursion will occur. * <p/> * @param timestamp The timestamp for the new bucket entry. * @param id The ID to remove */ - virtual RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) = 0; + virtual RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&); + virtual void removeIfFoundAsync(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&, OperationComplete::UP); /** * Remove any trace of the entry with the given timestamp. (Be it a document @@ -197,11 +207,14 @@ struct PersistenceProvider /** * Partially modifies a document referenced by the document update. + * An implementation must always implement atleast update or updateAsync. + * If not an eternal recursion will occur. * * @param timestamp The timestamp to use for the new update entry. * @param update The document update to apply to the stored document. */ - virtual UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) = 0; + virtual UpdateResult update(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&); + virtual void updateAsync(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&, OperationComplete::UP); /** * Retrieves the latest version of the document specified by the diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp index 8d8665b645e..aa87c383c33 100644 --- a/searchcore/src/apps/proton/downpersistence.cpp +++ b/searchcore/src/apps/proton/downpersistence.cpp @@ -83,7 +83,7 @@ DownPersistence::removeEntry(const Bucket&, Timestamp, Context&) return errorResult; } -UpdateResult DownPersistence::update(const Bucket&, Timestamp, const DocumentUpdate::SP&, Context&) +UpdateResult DownPersistence::update(const Bucket&, Timestamp, DocumentUpdate::SP, Context&) { return UpdateResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h index 2716b0ee3b4..10e3d9c1ad7 100644 --- a/searchcore/src/apps/proton/downpersistence.h +++ b/searchcore/src/apps/proton/downpersistence.h @@ -36,7 +36,7 @@ public: RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override; RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override; Result removeEntry(const Bucket&, Timestamp, Context&) override; - UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) override; + UpdateResult update(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&) override; GetResult get(const Bucket&, const document::FieldSet& fieldSet, const DocumentId& id, Context&) const override; CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fieldSet, diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index 4fb4abcb7c5..2927457a6a5 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -458,12 +458,12 @@ TEST_F("require that puts are routed to handler", SimpleFixture) storage::spi::LoadType loadType(0, "default"); Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0)); f.engine.put(bucket1, tstamp1, doc1, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); f.engine.put(bucket1, tstamp1, doc2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), f.engine.put(bucket1, tstamp1, doc3, context)); @@ -490,14 +490,14 @@ TEST_F("require that updates are routed to handler", SimpleFixture) Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0)); f.hset.handler1.setExistingTimestamp(tstamp2); UpdateResult ur = f.engine.update(bucket1, tstamp1, upd1, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); EXPECT_EQUAL(tstamp2, ur.getExistingTimestamp()); f.hset.handler2.setExistingTimestamp(tstamp3); ur = f.engine.update(bucket1, tstamp1, upd2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_EQUAL(tstamp3, ur.getExistingTimestamp()); EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), @@ -533,31 +533,31 @@ TEST_F("require that removes are routed to handlers", SimpleFixture) storage::spi::LoadType loadType(0, "default"); Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0)); RemoveResult rr = f.engine.remove(bucket1, tstamp1, docId3, context); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); EXPECT_FALSE(rr.wasFound()); EXPECT_TRUE(rr.hasError()); EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), rr); f.hset.handler1.setExistingTimestamp(tstamp2); rr = f.engine.remove(bucket1, tstamp1, docId1, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); EXPECT_TRUE(rr.wasFound()); EXPECT_FALSE(rr.hasError()); f.hset.handler1.setExistingTimestamp(tstamp0); f.hset.handler2.setExistingTimestamp(tstamp3); rr = f.engine.remove(bucket1, tstamp1, docId2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_TRUE(rr.wasFound()); EXPECT_FALSE(rr.hasError()); f.hset.handler2.setExistingTimestamp(tstamp0); rr = f.engine.remove(bucket1, tstamp1, docId2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_FALSE(rr.wasFound()); EXPECT_FALSE(rr.hasError()); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 8344996e298..91ccac6fce1 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -347,57 +347,55 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc)); } -PersistenceEngine::RemoveResult -PersistenceEngine::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) +void +PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& did, Context&, OperationComplete::UP onComplete) { std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); LOG(spam, "remove(%s, %" PRIu64 ", \"%s\")", b.toString().c_str(), static_cast<uint64_t>(t.getValue()), did.toString().c_str()); if (!did.hasDocType()) { - return RemoveResult(Result::ErrorType::PERMANENT_ERROR, - make_string("Old id scheme not supported in elastic mode (%s)", did.toString().c_str())); + return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR, + make_string("Old id scheme not supported in elastic mode (%s)", did.toString().c_str()))); } DocTypeName docType(did.getDocType()); IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType); if (!handler) { - return RemoveResult(Result::ErrorType::PERMANENT_ERROR, - make_string("No handler for document type '%s'", docType.toString().c_str())); + return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR, + make_string("No handler for document type '%s'", docType.toString().c_str()))); } - TransportLatch latch(1); - handler->handleRemove(feedtoken::make(latch), b, t, did); - latch.await(); - return latch.getRemoveResult(); + auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, did); } -PersistenceEngine::UpdateResult -PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP& upd, Context&) +void +PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP upd, Context&, OperationComplete::UP onComplete) { if (!_writeFilter.acceptWriteOperation()) { IResourceWriteFilter::State state = _writeFilter.getAcceptState(); if (!state.acceptWriteOperation()) { - return UpdateResult(Result::ErrorType::RESOURCE_EXHAUSTED, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::RESOURCE_EXHAUSTED, make_string("Update operation rejected for document '%s': '%s'", - upd->getId().toString().c_str(), state.message().c_str())); + upd->getId().toString().c_str(), state.message().c_str()))); } } try { upd->eagerDeserialize(); } catch (document::FieldNotFoundException & e) { - return UpdateResult(Result::ErrorType::TRANSIENT_ERROR, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, make_string("Update operation rejected for document '%s' of type '%s': 'Field not found'", - upd->getId().toString().c_str(), upd->getType().getName().c_str())); + upd->getId().toString().c_str(), upd->getType().getName().c_str()))); } catch (document::DocumentTypeNotFoundException & e) { - return UpdateResult(Result::ErrorType::TRANSIENT_ERROR, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, make_string("Update operation rejected for document '%s' of type '%s'.", - upd->getId().toString().c_str(), e.getDocumentTypeName().c_str())); + upd->getId().toString().c_str(), e.getDocumentTypeName().c_str()))); } catch (document::WrongTensorTypeException &e) { - return UpdateResult(Result::ErrorType::TRANSIENT_ERROR, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, make_string("Update operation rejected for document '%s' of type '%s': 'Wrong tensor type: %s'", upd->getId().toString().c_str(), upd->getType().getName().c_str(), - e.getMessage().c_str())); + e.getMessage().c_str()))); } std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); DocTypeName docType(upd->getType()); @@ -405,24 +403,20 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP b.toString().c_str(), static_cast<uint64_t>(t.getValue()), docType.toString().c_str(), upd->getId().toString().c_str(), (upd->getCreateIfNonExistent() ? "true" : "false")); if (!upd->getId().hasDocType()) { - return UpdateResult(Result::ErrorType::PERMANENT_ERROR, - make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str())); + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, + make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str()))); } if (upd->getId().getDocType() != docType.getName()) { - return UpdateResult(Result::ErrorType::PERMANENT_ERROR, - make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str())); + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, + make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str()))); } IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType); - if (handler) { - TransportLatch latch(1); - LOG(debug, "update = %s", upd->toXml().c_str()); - handler->handleUpdate(feedtoken::make(latch), b, t, upd); - latch.await(); - return latch.getUpdateResult(); - } else { - return UpdateResult(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str())); + if (handler == nullptr) { + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } + auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd)); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 43239e95a96..230f8c411aa 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -102,9 +102,8 @@ public: Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override; BucketInfoResult getBucketInfo(const Bucket&) const override; void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override; - RemoveResult remove(const Bucket&, Timestamp, const document::DocumentId&, Context&) override; - UpdateResult update(const Bucket&, Timestamp, - const std::shared_ptr<document::DocumentUpdate>&, Context&) override; + void removeAsync(const Bucket&, Timestamp, const document::DocumentId&, Context&, OperationComplete::UP) override; + void updateAsync(const Bucket&, Timestamp, storage::spi::DocumentUpdateSP, Context&, OperationComplete::UP) override; GetResult get(const Bucket&, const document::FieldSet&, const document::DocumentId&, Context&) const override; CreateIteratorResult createIterator(const Bucket&, const document::FieldSet&, const Selection&, IncludedVersions, Context&) override; diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index 862d1fb758a..13a254e7a76 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -124,12 +124,12 @@ PersistenceProviderWrapper::removeIfFound(const spi::Bucket& bucket, spi::UpdateResult PersistenceProviderWrapper::update(const spi::Bucket& bucket, spi::Timestamp timestamp, - const document::DocumentUpdate::SP& upd, + document::DocumentUpdate::SP upd, spi::Context& context) { LOG_SPI("update(" << bucket << ", " << timestamp << ", " << upd->getId() << ")"); CHECK_ERROR(spi::UpdateResult, FAIL_UPDATE); - return _spi.update(bucket, timestamp, upd, context); + return _spi.update(bucket, timestamp, std::move(upd), context); } spi::GetResult diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index 25365e64bfc..9ca08becabe 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -91,9 +91,8 @@ public: spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; - spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override; - spi::GetResult get(const spi::Bucket&, const document::FieldSet&, - const spi::DocumentId&, spi::Context&) const override ; + spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override; + spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override; spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&, spi::IncludedVersions versions, spi::Context&) override; diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index fe966d0bbb2..bf0083311fb 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -201,12 +201,8 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config, uint16_t deviceIndex) { (void) config; - std::unique_ptr<DiskThread> disk; - disk.reset(new PersistenceThread( - node.getComponentRegister(), config.getConfigId(), provider, - filestorHandler, metrics, - deviceIndex)); - return disk; + return std::make_unique<PersistenceThread>(nullptr,node.getComponentRegister(), config.getConfigId(), + provider, filestorHandler, metrics, deviceIndex); } namespace { diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 035da326d48..262906a4baf 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -149,11 +149,6 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd; }; - MessageTracker::UP - createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { - return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd)); - } - std::string doTestSPIException(MergeHandler& handler, PersistenceProviderWrapper& providerWrapper, diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 4ac9dfd7765..cf8bf71708d 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -12,6 +12,8 @@ #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> +#include <thread> using document::DocumentType; using storage::spi::test::makeSpiBucket; @@ -54,9 +56,9 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const { _node.setupDummyPersistence(); _metrics.initDiskMetrics(numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1, 1); - _handler.reset(new FileStorHandler(_messageKeeper, _metrics, + _handler = std::make_unique<FileStorHandler>(_messageKeeper, _metrics, _node.getPersistenceProvider().getPartitionStates().getList(), - _node.getComponentRegister())); + _node.getComponentRegister()); for (uint32_t i = 0; i < numDisks; i++) { _diskEnvs.push_back( std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler, @@ -64,6 +66,13 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const } } +PersistenceTestEnvironment::~PersistenceTestEnvironment() { + _handler->close(); + while (!_handler->closed(0)) { + std::this_thread::sleep_for(1ms); + } +} + PersistenceTestUtils::PersistenceTestUtils() = default; PersistenceTestUtils::~PersistenceTestUtils() = default; @@ -74,15 +83,21 @@ PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) { void PersistenceTestUtils::setupDisks(uint32_t numDisks) { - _env.reset(new PersistenceTestEnvironment(DiskCount(numDisks), "todo-make-unique-persistencetestutils")); + _env = std::make_unique<PersistenceTestEnvironment>(DiskCount(numDisks), "todo-make-unique-persistencetestutils"); + setupExecutor(numDisks); +} + +void +PersistenceTestUtils::setupExecutor(uint32_t numThreads) { + _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE); } std::unique_ptr<PersistenceThread> PersistenceTestUtils::createPersistenceThread(uint32_t disk) { - return std::make_unique<PersistenceThread>(_env->_node.getComponentRegister(), _env->_config.getConfigId(), - getPersistenceProvider(), getEnv()._fileStorHandler, - getEnv()._metrics, disk); + return std::make_unique<PersistenceThread>(_sequenceTaskExecutor.get(), _env->_node.getComponentRegister(), + _env->_config.getConfigId(),getPersistenceProvider(), + getEnv()._fileStorHandler, getEnv()._metrics, disk); } document::Document::SP diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index 3121bef61e5..cdd08d42565 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -10,6 +10,7 @@ #include <vespa/storage/common/storagecomponent.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> +#include <vespa/storage/storageserver/communicationmanager.h> #include <vespa/document/base/testdocman.h> #include <vespa/vespalib/gtest/gtest.h> @@ -24,6 +25,7 @@ struct MessageKeeper : public MessageSender { struct PersistenceTestEnvironment { PersistenceTestEnvironment(DiskCount numDisks, const std::string & rootOfRoot); + ~PersistenceTestEnvironment(); document::TestDocMan _testDocMan; vdstestlib::DirConfig _config; @@ -54,7 +56,22 @@ public: document::Bucket _bucket; }; + struct ReplySender : public MessageSender { + void sendCommand(const std::shared_ptr<api::StorageCommand> &) override { + abort(); + } + + void sendReply(const std::shared_ptr<api::StorageReply> & ptr) override { + queue.enqueue(std::move(ptr)); + } + + Queue queue; + }; + std::unique_ptr<PersistenceTestEnvironment> _env; + std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequenceTaskExecutor; + ReplySender _replySender; + PersistenceTestUtils(); virtual ~PersistenceTestUtils(); @@ -67,8 +84,13 @@ public: uint32_t maxSize = 128); void setupDisks(uint32_t disks); + void setupExecutor(uint32_t numThreads); void TearDown() override { + if (_sequenceTaskExecutor) { + _sequenceTaskExecutor->sync(); + _sequenceTaskExecutor.reset(); + } _env.reset(); } @@ -90,6 +112,21 @@ public: spi::PersistenceProvider& getPersistenceProvider(); + MessageTracker::UP + createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { + return MessageTracker::createForTesting(getEnv(), _replySender, NoBucketLock::make(bucket), std::move(cmd)); + } + + api::ReturnCode + fetchResult(const MessageTracker::UP & tracker) { + if (tracker) { + return tracker->getResult(); + } + std::shared_ptr<api::StorageMessage> msg; + _replySender.queue.getNext(msg, 60000); + return dynamic_cast<api::StorageReply &>(*msg).getResult(); + } + /** Performs a put to the given disk. Returns the document that was inserted. diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index 1ec6a35fb1d..3d7fc70db6a 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -214,7 +214,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) cmd->setMinByteSize(maxSize); cmd->setMinDocCount(maxCount); cmd->setSourceIndex(0); - MessageTracker::UP result = thread->handleSplitBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(docBucket), cmd)); + MessageTracker::UP result = thread->handleSplitBucket(*cmd, createTracker(cmd, docBucket)); api::ReturnCode code(result->getResult()); EXPECT_EQ(error, code); if (!code.success()) { diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index 0d482ebe5b7..5174b733334 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -23,7 +23,7 @@ TEST_F(ProcessAllHandlerTest, remove_location) { document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::RemoveLocationCommand>("id.user == 4", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); + auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n", @@ -47,7 +47,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket); - auto tracker = handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); + auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n" @@ -74,7 +74,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_ty auto cmd = std::make_shared<api::RemoveLocationCommand>("unknowndoctype.headerval % 2 == 0", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception); + ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", dumpBucket(bucketId)); @@ -88,7 +88,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_bogus_selectio auto cmd = std::make_shared<api::RemoveLocationCommand>("id.bogus != badgers", bucket); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); - ASSERT_THROW(handler.handleRemoveLocation(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)), std::exception); + ASSERT_THROW(handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)), std::exception); EXPECT_EQ("DocEntry(1234, 0, Doc(id:mail:testdoctype1:n=4:3619.html))\n", dumpBucket(bucketId)); @@ -107,7 +107,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_returns_document_metadata_matc document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "testdoctype1.headerval % 2 == 0"); - MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); @@ -141,7 +141,7 @@ TEST_F(ProcessAllHandlerTest, stat_bucket_request_can_returned_removed_entries) document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true"); - MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); @@ -187,7 +187,7 @@ TEST_F(ProcessAllHandlerTest, bucket_stat_request_can_return_all_put_entries_in_ document::Bucket bucket = makeDocumentBucket(bucketId); auto cmd = std::make_shared<api::StatBucketCommand>(bucket, "true"); - MessageTracker::UP tracker = handler.handleStatBucket(*cmd, std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), cmd)); + MessageTracker::UP tracker = handler.handleStatBucket(*cmd, createTracker(cmd, bucket)); ASSERT_TRUE(tracker->hasReply()); auto& reply = dynamic_cast<api::StatBucketReply&>(tracker->getReply()); diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index 864ab320527..578a90081c7 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -40,18 +40,11 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { : context(spi::LoadType(0, "default"), 0, 0) {} - MessageTracker::UP - createTracker(api::StorageMessage::SP cmd, document::Bucket bucket) { - return std::make_unique<MessageTracker>(getEnv(), NoBucketLock::make(bucket), std::move(cmd)); - } - void SetUp() override { SingleDiskPersistenceTestUtils::SetUp(); createBucket(BUCKET_ID); - getPersistenceProvider().createBucket( - makeSpiBucket(BUCKET_ID), - context); + getPersistenceProvider().createBucket(makeSpiBucket(BUCKET_ID),context); thread = createPersistenceThread(0); testDoc = createTestDocument(); @@ -59,7 +52,8 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { } void TearDown() override { - thread.reset(nullptr); + thread->flush(); + thread.reset(); SingleDiskPersistenceTestUtils::TearDown(); } @@ -91,7 +85,7 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); } @@ -111,7 +105,7 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) { auto putTwo = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestampTwo); setTestCondition(*putTwo); - ASSERT_EQ(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->handlePut(*putTwo, createTracker(putTwo, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -131,7 +125,7 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) { auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); setTestCondition(*remove); - ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -151,7 +145,7 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) { auto remove = std::make_shared<api::RemoveCommand>(BUCKET, testDocId, timestampTwo); setTestCondition(*remove); - ASSERT_EQ(thread->handleRemove(*remove, createTracker(remove, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->handleRemove(*remove, createTracker(remove, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY), dumpBucket(BUCKET_ID)); @@ -177,7 +171,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) { putTestDocument(false, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -190,7 +184,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) { putTestDocument(true, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -202,7 +196,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(false, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -211,7 +205,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(true, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID)); assertTestDocumentFoundAndMatchesContent(NEW_CONTENT); } @@ -223,7 +217,7 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) { auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); put->setCondition(documentapi::TestAndSetCondition("bjarne")); - ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); + ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -235,7 +229,7 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) { setTestCondition(*put); thread->handlePut(*put, createTracker(put, BUCKET)); - ASSERT_EQ(thread->handlePut(*put, createTracker(put, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -279,7 +273,7 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta } auto put = std::make_shared<api::PutCommand>(BUCKET, testDoc, timestamp); - thread->handlePut(*put, createTracker(put, BUCKET)); + fetchResult(thread->handlePut(*put, createTracker(put, BUCKET))); } void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value) diff --git a/storage/src/vespa/storage/bucketdb/storagebucketinfo.h b/storage/src/vespa/storage/bucketdb/storagebucketinfo.h index c5257d59a3e..2a3d9aa8f91 100644 --- a/storage/src/vespa/storage/bucketdb/storagebucketinfo.h +++ b/storage/src/vespa/storage/bucketdb/storagebucketinfo.h @@ -4,8 +4,7 @@ #include <vespa/storageapi/buckets/bucketinfo.h> -namespace storage { -namespace bucketdb { +namespace storage::bucketdb { struct StorageBucketInfo { api::BucketInfo info; @@ -34,4 +33,3 @@ struct StorageBucketInfo { std::ostream& operator<<(std::ostream& out, const StorageBucketInfo& info); } -} diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index 431c90b27f2..d4615ee2df5 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -240,4 +240,43 @@ std::ostream& operator<<(std::ostream& out, StorageLink& link) { return out; } +Queue::Queue() = default; +Queue::~Queue() = default; + +bool Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) { + vespalib::MonitorGuard sync(_queueMonitor); + bool first = true; + while (true) { // Max twice + if (!_queue.empty()) { + LOG(spam, "Picking message from queue"); + msg = std::move(_queue.front()); + _queue.pop(); + return true; + } + if (timeout == 0 || !first) { + return false; + } + sync.wait(timeout); + first = false; + } + + return false; +} + +void Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) { + vespalib::MonitorGuard sync(_queueMonitor); + _queue.emplace(std::move(msg)); + sync.unsafeSignalUnlock(); +} + +void Queue::signal() { + vespalib::MonitorGuard sync(_queueMonitor); + sync.unsafeSignalUnlock(); +} + +size_t Queue::size() const { + vespalib::MonitorGuard sync(_queueMonitor); + return _queue.size(); +} + } diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h index f1f679a1e26..d6df7bf8afb 100644 --- a/storage/src/vespa/storage/common/storagelink.h +++ b/storage/src/vespa/storage/common/storagelink.h @@ -23,7 +23,9 @@ #include <vespa/storageapi/messageapi/messagehandler.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/document/util/printable.h> +#include <vespa/vespalib/util/sync.h> #include <atomic> +#include <queue> namespace storage { @@ -182,6 +184,36 @@ private: friend struct StorageLinkTest; }; +class Queue { +private: + using QueueType = std::queue<std::shared_ptr<api::StorageMessage>>; + QueueType _queue; + vespalib::Monitor _queueMonitor; + +public: + Queue(); + ~Queue(); + + /** + * Returns the next event from the event queue + * @param msg The next event + * @param timeout Millisecs to wait if the queue is empty + * (0 = don't wait, -1 = forever) + * @return true or false if the queue was empty. + */ + bool getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout); + + /** + * Enqueue msg in FIFO order. + */ + void enqueue(std::shared_ptr<api::StorageMessage> msg); + + /** Signal queue monitor. */ + void signal(); + + size_t size() const; +}; + std::ostream& operator<<(std::ostream& out, StorageLink& link); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index db3d865faca..eec8ac3e327 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -15,6 +15,7 @@ #include <vespa/storageapi/message/state.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".persistence.filestor.manager"); @@ -88,6 +89,13 @@ FileStorManager::print(std::ostream& out, bool verbose, const std::string& inden out << "FileStorManager"; } +namespace { + +uint32_t computeNumResponseThreads(int configured) { + return (configured < 0) ? std::max(1u, std::thread::hardware_concurrency()/4) : configured; +} + +} /** * If live configuration, assuming storageserver makes sure no messages are * incoming during reconfiguration @@ -109,12 +117,16 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads); _filestorHandler = std::make_unique<FileStorHandler>(numThreads, numStripes, *this, *_metrics, _partitions, _compReg); + uint32_t numResposeThreads = computeNumResponseThreads(_config->numResponseThreads); + if (numResposeThreads > 0) { + _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResposeThreads, 10000, vespalib::Executor::OptimizeFor::ADAPTIVE); + } for (uint32_t i=0; i<_component.getDiskCount(); ++i) { if (_partitions[i].isUp()) { LOG(spam, "Setting up disk %u", i); for (uint32_t j = 0; j < numThreads; j++) { - _disks[i].push_back(std::make_shared<PersistenceThread>(_compReg, _configUri, *_provider, *_filestorHandler, - *_metrics->disks[i]->threads[j], i)); + _disks[i].push_back(std::make_shared<PersistenceThread>(_sequencedExecutor.get(), _compReg, _configUri, *_provider, + *_filestorHandler, *_metrics->disks[i]->threads[j], i)); } } else { _filestorHandler->disable(i); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 433b9ddbd39..452b83bb794 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -12,6 +12,7 @@ #include "filestormetrics.h" #include <vespa/vespalib/util/document_runnable.h> #include <vespa/vespalib/util/sync.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/document/bucket/bucketid.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/storage/bucketdb/storbucketdb.h> @@ -67,6 +68,7 @@ class FileStorManager : public StorageLinkQueued, bool _failDiskOnError; std::shared_ptr<FileStorMetrics> _metrics; std::unique_ptr<FileStorHandler> _filestorHandler; + std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequencedExecutor; mutable vespalib::Monitor _threadMonitor; // Notify to stop sleeping bool _closed; diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 8eac55445f9..c53367c16f6 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -10,13 +10,87 @@ #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".persistence.thread"); namespace storage { -PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg, +namespace { + +class ResultTask : public vespalib::Executor::Task { +public: + ResultTask() : _result(), _resultHandler(nullptr) { } + void setResult(spi::Result::UP result) { + _result = std::move(result); + } + void addResultHandler(const spi::ResultHandler * resultHandler) { + // Only handles a signal handler now, + // Can be extended if necessary later on + assert(_resultHandler == nullptr); + _resultHandler = resultHandler; + } + void handle(const spi::Result &result ) { + if (_resultHandler != nullptr) { + _resultHandler->handle(result); + } + } +protected: + spi::Result::UP _result; +private: + const spi::ResultHandler * _resultHandler; +}; + +template<class FunctionType> +class LambdaResultTask : public ResultTask { +public: + LambdaResultTask(FunctionType && func) + : _func(std::move(func)) + {} + ~LambdaResultTask() override {} + void run() override { + handle(*_result); + _func(std::move(_result)); + } +private: + FunctionType _func; +}; + +template <class FunctionType> +std::unique_ptr<ResultTask> +makeResultTask(FunctionType &&function) +{ + return std::make_unique<LambdaResultTask<std::decay_t<FunctionType>>> + (std::forward<FunctionType>(function)); +} + +class ResultTaskOperationDone : public spi::OperationComplete { +public: + ResultTaskOperationDone(vespalib::ISequencedTaskExecutor & executor, document::BucketId bucketId, + std::unique_ptr<ResultTask> task) + : _executor(executor), + _task(std::move(task)), + _executorId(executor.getExecutorId(bucketId.getId())) + { + } + void onComplete(spi::Result::UP result) override { + _task->setResult(std::move(result)); + _executor.executeTask(_executorId, std::move(_task)); + } + void addResultHandler(const spi::ResultHandler * resultHandler) override { + _task->addResultHandler(resultHandler); + } +private: + vespalib::ISequencedTaskExecutor & _executor; + std::unique_ptr<ResultTask> _task; + vespalib::ISequencedTaskExecutor::ExecutorId _executorId; +}; + +} + +PersistenceThread::PersistenceThread(vespalib::ISequencedTaskExecutor * sequencedExecutor, + ServiceLayerComponentRegister& compReg, const config::ConfigUri & configUri, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, @@ -24,7 +98,7 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg, uint16_t deviceIndex) : _stripeId(filestorHandler.getNextStripeId(deviceIndex)), _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider), - _warnOnSlowOperations(5000), + _sequencedExecutor(sequencedExecutor), _spi(provider), _processAllHandler(_env, provider), _mergeHandler(_spi, _env), @@ -91,63 +165,106 @@ PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, Messa } MessageTracker::UP -PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP tracker) +PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) { + MessageTracker & tracker = *trackerUP; auto& metrics = _env._metrics.put[cmd.getLoadType()]; - tracker->setMetric(metrics); + tracker.setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) { - return tracker; + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) { + return trackerUP; } - spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker->context()); - tracker->checkForError(response); - return tracker; + spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); + if (_sequencedExecutor == nullptr) { + spi::Result response = _spi.put(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()),tracker.context()); + tracker.checkForError(response); + } else { + auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { + tracker->checkForError(*response); + tracker->sendReply(); + }); + _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(), + std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); + } + return trackerUP; } MessageTracker::UP -PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP tracker) +PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trackerUP) { + MessageTracker & tracker = *trackerUP; auto& metrics = _env._metrics.remove[cmd.getLoadType()]; - tracker->setMetric(metrics); + tracker.setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context())) { - return tracker; + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context())) { + return trackerUP; } - spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker->context()); - if (tracker->checkForError(response)) { - tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); - } - if (!response.wasFound()) { - _env._metrics.remove[cmd.getLoadType()].notFound.inc(); + spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); + if (_sequencedExecutor == nullptr) { + spi::RemoveResult response = _spi.removeIfFound(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(),tracker.context()); + if (tracker.checkForError(response)) { + tracker.setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); + } + if (!response.wasFound()) { + metrics.notFound.inc(); + } + } else { + // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker + auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { + const spi::RemoveResult & response = dynamic_cast<const spi::RemoveResult &>(*responseUP); + if (tracker->checkForError(response)) { + tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); + } + if (!response.wasFound()) { + metrics.notFound.inc(); + } + tracker->sendReply(); + }); + _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(), + std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); } - return tracker; + return trackerUP; } MessageTracker::UP -PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker) +PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) { + MessageTracker & tracker = *trackerUP; auto& metrics = _env._metrics.update[cmd.getLoadType()]; - tracker->setMetric(metrics); + tracker.setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context(), cmd.getUpdate()->getCreateIfNonExistent())) { - return tracker; + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context(), cmd.getUpdate()->getCreateIfNonExistent())) { + return trackerUP; } - - spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), tracker->context()); - if (tracker->checkForError(response)) { - auto reply = std::make_shared<api::UpdateReply>(cmd); - reply->setOldTimestamp(response.getExistingTimestamp()); - tracker->setReply(std::move(reply)); + + spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); + if (_sequencedExecutor == nullptr) { + spi::UpdateResult response = _spi.update(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()),tracker.context()); + if (tracker.checkForError(response)) { + auto reply = std::make_shared<api::UpdateReply>(cmd); + reply->setOldTimestamp(response.getExistingTimestamp()); + tracker.setReply(std::move(reply)); + } + } else { + // Note that the &cmd capture is OK since its lifetime is guaranteed by the tracker + auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { + const spi::UpdateResult & response = dynamic_cast<const spi::UpdateResult &>(*responseUP); + if (tracker->checkForError(response)) { + auto reply = std::make_shared<api::UpdateReply>(cmd); + reply->setOldTimestamp(response.getExistingTimestamp()); + tracker->setReply(std::move(reply)); + } + tracker->sendReply(); + }); + _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(), + std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); } - return tracker; + return trackerUP; } namespace { @@ -770,37 +887,12 @@ PersistenceThread::processMessage(api::StorageMessage& msg, MessageTracker::UP t } else { auto & initiatingCommand = static_cast<api::StorageCommand&>(msg); try { - int64_t startTime(_component->getClock().getTimeInMillis().getTime()); - LOG(debug, "Handling command: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); - tracker = handleCommandSplitByType(initiatingCommand, std::move(tracker)); - if (!tracker) { - LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str()); - } else { - tracker->generateReply(initiatingCommand); - if ((tracker->hasReply() - && tracker->getReply().getResult().failed()) - || tracker->getResult().failed()) - { - _env._metrics.failedOperations.inc(); - } - } - - int64_t stopTime(_component->getClock().getTimeInMillis().getTime()); - if (stopTime - startTime >= _warnOnSlowOperations) { - LOGBT(warning, msg.getType().toString(), - "Slow processing of message %s on disk %u. Processing time: %" PRId64 " ms (>=%d ms)", - msg.toString().c_str(), _env._partition, stopTime - startTime, _warnOnSlowOperations); - } else { - LOGBT(spam, msg.getType().toString(), "Processing time of message %s on disk %u: %" PRId64 " ms", - msg.toString(true).c_str(), _env._partition, stopTime - startTime); - } - - return tracker; + return handleCommandSplitByType(initiatingCommand, std::move(tracker)); } catch (std::exception& e) { LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); - api::StorageReply::SP reply(initiatingCommand.makeReply().release()); + api::StorageReply::SP reply(initiatingCommand.makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); _env._fileStorHandler.sendReply(reply); } @@ -814,7 +906,7 @@ PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage lock) { LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); api::StorageMessage & msg(*lock.second); - auto tracker = std::make_unique<MessageTracker>(_env, std::move(lock.first), std::move(lock.second)); + auto tracker = std::make_unique<MessageTracker>(_env, _env._fileStorHandler, std::move(lock.first), std::move(lock.second)); tracker = processMessage(msg, std::move(tracker)); if (tracker) { tracker->sendReply(); diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index fcecc963658..32387bc6826 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -10,6 +10,7 @@ #include <vespa/storage/common/bucketmessages.h> #include <vespa/storage/common/storagecomponent.h> #include <vespa/storage/common/statusmessages.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> namespace storage { @@ -19,9 +20,9 @@ class TestAndSetHelper; class PersistenceThread final : public DiskThread, public Types { public: - PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri, - spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, - FileStorThreadMetrics& metrics, uint16_t deviceIndex); + PersistenceThread(vespalib::ISequencedTaskExecutor *, ServiceLayerComponentRegister&, + const config::ConfigUri & configUri, spi::PersistenceProvider& provider, + FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, uint16_t deviceIndex); ~PersistenceThread() override; /** Waits for current operation to be finished. */ @@ -48,7 +49,7 @@ public: private: uint32_t _stripeId; PersistenceUtil _env; - uint32_t _warnOnSlowOperations; + vespalib::ISequencedTaskExecutor * _sequencedExecutor; spi::PersistenceProvider& _spi; ProcessAllHandler _processAllHandler; MergeHandler _mergeHandler; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 9095b351f64..72d6a1bb8d3 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -29,23 +29,37 @@ namespace { (id == api::MessageType::REMOVELOCATION_ID || id == api::MessageType::JOINBUCKETS_ID)); } - + const vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s; } MessageTracker::MessageTracker(PersistenceUtil & env, + MessageSender & replySender, + FileStorHandler::BucketLockInterface::SP bucketLock, + api::StorageMessage::SP msg) + : MessageTracker(env, replySender, true, std::move(bucketLock), std::move(msg)) +{} +MessageTracker::MessageTracker(PersistenceUtil & env, + MessageSender & replySender, + bool updateBucketInfo, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) : _sendReply(true), - _updateBucketInfo(hasBucketInfo(msg->getType().getId())), + _updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())), _bucketLock(std::move(bucketLock)), _msg(std::move(msg)), _context(_msg->getLoadType(), _msg->getPriority(), _msg->getTrace().getLevel()), _env(env), + _replySender(replySender), _metric(nullptr), _result(api::ReturnCode::OK), _timer(_env._component.getClock()) { } +MessageTracker::UP +MessageTracker::createForTesting(PersistenceUtil &env, MessageSender &replySender, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) { + return MessageTracker::UP(new MessageTracker(env, replySender, false, std::move(bucketLock), std::move(msg))); +} + void MessageTracker::setMetric(FileStorThreadMetrics::Op& metric) { metric.count.inc(); @@ -56,6 +70,21 @@ MessageTracker::~MessageTracker() = default; void MessageTracker::sendReply() { + if ( ! _msg->getType().isReply()) { + generateReply(static_cast<api::StorageCommand &>(*_msg)); + } + if ((hasReply() && getReply().getResult().failed()) || getResult().failed()) { + _env._metrics.failedOperations.inc(); + } + vespalib::duration duration = vespalib::from_s(_timer.getElapsedTimeAsDouble()/1000.0); + if (duration >= WARN_ON_SLOW_OPERATIONS) { + LOGBT(warning, _msg->getType().toString(), + "Slow processing of message %s on disk %u. Processing time: %1.1f s (>=%1.1f s)", + _msg->toString().c_str(), _env._partition, vespalib::to_s(duration), vespalib::to_s(WARN_ON_SLOW_OPERATIONS)); + } else { + LOGBT(spam, _msg->getType().toString(), "Processing time of message %s on disk %u: %1.1f s", + _msg->toString(true).c_str(), _env._partition, vespalib::to_s(duration)); + } if (hasReply()) { if ( ! _context.getTrace().getRoot().isEmpty()) { getReply().getTrace().getRoot().addChild(_context.getTrace().getRoot()); @@ -70,7 +99,7 @@ MessageTracker::sendReply() { } LOG(spam, "Sending reply up: %s %" PRIu64, getReply().toString().c_str(), getReply().getMsgId()); - _env._fileStorHandler.sendReply(std::move(_reply)); + _replySender.sendReply(std::move(_reply)); } else { if ( ! _context.getTrace().getRoot().isEmpty()) { _msg->getTrace().getRoot().addChild(_context.getTrace().getRoot()); @@ -105,8 +134,8 @@ MessageTracker::generateReply(api::StorageCommand& cmd) return; } - if (!_reply.get()) { - _reply.reset(cmd.makeReply().release()); + if (!_reply) { + _reply = cmd.makeReply(); _reply->setResult(_result); } @@ -138,7 +167,7 @@ PersistenceUtil::PersistenceUtil( { } -PersistenceUtil::~PersistenceUtil() { } +PersistenceUtil::~PersistenceUtil() = default; void PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i) diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 126fa6ca17a..a57ef186b46 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -19,7 +19,8 @@ class MessageTracker : protected Types { public: typedef std::unique_ptr<MessageTracker> UP; - MessageTracker(PersistenceUtil & env, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); + MessageTracker(PersistenceUtil & env, MessageSender & replySender, + FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); ~MessageTracker(); @@ -62,18 +63,28 @@ public: api::ReturnCode getResult() const { return _result; } spi::Context & context() { return _context; } + document::BucketId getBucketId() const { + return _bucketLock->getBucket().getBucketId(); + } void sendReply(); bool checkForError(const spi::Result& response); + static MessageTracker::UP + createForTesting(PersistenceUtil & env, MessageSender & replySender, + FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); + private: + MessageTracker(PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, + FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); bool _sendReply; bool _updateBucketInfo; FileStorHandler::BucketLockInterface::SP _bucketLock; api::StorageMessage::SP _msg; spi::Context _context; PersistenceUtil &_env; + MessageSender &_replySender; FileStorThreadMetrics::Op *_metric; api::StorageReply::SP _reply; api::ReturnCode _result; diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index a86edd359fc..a5564282d17 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -9,12 +9,17 @@ template <typename ResultType> ResultType ProviderErrorWrapper::checkResult(ResultType&& result) const { + handle(result); + return std::forward<ResultType>(result); +} + +void +ProviderErrorWrapper::handle(const spi::Result & result) const { if (result.getErrorCode() == spi::Result::ErrorType::FATAL_ERROR) { trigger_shutdown_listeners(result.getErrorMessage()); } else if (result.getErrorCode() == spi::Result::ErrorType::RESOURCE_EXHAUSTED) { trigger_resource_exhaustion_listeners(result.getErrorMessage()); } - return std::forward<ResultType>(result); } void ProviderErrorWrapper::trigger_shutdown_listeners(vespalib::stringref reason) const { @@ -61,8 +66,7 @@ ProviderErrorWrapper::setClusterState(BucketSpace bucketSpace, const spi::Cluste } spi::Result -ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, - spi::BucketInfo::ActiveState newState) +ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) { return checkResult(_impl.setActiveState(bucket, newState)); } @@ -80,76 +84,59 @@ ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::Doc } spi::RemoveResult -ProviderErrorWrapper::remove(const spi::Bucket& bucket, - spi::Timestamp ts, - const document::DocumentId& docId, - spi::Context& context) +ProviderErrorWrapper::remove(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context) { return checkResult(_impl.remove(bucket, ts, docId, context)); } spi::RemoveResult -ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, - spi::Timestamp ts, - const document::DocumentId& docId, - spi::Context& context) +ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, spi::Timestamp ts, + const document::DocumentId& docId, spi::Context& context) { return checkResult(_impl.removeIfFound(bucket, ts, docId, context)); } spi::UpdateResult -ProviderErrorWrapper::update(const spi::Bucket& bucket, - spi::Timestamp ts, - const spi::DocumentUpdateSP& docUpdate, - spi::Context& context) +ProviderErrorWrapper::update(const spi::Bucket& bucket, spi::Timestamp ts, + spi::DocumentUpdateSP docUpdate, spi::Context& context) { - return checkResult(_impl.update(bucket, ts, docUpdate, context)); + return checkResult(_impl.update(bucket, ts, std::move(docUpdate), context)); } spi::GetResult -ProviderErrorWrapper::get(const spi::Bucket& bucket, - const document::FieldSet& fieldSet, - const document::DocumentId& docId, - spi::Context& context) const +ProviderErrorWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet, + const document::DocumentId& docId, spi::Context& context) const { return checkResult(_impl.get(bucket, fieldSet, docId, context)); } spi::CreateIteratorResult -ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, - const document::FieldSet& fieldSet, - const spi::Selection& selection, - spi::IncludedVersions versions, - spi::Context& context) +ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, const document::FieldSet& fieldSet, + const spi::Selection& selection, spi::IncludedVersions versions, spi::Context& context) { return checkResult(_impl.createIterator(bucket, fieldSet, selection, versions, context)); } spi::IterateResult -ProviderErrorWrapper::iterate(spi::IteratorId iteratorId, - uint64_t maxByteSize, - spi::Context& context) const +ProviderErrorWrapper::iterate(spi::IteratorId iteratorId, uint64_t maxByteSize, spi::Context& context) const { return checkResult(_impl.iterate(iteratorId, maxByteSize, context)); } spi::Result -ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, - spi::Context& context) +ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context& context) { return checkResult(_impl.destroyIterator(iteratorId, context)); } spi::Result -ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, - spi::Context& context) +ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context) { return checkResult(_impl.createBucket(bucket, context)); } spi::Result -ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, - spi::Context& context) +ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context) { return checkResult(_impl.deleteBucket(bucket, context)); } @@ -161,34 +148,61 @@ ProviderErrorWrapper::getModifiedBuckets(BucketSpace bucketSpace) const } spi::Result -ProviderErrorWrapper::split(const spi::Bucket& source, - const spi::Bucket& target1, - const spi::Bucket& target2, - spi::Context& context) +ProviderErrorWrapper::split(const spi::Bucket& source, const spi::Bucket& target1, + const spi::Bucket& target2, spi::Context& context) { return checkResult(_impl.split(source, target1, target2, context)); } spi::Result -ProviderErrorWrapper::join(const spi::Bucket& source1, - const spi::Bucket& source2, - const spi::Bucket& target, spi::Context& context) +ProviderErrorWrapper::join(const spi::Bucket& source1, const spi::Bucket& source2, + const spi::Bucket& target, spi::Context& context) { return checkResult(_impl.join(source1, source2, target, context)); } spi::Result -ProviderErrorWrapper::move(const spi::Bucket& source, - spi::PartitionId target, spi::Context& context) +ProviderErrorWrapper::move(const spi::Bucket& source, spi::PartitionId target, spi::Context& context) { return checkResult(_impl.move(source, target, context)); } spi::Result -ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, - spi::Timestamp ts, spi::Context& context) +ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts, spi::Context& context) { return checkResult(_impl.removeEntry(bucket, ts, context)); } +void +ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentSP doc, + spi::Context &context, spi::OperationComplete::UP onComplete) +{ + onComplete->addResultHandler(this); + _impl.putAsync(bucket, ts, std::move(doc), context, std::move(onComplete)); +} + +void +ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId, + spi::Context & context, spi::OperationComplete::UP onComplete) +{ + onComplete->addResultHandler(this); + _impl.removeAsync(bucket, ts, docId, context, std::move(onComplete)); +} + +void +ProviderErrorWrapper::removeIfFoundAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId, + spi::Context & context, spi::OperationComplete::UP onComplete) +{ + onComplete->addResultHandler(this); + _impl.removeIfFoundAsync(bucket, ts, docId, context, std::move(onComplete)); +} + +void +ProviderErrorWrapper::updateAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentUpdateSP upd, + spi::Context &context, spi::OperationComplete::UP onComplete) +{ + onComplete->addResultHandler(this); + _impl.updateAsync(bucket, ts, std::move(upd), context, std::move(onComplete)); +} + } // ns storage diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 23da566afee..602877e0b02 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -33,7 +33,7 @@ public: } }; -class ProviderErrorWrapper : public spi::PersistenceProvider { +class ProviderErrorWrapper : public spi::PersistenceProvider, public spi::ResultHandler { public: explicit ProviderErrorWrapper(spi::PersistenceProvider& impl) : _impl(impl), @@ -50,7 +50,7 @@ public: spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; - spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override; + spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override; spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override; spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&, spi::IncludedVersions versions, spi::Context&) override; @@ -72,9 +72,16 @@ public: } void register_error_listener(std::shared_ptr<ProviderErrorListener> listener); + + void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override; + void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; + void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; + void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; + private: template <typename ResultType> ResultType checkResult(ResultType&& result) const; + void handle(const spi::Result &) const override; void trigger_shutdown_listeners(vespalib::stringref reason) const; void trigger_resource_exhaustion_listeners(vespalib::stringref reason) const; diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index fa2b0cda018..a0b75b7602d 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -28,45 +28,6 @@ using document::FixedBucketSpaces; namespace storage { -Queue::Queue() = default; -Queue::~Queue() = default; - -bool Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) { - vespalib::MonitorGuard sync(_queueMonitor); - bool first = true; - while (true) { // Max twice - if (!_queue.empty()) { - LOG(spam, "Picking message from queue"); - msg = std::move(_queue.front()); - _queue.pop(); - return true; - } - if (timeout == 0 || !first) { - return false; - } - sync.wait(timeout); - first = false; - } - - return false; -} - -void Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) { - vespalib::MonitorGuard sync(_queueMonitor); - _queue.emplace(std::move(msg)); - sync.unsafeSignalUnlock(); -} - -void Queue::signal() { - vespalib::MonitorGuard sync(_queueMonitor); - sync.unsafeSignalUnlock(); -} - -size_t Queue::size() const { - vespalib::MonitorGuard sync(_queueMonitor); - return _queue.size(); -} - StorageTransportContext::StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg) : _docAPIMsg(std::move(msg)) { } diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index c08ad214768..1d64c8a8911 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -44,36 +44,6 @@ class VisitorThread; class FNetListener; class RPCRequestWrapper; -class Queue { -private: - using QueueType = std::queue<std::shared_ptr<api::StorageMessage>>; - QueueType _queue; - vespalib::Monitor _queueMonitor; - -public: - Queue(); - ~Queue(); - - /** - * Returns the next event from the event queue - * @param msg The next event - * @param timeout Millisecs to wait if the queue is empty - * (0 = don't wait, -1 = forever) - * @return true or false if the queue was empty. - */ - bool getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout); - - /** - * Enqueue msg in FIFO order. - */ - void enqueue(std::shared_ptr<api::StorageMessage> msg); - - /** Signal queue monitor. */ - void signal(); - - size_t size() const; -}; - class StorageTransportContext : public api::TransportContext { public: StorageTransportContext(std::unique_ptr<documentapi::DocumentMessage> msg); diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp index 5efd638ec26..2e90e1ae3ee 100644 --- a/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp +++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp @@ -23,6 +23,6 @@ TestComponentRegister::TestComponentRegister(ComponentRegisterImpl::UP compReg) // register status pages without a server } -TestComponentRegister::~TestComponentRegister() {} +TestComponentRegister::~TestComponentRegister() = default; } |