diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-15 14:44:49 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-15 14:44:49 +0000 |
commit | 98a41274aace2131b34dad2faf253aa506d313c7 (patch) | |
tree | 785bbfff9ed4f413e9a4637a7a285d43b95a73e8 /searchcore | |
parent | e138d9f977423cda5a200eab82383d73c20ee05e (diff) |
- Refactor and use CatchResult in the PersistenceEngine in preparation for making more moretaions async.
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp | 88 |
1 files changed, 57 insertions, 31 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 42720e83917..f3fa3e07229 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -4,6 +4,7 @@ #include "ipersistenceengineowner.h" #include "transport_latch.h" #include <vespa/persistence/spi/bucketexecutor.h> +#include <vespa/persistence/spi/catchresult.h> #include <vespa/vespalib/stllike/hash_set.h> #include <vespa/document/fieldvalue/document.h> #include <vespa/document/datatype/documenttype.h> @@ -25,6 +26,7 @@ using storage::spi::BucketInfo; using storage::spi::BucketInfoResult; using storage::spi::IncludedVersions; using storage::spi::Result; +using storage::spi::OperationComplete; using vespalib::IllegalStateException; using vespalib::Sequence; using vespalib::make_string; @@ -37,27 +39,40 @@ namespace proton { namespace { class ResultHandlerBase { +private: + virtual Result::UP createResult() const = 0; protected: - std::mutex _lock; - vespalib::CountDownLatch _latch; + std::mutex _lock; + std::atomic<uint32_t> _countDown; + OperationComplete::UP _onComplete; + void countDown(); public: - explicit ResultHandlerBase(uint32_t waitCnt); - ~ResultHandlerBase(); - void await() { _latch.await(); } + explicit ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete); + virtual ~ResultHandlerBase(); + + }; -ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt) +ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete) : _lock(), - _latch(waitCnt) + _countDown(waitCnt), + _onComplete(std::move(onComplete)) {} ResultHandlerBase::~ResultHandlerBase() = default; +void +ResultHandlerBase::countDown() { + if (_countDown.fetch_sub(1) == 1) { + _onComplete->onComplete(createResult()); + } +} + class GenericResultHandler : public ResultHandlerBase, public IGenericResultHandler { private: Result _result; public: - explicit GenericResultHandler(uint32_t waitCnt) : - ResultHandlerBase(waitCnt), + explicit GenericResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete) : + ResultHandlerBase(waitCnt, std::move(onComplete)), _result() { } ~GenericResultHandler() override; @@ -70,9 +85,9 @@ public: _result = result; } } - _latch.countDown(); + countDown(); } - const Result &getResult() const { return _result; } + Result::UP createResult() const override { return std::make_unique<Result>(_result); } }; GenericResultHandler::~GenericResultHandler() = default; @@ -93,13 +108,13 @@ public: _bucketSet.insert(buckets[i]); } } - BucketIdListResult getResult() const { + std::unique_ptr<BucketIdListResult> getResult() const { BucketIdListResult::List buckets; buckets.reserve(_bucketSet.size()); for (document::BucketId bucketId : _bucketSet) { buckets.push_back(bucketId); } - return BucketIdListResult(buckets); + return std::make_unique<BucketIdListResult>(buckets); } }; @@ -110,8 +125,8 @@ class SynchronizedBucketIdListResultHandler : public ResultHandlerBase, public BucketIdListResultHandler { public: - explicit SynchronizedBucketIdListResultHandler(uint32_t waitCnt) - : ResultHandlerBase(waitCnt), + explicit SynchronizedBucketIdListResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete) + : ResultHandlerBase(waitCnt, std::move(onComplete)), BucketIdListResultHandler() { } ~SynchronizedBucketIdListResultHandler() override; @@ -120,7 +135,10 @@ public: std::lock_guard<std::mutex> guard(_lock); BucketIdListResultHandler::handle(result); } - _latch.countDown(); + countDown(); + } + Result::UP createResult() const override { + return getResult(); } }; @@ -256,7 +274,7 @@ PersistenceEngine::listBuckets(BucketSpace bucketSpace) const IPersistenceHandler *handler = snap.handlers().get(); handler->handleListBuckets(resultHandler); } - return resultHandler.getResult(); + return *resultHandler.getResult(); } @@ -266,14 +284,16 @@ PersistenceEngine::setClusterState(BucketSpace bucketSpace, const ClusterState & ReadGuard rguard(_rwMutex); saveClusterState(bucketSpace, calc); HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace); - GenericResultHandler resultHandler(snap.size()); + auto catchResult = std::make_unique<storage::spi::CatchResult>(); + auto futureResult = catchResult->future_result(); + GenericResultHandler resultHandler(snap.size(), std::move(catchResult)); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleSetClusterState(calc, resultHandler); } - resultHandler.await(); + Result::UP result = futureResult.get(); _owner.setClusterState(bucketSpace, calc); - return resultHandler.getResult(); + return *result; } @@ -283,13 +303,14 @@ PersistenceEngine::setActiveState(const Bucket& bucket, { ReadGuard rguard(_rwMutex); HandlerSnapshot snap = getHandlerSnapshot(rguard, bucket.getBucketSpace()); - GenericResultHandler resultHandler(snap.size()); + auto catchResult = std::make_unique<storage::spi::CatchResult>(); + auto futureResult = catchResult->future_result(); + GenericResultHandler resultHandler(snap.size(), std::move(catchResult)); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleSetActiveState(bucket, newState, resultHandler); } - resultHandler.await(); - return resultHandler.getResult(); + return *futureResult.get(); } @@ -560,7 +581,9 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const extraModifiedBuckets.swap(_extraModifiedBuckets[bucketSpace]); } HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace); - SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size()); + auto catchResult = std::make_unique<storage::spi::CatchResult>(); + auto futureResult = catchResult->future_result(); + SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size(), std::move(catchResult)); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleGetModifiedBuckets(resultHandler); @@ -568,8 +591,7 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const for (const auto & item : extraModifiedBuckets) { resultHandler.handle(*item); } - resultHandler.await(); - return resultHandler.getResult(); + return dynamic_cast<BucketIdListResult &>(*futureResult.get()); } @@ -661,9 +683,11 @@ PersistenceEngine::propagateSavedClusterState(BucketSpace bucketSpace, IPersiste return; // Propagate saved cluster state. // TODO: Fix race with new cluster state setting. - GenericResultHandler resultHandler(1); + auto catchResult = std::make_unique<storage::spi::CatchResult>(); + auto futureResult = catchResult->future_result(); + GenericResultHandler resultHandler(1, std::move(catchResult)); handler.handleSetClusterState(*clusterState, resultHandler); - resultHandler.await(); + futureResult.get(); } void @@ -671,7 +695,7 @@ PersistenceEngine::grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenc { BucketIdListResultHandler resultHandler; handler.handleListBuckets(resultHandler); - auto result = std::make_shared<BucketIdListResult>(resultHandler.getResult()); + std::shared_ptr<BucketIdListResult> result(resultHandler.getResult()); std::lock_guard<std::mutex> guard(_lock); _extraModifiedBuckets[bucketSpace].push_back(result); } @@ -725,9 +749,11 @@ PersistenceEngine::populateInitialBucketDB(const WriteGuard & guard, BucketSpace buckets.push_back(item.first); } LOG(info, "Adding %zu active buckets (%zu flawed) to new bucket db", buckets.size(), flawed); - GenericResultHandler trHandler(1); + auto catchResult = std::make_unique<storage::spi::CatchResult>(); + auto futureResult = catchResult->future_result(); + GenericResultHandler trHandler(1, std::move(catchResult)); targetHandler.handlePopulateActiveBuckets(buckets, trHandler); - trHandler.await(); + futureResult.get(); } std::unique_lock<std::shared_mutex> |