From 4ebd712306e770039e5ca8c8a155d031f73b54a6 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 15 Oct 2021 21:08:01 +0000 Subject: Factor out CatchResult --- .../src/vespa/persistence/spi/CMakeLists.txt | 1 + .../src/vespa/persistence/spi/catchresult.cpp | 25 ++++++++++++++++++++++ .../src/vespa/persistence/spi/catchresult.h | 22 +++++++++++++++++++ .../vespa/persistence/spi/persistenceprovider.cpp | 20 +---------------- 4 files changed, 49 insertions(+), 19 deletions(-) create mode 100644 persistence/src/vespa/persistence/spi/catchresult.cpp create mode 100644 persistence/src/vespa/persistence/spi/catchresult.h diff --git a/persistence/src/vespa/persistence/spi/CMakeLists.txt b/persistence/src/vespa/persistence/spi/CMakeLists.txt index 14550ef97a6..eda3ffb228f 100644 --- a/persistence/src/vespa/persistence/spi/CMakeLists.txt +++ b/persistence/src/vespa/persistence/spi/CMakeLists.txt @@ -5,6 +5,7 @@ vespa_add_library(persistence_spi OBJECT attribute_resource_usage.cpp bucket.cpp bucketinfo.cpp + catchresult.cpp clusterstate.cpp context.cpp docentry.cpp diff --git a/persistence/src/vespa/persistence/spi/catchresult.cpp b/persistence/src/vespa/persistence/spi/catchresult.cpp new file mode 100644 index 00000000000..3dbe8cfdf7e --- /dev/null +++ b/persistence/src/vespa/persistence/spi/catchresult.cpp @@ -0,0 +1,25 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "catchresult.h" +#include "result.h" +#include + +namespace storage::spi { + +CatchResult::CatchResult() + : _promisedResult(), + _resulthandler(nullptr) +{} +CatchResult::~CatchResult() = default; + +void +CatchResult::onComplete(std::unique_ptr result) { + _promisedResult.set_value(std::move(result)); +} +void +CatchResult::addResultHandler(const ResultHandler * resultHandler) { + assert(_resulthandler == nullptr); + _resulthandler = resultHandler; +} + +} diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h new file mode 100644 index 00000000000..80d4f863971 --- /dev/null +++ b/persistence/src/vespa/persistence/spi/catchresult.h @@ -0,0 +1,22 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "operationcomplete.h" +#include + +namespace storage::spi { + +class CatchResult : public OperationComplete { +public: + CatchResult(); + ~CatchResult() override; + std::future> future_result() { + return _promisedResult.get_future(); + } + void onComplete(std::unique_ptr result) override; + void addResultHandler(const ResultHandler * resultHandler) override; +private: + std::promise> _promisedResult; + const ResultHandler *_resulthandler; +}; + +} diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index 9d40a494ca6..575a95269c5 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -1,31 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "persistenceprovider.h" +#include "catchresult.h" #include -#include namespace storage::spi { PersistenceProvider::~PersistenceProvider() = default; -class CatchResult : public OperationComplete { -public: - CatchResult() : _promisedResult(), _resulthandler(nullptr) {} - std::future future_result() { - return _promisedResult.get_future(); - } - void onComplete(Result::UP result) override { - _promisedResult.set_value(std::move(result)); - } - void addResultHandler(const ResultHandler * resultHandler) override { - assert(_resulthandler == nullptr); - _resulthandler = resultHandler; - } -private: - std::promise _promisedResult; - const ResultHandler *_resulthandler; -}; - Result PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) { auto catcher = std::make_unique(); -- cgit v1.2.3 From 7abe325aaf70ea093d81cebbd5828a502691cd18 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 15 Oct 2021 21:59:47 +0000 Subject: Use CatchResult to propagate results, and remember to also reply if there are no handlers. --- .../proton/persistenceengine/persistenceengine.cpp | 103 ++++++++++++++------- 1 file changed, 69 insertions(+), 34 deletions(-) diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 42720e83917..136d95a068b 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -4,6 +4,7 @@ #include "ipersistenceengineowner.h" #include "transport_latch.h" #include +#include #include #include #include @@ -25,6 +26,7 @@ using storage::spi::BucketInfo; using storage::spi::BucketInfoResult; using storage::spi::IncludedVersions; using storage::spi::Result; +using storage::spi::OperationComplete; using vespalib::IllegalStateException; using vespalib::Sequence; using vespalib::make_string; @@ -37,29 +39,47 @@ namespace proton { namespace { class ResultHandlerBase { +private: + virtual Result::UP createResult() const = 0; protected: - std::mutex _lock; - vespalib::CountDownLatch _latch; + std::mutex _lock; + std::atomic _countDown; + OperationComplete::UP _onComplete; + void countDown(); + void complete() { + _onComplete->onComplete(createResult()); + } public: - explicit ResultHandlerBase(uint32_t waitCnt); - ~ResultHandlerBase(); - void await() { _latch.await(); } + ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete); + virtual ~ResultHandlerBase(); }; -ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt) +ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete) : _lock(), - _latch(waitCnt) + _countDown(waitCnt), + _onComplete(std::move(onComplete)) {} ResultHandlerBase::~ResultHandlerBase() = default; -class GenericResultHandler : public ResultHandlerBase, public IGenericResultHandler { +void +ResultHandlerBase::countDown() { + if (_countDown.fetch_sub(1) == 1) { + complete(); + } +} + +class GenericResultHandler final : public ResultHandlerBase, public IGenericResultHandler { private: Result _result; public: - explicit GenericResultHandler(uint32_t waitCnt) : - ResultHandlerBase(waitCnt), + GenericResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete) : + ResultHandlerBase(waitCnt, std::move(onComplete)), _result() - { } + { + if (waitCnt == 0) { + complete(); + } + } ~GenericResultHandler() override; void handle(const Result &result) override { if (result.hasError()) { @@ -70,9 +90,9 @@ public: _result = result; } } - _latch.countDown(); + countDown(); } - const Result &getResult() const { return _result; } + Result::UP createResult() const override { return std::make_unique(_result); } }; GenericResultHandler::~GenericResultHandler() = default; @@ -93,13 +113,13 @@ public: _bucketSet.insert(buckets[i]); } } - BucketIdListResult getResult() const { + std::unique_ptr getResult() const { BucketIdListResult::List buckets; buckets.reserve(_bucketSet.size()); for (document::BucketId bucketId : _bucketSet) { buckets.push_back(bucketId); } - return BucketIdListResult(buckets); + return std::make_unique(buckets); } }; @@ -110,17 +130,24 @@ class SynchronizedBucketIdListResultHandler : public ResultHandlerBase, public BucketIdListResultHandler { public: - explicit SynchronizedBucketIdListResultHandler(uint32_t waitCnt) - : ResultHandlerBase(waitCnt), + SynchronizedBucketIdListResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete) + : ResultHandlerBase(waitCnt, std::move(onComplete)), BucketIdListResultHandler() - { } + { + if (waitCnt == 0) { + complete(); + } + } ~SynchronizedBucketIdListResultHandler() override; void handle(const BucketIdListResult &result) override { { std::lock_guard guard(_lock); BucketIdListResultHandler::handle(result); } - _latch.countDown(); + countDown(); + } + Result::UP createResult() const override { + return getResult(); } }; @@ -256,7 +283,7 @@ PersistenceEngine::listBuckets(BucketSpace bucketSpace) const IPersistenceHandler *handler = snap.handlers().get(); handler->handleListBuckets(resultHandler); } - return resultHandler.getResult(); + return *resultHandler.getResult(); } @@ -266,14 +293,16 @@ PersistenceEngine::setClusterState(BucketSpace bucketSpace, const ClusterState & ReadGuard rguard(_rwMutex); saveClusterState(bucketSpace, calc); HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace); - GenericResultHandler resultHandler(snap.size()); + auto catchResult = std::make_unique(); + auto futureResult = catchResult->future_result(); + GenericResultHandler resultHandler(snap.size(), std::move(catchResult)); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleSetClusterState(calc, resultHandler); } - resultHandler.await(); + Result::UP result = futureResult.get(); _owner.setClusterState(bucketSpace, calc); - return resultHandler.getResult(); + return *result; } @@ -283,13 +312,14 @@ PersistenceEngine::setActiveState(const Bucket& bucket, { ReadGuard rguard(_rwMutex); HandlerSnapshot snap = getHandlerSnapshot(rguard, bucket.getBucketSpace()); - GenericResultHandler resultHandler(snap.size()); + auto catchResult = std::make_unique(); + auto futureResult = catchResult->future_result(); + GenericResultHandler resultHandler(snap.size(), std::move(catchResult)); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleSetActiveState(bucket, newState, resultHandler); } - resultHandler.await(); - return resultHandler.getResult(); + return *futureResult.get(); } @@ -560,7 +590,9 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const extraModifiedBuckets.swap(_extraModifiedBuckets[bucketSpace]); } HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace); - SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size()); + auto catchResult = std::make_unique(); + auto futureResult = catchResult->future_result(); + SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size(), std::move(catchResult)); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleGetModifiedBuckets(resultHandler); @@ -568,8 +600,7 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const for (const auto & item : extraModifiedBuckets) { resultHandler.handle(*item); } - resultHandler.await(); - return resultHandler.getResult(); + return dynamic_cast(*futureResult.get()); } @@ -661,9 +692,11 @@ PersistenceEngine::propagateSavedClusterState(BucketSpace bucketSpace, IPersiste return; // Propagate saved cluster state. // TODO: Fix race with new cluster state setting. - GenericResultHandler resultHandler(1); + auto catchResult = std::make_unique(); + auto futureResult = catchResult->future_result(); + GenericResultHandler resultHandler(1, std::move(catchResult)); handler.handleSetClusterState(*clusterState, resultHandler); - resultHandler.await(); + futureResult.get(); } void @@ -671,7 +704,7 @@ PersistenceEngine::grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenc { BucketIdListResultHandler resultHandler; handler.handleListBuckets(resultHandler); - auto result = std::make_shared(resultHandler.getResult()); + std::shared_ptr result(resultHandler.getResult()); std::lock_guard guard(_lock); _extraModifiedBuckets[bucketSpace].push_back(result); } @@ -725,9 +758,11 @@ PersistenceEngine::populateInitialBucketDB(const WriteGuard & guard, BucketSpace buckets.push_back(item.first); } LOG(info, "Adding %zu active buckets (%zu flawed) to new bucket db", buckets.size(), flawed); - GenericResultHandler trHandler(1); + auto catchResult = std::make_unique(); + auto futureResult = catchResult->future_result(); + GenericResultHandler trHandler(1, std::move(catchResult)); targetHandler.handlePopulateActiveBuckets(buckets, trHandler); - trHandler.await(); + futureResult.get(); } std::unique_lock -- cgit v1.2.3