diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-15 22:53:58 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-15 22:53:58 +0200 |
commit | b66bb8b16e238b8b0219ccc8f6327bad7f7b8abd (patch) | |
tree | 11c074b0e262e18f290747e07535c599bf17c34e | |
parent | 01404bb83012174e81986dee2939e529edfa5d94 (diff) |
Revert "- Refactor and use CatchResult in the PersistenceEngine in preparatio…"
5 files changed, 50 insertions, 106 deletions
diff --git a/persistence/src/vespa/persistence/spi/CMakeLists.txt b/persistence/src/vespa/persistence/spi/CMakeLists.txt index eda3ffb228f..14550ef97a6 100644 --- a/persistence/src/vespa/persistence/spi/CMakeLists.txt +++ b/persistence/src/vespa/persistence/spi/CMakeLists.txt @@ -5,7 +5,6 @@ vespa_add_library(persistence_spi OBJECT attribute_resource_usage.cpp bucket.cpp bucketinfo.cpp - catchresult.cpp clusterstate.cpp context.cpp docentry.cpp diff --git a/persistence/src/vespa/persistence/spi/catchresult.cpp b/persistence/src/vespa/persistence/spi/catchresult.cpp deleted file mode 100644 index 3dbe8cfdf7e..00000000000 --- a/persistence/src/vespa/persistence/spi/catchresult.cpp +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "catchresult.h" -#include "result.h" -#include <cassert> - -namespace storage::spi { - -CatchResult::CatchResult() - : _promisedResult(), - _resulthandler(nullptr) -{} -CatchResult::~CatchResult() = default; - -void -CatchResult::onComplete(std::unique_ptr<Result> result) { - _promisedResult.set_value(std::move(result)); -} -void -CatchResult::addResultHandler(const ResultHandler * resultHandler) { - assert(_resulthandler == nullptr); - _resulthandler = resultHandler; -} - -} diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h deleted file mode 100644 index 80d4f863971..00000000000 --- a/persistence/src/vespa/persistence/spi/catchresult.h +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "operationcomplete.h" -#include <future> - -namespace storage::spi { - -class CatchResult : public OperationComplete { -public: - CatchResult(); - ~CatchResult() override; - std::future<std::unique_ptr<Result>> future_result() { - return _promisedResult.get_future(); - } - void onComplete(std::unique_ptr<Result> result) override; - void addResultHandler(const ResultHandler * resultHandler) override; -private: - std::promise<std::unique_ptr<Result>> _promisedResult; - const ResultHandler *_resulthandler; -}; - -} diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index 575a95269c5..9d40a494ca6 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -1,13 +1,31 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "persistenceprovider.h" -#include "catchresult.h" #include <future> +#include <cassert> namespace storage::spi { PersistenceProvider::~PersistenceProvider() = default; +class CatchResult : public OperationComplete { +public: + CatchResult() : _promisedResult(), _resulthandler(nullptr) {} + std::future<Result::UP> future_result() { + return _promisedResult.get_future(); + } + void onComplete(Result::UP result) override { + _promisedResult.set_value(std::move(result)); + } + void addResultHandler(const ResultHandler * resultHandler) override { + assert(_resulthandler == nullptr); + _resulthandler = resultHandler; + } +private: + std::promise<Result::UP> _promisedResult; + const ResultHandler *_resulthandler; +}; + Result PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) { auto catcher = std::make_unique<CatchResult>(); diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index f3fa3e07229..42720e83917 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -4,7 +4,6 @@ #include "ipersistenceengineowner.h" #include "transport_latch.h" #include <vespa/persistence/spi/bucketexecutor.h> -#include <vespa/persistence/spi/catchresult.h> #include <vespa/vespalib/stllike/hash_set.h> #include <vespa/document/fieldvalue/document.h> #include <vespa/document/datatype/documenttype.h> @@ -26,7 +25,6 @@ using storage::spi::BucketInfo; using storage::spi::BucketInfoResult; using storage::spi::IncludedVersions; using storage::spi::Result; -using storage::spi::OperationComplete; using vespalib::IllegalStateException; using vespalib::Sequence; using vespalib::make_string; @@ -39,40 +37,27 @@ namespace proton { namespace { class ResultHandlerBase { -private: - virtual Result::UP createResult() const = 0; protected: - std::mutex _lock; - std::atomic<uint32_t> _countDown; - OperationComplete::UP _onComplete; - void countDown(); + std::mutex _lock; + vespalib::CountDownLatch _latch; public: - explicit ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete); - virtual ~ResultHandlerBase(); - - + explicit ResultHandlerBase(uint32_t waitCnt); + ~ResultHandlerBase(); + void await() { _latch.await(); } }; -ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete) +ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt) : _lock(), - _countDown(waitCnt), - _onComplete(std::move(onComplete)) + _latch(waitCnt) {} ResultHandlerBase::~ResultHandlerBase() = default; -void -ResultHandlerBase::countDown() { - if (_countDown.fetch_sub(1) == 1) { - _onComplete->onComplete(createResult()); - } -} - class GenericResultHandler : public ResultHandlerBase, public IGenericResultHandler { private: Result _result; public: - explicit GenericResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete) : - ResultHandlerBase(waitCnt, std::move(onComplete)), + explicit GenericResultHandler(uint32_t waitCnt) : + ResultHandlerBase(waitCnt), _result() { } ~GenericResultHandler() override; @@ -85,9 +70,9 @@ public: _result = result; } } - countDown(); + _latch.countDown(); } - Result::UP createResult() const override { return std::make_unique<Result>(_result); } + const Result &getResult() const { return _result; } }; GenericResultHandler::~GenericResultHandler() = default; @@ -108,13 +93,13 @@ public: _bucketSet.insert(buckets[i]); } } - std::unique_ptr<BucketIdListResult> getResult() const { + BucketIdListResult getResult() const { BucketIdListResult::List buckets; buckets.reserve(_bucketSet.size()); for (document::BucketId bucketId : _bucketSet) { buckets.push_back(bucketId); } - return std::make_unique<BucketIdListResult>(buckets); + return BucketIdListResult(buckets); } }; @@ -125,8 +110,8 @@ class SynchronizedBucketIdListResultHandler : public ResultHandlerBase, public BucketIdListResultHandler { public: - explicit SynchronizedBucketIdListResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete) - : ResultHandlerBase(waitCnt, std::move(onComplete)), + explicit SynchronizedBucketIdListResultHandler(uint32_t waitCnt) + : ResultHandlerBase(waitCnt), BucketIdListResultHandler() { } ~SynchronizedBucketIdListResultHandler() override; @@ -135,10 +120,7 @@ public: std::lock_guard<std::mutex> guard(_lock); BucketIdListResultHandler::handle(result); } - countDown(); - } - Result::UP createResult() const override { - return getResult(); + _latch.countDown(); } }; @@ -274,7 +256,7 @@ PersistenceEngine::listBuckets(BucketSpace bucketSpace) const IPersistenceHandler *handler = snap.handlers().get(); handler->handleListBuckets(resultHandler); } - return *resultHandler.getResult(); + return resultHandler.getResult(); } @@ -284,16 +266,14 @@ PersistenceEngine::setClusterState(BucketSpace bucketSpace, const ClusterState & ReadGuard rguard(_rwMutex); saveClusterState(bucketSpace, calc); HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace); - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - GenericResultHandler resultHandler(snap.size(), std::move(catchResult)); + GenericResultHandler resultHandler(snap.size()); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleSetClusterState(calc, resultHandler); } - Result::UP result = futureResult.get(); + resultHandler.await(); _owner.setClusterState(bucketSpace, calc); - return *result; + return resultHandler.getResult(); } @@ -303,14 +283,13 @@ PersistenceEngine::setActiveState(const Bucket& bucket, { ReadGuard rguard(_rwMutex); HandlerSnapshot snap = getHandlerSnapshot(rguard, bucket.getBucketSpace()); - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - GenericResultHandler resultHandler(snap.size(), std::move(catchResult)); + GenericResultHandler resultHandler(snap.size()); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleSetActiveState(bucket, newState, resultHandler); } - return *futureResult.get(); + resultHandler.await(); + return resultHandler.getResult(); } @@ -581,9 +560,7 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const extraModifiedBuckets.swap(_extraModifiedBuckets[bucketSpace]); } HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace); - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size(), std::move(catchResult)); + SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size()); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleGetModifiedBuckets(resultHandler); @@ -591,7 +568,8 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const for (const auto & item : extraModifiedBuckets) { resultHandler.handle(*item); } - return dynamic_cast<BucketIdListResult &>(*futureResult.get()); + resultHandler.await(); + return resultHandler.getResult(); } @@ -683,11 +661,9 @@ PersistenceEngine::propagateSavedClusterState(BucketSpace bucketSpace, IPersiste return; // Propagate saved cluster state. // TODO: Fix race with new cluster state setting. - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - GenericResultHandler resultHandler(1, std::move(catchResult)); + GenericResultHandler resultHandler(1); handler.handleSetClusterState(*clusterState, resultHandler); - futureResult.get(); + resultHandler.await(); } void @@ -695,7 +671,7 @@ PersistenceEngine::grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenc { BucketIdListResultHandler resultHandler; handler.handleListBuckets(resultHandler); - std::shared_ptr<BucketIdListResult> result(resultHandler.getResult()); + auto result = std::make_shared<BucketIdListResult>(resultHandler.getResult()); std::lock_guard<std::mutex> guard(_lock); _extraModifiedBuckets[bucketSpace].push_back(result); } @@ -749,11 +725,9 @@ PersistenceEngine::populateInitialBucketDB(const WriteGuard & guard, BucketSpace buckets.push_back(item.first); } LOG(info, "Adding %zu active buckets (%zu flawed) to new bucket db", buckets.size(), flawed); - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - GenericResultHandler trHandler(1, std::move(catchResult)); + GenericResultHandler trHandler(1); targetHandler.handlePopulateActiveBuckets(buckets, trHandler); - futureResult.get(); + trHandler.await(); } std::unique_lock<std::shared_mutex> |