diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-15 22:53:58 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-15 22:53:58 +0200 |
commit | b66bb8b16e238b8b0219ccc8f6327bad7f7b8abd (patch) | |
tree | 11c074b0e262e18f290747e07535c599bf17c34e /searchcore | |
parent | 01404bb83012174e81986dee2939e529edfa5d94 (diff) |
Revert "- Refactor and use CatchResult in the PersistenceEngine in preparatio…"
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp | 88 |
1 files changed, 31 insertions, 57 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index f3fa3e07229..42720e83917 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -4,7 +4,6 @@ #include "ipersistenceengineowner.h" #include "transport_latch.h" #include <vespa/persistence/spi/bucketexecutor.h> -#include <vespa/persistence/spi/catchresult.h> #include <vespa/vespalib/stllike/hash_set.h> #include <vespa/document/fieldvalue/document.h> #include <vespa/document/datatype/documenttype.h> @@ -26,7 +25,6 @@ using storage::spi::BucketInfo; using storage::spi::BucketInfoResult; using storage::spi::IncludedVersions; using storage::spi::Result; -using storage::spi::OperationComplete; using vespalib::IllegalStateException; using vespalib::Sequence; using vespalib::make_string; @@ -39,40 +37,27 @@ namespace proton { namespace { class ResultHandlerBase { -private: - virtual Result::UP createResult() const = 0; protected: - std::mutex _lock; - std::atomic<uint32_t> _countDown; - OperationComplete::UP _onComplete; - void countDown(); + std::mutex _lock; + vespalib::CountDownLatch _latch; public: - explicit ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete); - virtual ~ResultHandlerBase(); - - + explicit ResultHandlerBase(uint32_t waitCnt); + ~ResultHandlerBase(); + void await() { _latch.await(); } }; -ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete) +ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt) : _lock(), - _countDown(waitCnt), - _onComplete(std::move(onComplete)) + _latch(waitCnt) {} ResultHandlerBase::~ResultHandlerBase() = default; -void -ResultHandlerBase::countDown() { - if (_countDown.fetch_sub(1) == 1) { - _onComplete->onComplete(createResult()); - } -} - class GenericResultHandler : public ResultHandlerBase, public IGenericResultHandler { private: Result _result; public: - explicit GenericResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete) : - ResultHandlerBase(waitCnt, std::move(onComplete)), + explicit GenericResultHandler(uint32_t waitCnt) : + ResultHandlerBase(waitCnt), _result() { } ~GenericResultHandler() override; @@ -85,9 +70,9 @@ public: _result = result; } } - countDown(); + _latch.countDown(); } - Result::UP createResult() const override { return std::make_unique<Result>(_result); } + const Result &getResult() const { return _result; } }; GenericResultHandler::~GenericResultHandler() = default; @@ -108,13 +93,13 @@ public: _bucketSet.insert(buckets[i]); } } - std::unique_ptr<BucketIdListResult> getResult() const { + BucketIdListResult getResult() const { BucketIdListResult::List buckets; buckets.reserve(_bucketSet.size()); for (document::BucketId bucketId : _bucketSet) { buckets.push_back(bucketId); } - return std::make_unique<BucketIdListResult>(buckets); + return BucketIdListResult(buckets); } }; @@ -125,8 +110,8 @@ class SynchronizedBucketIdListResultHandler : public ResultHandlerBase, public BucketIdListResultHandler { public: - explicit SynchronizedBucketIdListResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete) - : ResultHandlerBase(waitCnt, std::move(onComplete)), + explicit SynchronizedBucketIdListResultHandler(uint32_t waitCnt) + : ResultHandlerBase(waitCnt), BucketIdListResultHandler() { } ~SynchronizedBucketIdListResultHandler() override; @@ -135,10 +120,7 @@ public: std::lock_guard<std::mutex> guard(_lock); BucketIdListResultHandler::handle(result); } - countDown(); - } - Result::UP createResult() const override { - return getResult(); + _latch.countDown(); } }; @@ -274,7 +256,7 @@ PersistenceEngine::listBuckets(BucketSpace bucketSpace) const IPersistenceHandler *handler = snap.handlers().get(); handler->handleListBuckets(resultHandler); } - return *resultHandler.getResult(); + return resultHandler.getResult(); } @@ -284,16 +266,14 @@ PersistenceEngine::setClusterState(BucketSpace bucketSpace, const ClusterState & ReadGuard rguard(_rwMutex); saveClusterState(bucketSpace, calc); HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace); - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - GenericResultHandler resultHandler(snap.size(), std::move(catchResult)); + GenericResultHandler resultHandler(snap.size()); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleSetClusterState(calc, resultHandler); } - Result::UP result = futureResult.get(); + resultHandler.await(); _owner.setClusterState(bucketSpace, calc); - return *result; + return resultHandler.getResult(); } @@ -303,14 +283,13 @@ PersistenceEngine::setActiveState(const Bucket& bucket, { ReadGuard rguard(_rwMutex); HandlerSnapshot snap = getHandlerSnapshot(rguard, bucket.getBucketSpace()); - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - GenericResultHandler resultHandler(snap.size(), std::move(catchResult)); + GenericResultHandler resultHandler(snap.size()); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleSetActiveState(bucket, newState, resultHandler); } - return *futureResult.get(); + resultHandler.await(); + return resultHandler.getResult(); } @@ -581,9 +560,7 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const extraModifiedBuckets.swap(_extraModifiedBuckets[bucketSpace]); } HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace); - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size(), std::move(catchResult)); + SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size()); for (; snap.handlers().valid(); snap.handlers().next()) { IPersistenceHandler *handler = snap.handlers().get(); handler->handleGetModifiedBuckets(resultHandler); @@ -591,7 +568,8 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const for (const auto & item : extraModifiedBuckets) { resultHandler.handle(*item); } - return dynamic_cast<BucketIdListResult &>(*futureResult.get()); + resultHandler.await(); + return resultHandler.getResult(); } @@ -683,11 +661,9 @@ PersistenceEngine::propagateSavedClusterState(BucketSpace bucketSpace, IPersiste return; // Propagate saved cluster state. // TODO: Fix race with new cluster state setting. - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - GenericResultHandler resultHandler(1, std::move(catchResult)); + GenericResultHandler resultHandler(1); handler.handleSetClusterState(*clusterState, resultHandler); - futureResult.get(); + resultHandler.await(); } void @@ -695,7 +671,7 @@ PersistenceEngine::grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenc { BucketIdListResultHandler resultHandler; handler.handleListBuckets(resultHandler); - std::shared_ptr<BucketIdListResult> result(resultHandler.getResult()); + auto result = std::make_shared<BucketIdListResult>(resultHandler.getResult()); std::lock_guard<std::mutex> guard(_lock); _extraModifiedBuckets[bucketSpace].push_back(result); } @@ -749,11 +725,9 @@ PersistenceEngine::populateInitialBucketDB(const WriteGuard & guard, BucketSpace buckets.push_back(item.first); } LOG(info, "Adding %zu active buckets (%zu flawed) to new bucket db", buckets.size(), flawed); - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - GenericResultHandler trHandler(1, std::move(catchResult)); + GenericResultHandler trHandler(1); targetHandler.handlePopulateActiveBuckets(buckets, trHandler); - futureResult.get(); + trHandler.await(); } std::unique_lock<std::shared_mutex> |