summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-10-15 22:54:51 +0200
committerGitHub <noreply@github.com>2021-10-15 22:54:51 +0200
commit8b65d8a6a47cfef806cd44c3c59e8e14d71c90b9 (patch)
tree11c074b0e262e18f290747e07535c599bf17c34e
parent01404bb83012174e81986dee2939e529edfa5d94 (diff)
parentb66bb8b16e238b8b0219ccc8f6327bad7f7b8abd (diff)
Merge pull request #19596 from vespa-engine/revert-19590-balder/unify-and-simplify-more-async-operations
Revert "- Refactor and use CatchResult in the PersistenceEngine in preparatio…"
-rw-r--r--persistence/src/vespa/persistence/spi/CMakeLists.txt1
-rw-r--r--persistence/src/vespa/persistence/spi/catchresult.cpp25
-rw-r--r--persistence/src/vespa/persistence/spi/catchresult.h22
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp88
5 files changed, 50 insertions, 106 deletions
diff --git a/persistence/src/vespa/persistence/spi/CMakeLists.txt b/persistence/src/vespa/persistence/spi/CMakeLists.txt
index eda3ffb228f..14550ef97a6 100644
--- a/persistence/src/vespa/persistence/spi/CMakeLists.txt
+++ b/persistence/src/vespa/persistence/spi/CMakeLists.txt
@@ -5,7 +5,6 @@ vespa_add_library(persistence_spi OBJECT
attribute_resource_usage.cpp
bucket.cpp
bucketinfo.cpp
- catchresult.cpp
clusterstate.cpp
context.cpp
docentry.cpp
diff --git a/persistence/src/vespa/persistence/spi/catchresult.cpp b/persistence/src/vespa/persistence/spi/catchresult.cpp
deleted file mode 100644
index 3dbe8cfdf7e..00000000000
--- a/persistence/src/vespa/persistence/spi/catchresult.cpp
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "catchresult.h"
-#include "result.h"
-#include <cassert>
-
-namespace storage::spi {
-
-CatchResult::CatchResult()
- : _promisedResult(),
- _resulthandler(nullptr)
-{}
-CatchResult::~CatchResult() = default;
-
-void
-CatchResult::onComplete(std::unique_ptr<Result> result) {
- _promisedResult.set_value(std::move(result));
-}
-void
-CatchResult::addResultHandler(const ResultHandler * resultHandler) {
- assert(_resulthandler == nullptr);
- _resulthandler = resultHandler;
-}
-
-}
diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h
deleted file mode 100644
index 80d4f863971..00000000000
--- a/persistence/src/vespa/persistence/spi/catchresult.h
+++ /dev/null
@@ -1,22 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "operationcomplete.h"
-#include <future>
-
-namespace storage::spi {
-
-class CatchResult : public OperationComplete {
-public:
- CatchResult();
- ~CatchResult() override;
- std::future<std::unique_ptr<Result>> future_result() {
- return _promisedResult.get_future();
- }
- void onComplete(std::unique_ptr<Result> result) override;
- void addResultHandler(const ResultHandler * resultHandler) override;
-private:
- std::promise<std::unique_ptr<Result>> _promisedResult;
- const ResultHandler *_resulthandler;
-};
-
-}
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index 575a95269c5..9d40a494ca6 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -1,13 +1,31 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "persistenceprovider.h"
-#include "catchresult.h"
#include <future>
+#include <cassert>
namespace storage::spi {
PersistenceProvider::~PersistenceProvider() = default;
+class CatchResult : public OperationComplete {
+public:
+ CatchResult() : _promisedResult(), _resulthandler(nullptr) {}
+ std::future<Result::UP> future_result() {
+ return _promisedResult.get_future();
+ }
+ void onComplete(Result::UP result) override {
+ _promisedResult.set_value(std::move(result));
+ }
+ void addResultHandler(const ResultHandler * resultHandler) override {
+ assert(_resulthandler == nullptr);
+ _resulthandler = resultHandler;
+ }
+private:
+ std::promise<Result::UP> _promisedResult;
+ const ResultHandler *_resulthandler;
+};
+
Result
PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) {
auto catcher = std::make_unique<CatchResult>();
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index f3fa3e07229..42720e83917 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -4,7 +4,6 @@
#include "ipersistenceengineowner.h"
#include "transport_latch.h"
#include <vespa/persistence/spi/bucketexecutor.h>
-#include <vespa/persistence/spi/catchresult.h>
#include <vespa/vespalib/stllike/hash_set.h>
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/datatype/documenttype.h>
@@ -26,7 +25,6 @@ using storage::spi::BucketInfo;
using storage::spi::BucketInfoResult;
using storage::spi::IncludedVersions;
using storage::spi::Result;
-using storage::spi::OperationComplete;
using vespalib::IllegalStateException;
using vespalib::Sequence;
using vespalib::make_string;
@@ -39,40 +37,27 @@ namespace proton {
namespace {
class ResultHandlerBase {
-private:
- virtual Result::UP createResult() const = 0;
protected:
- std::mutex _lock;
- std::atomic<uint32_t> _countDown;
- OperationComplete::UP _onComplete;
- void countDown();
+ std::mutex _lock;
+ vespalib::CountDownLatch _latch;
public:
- explicit ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete);
- virtual ~ResultHandlerBase();
-
-
+ explicit ResultHandlerBase(uint32_t waitCnt);
+ ~ResultHandlerBase();
+ void await() { _latch.await(); }
};
-ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete)
+ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt)
: _lock(),
- _countDown(waitCnt),
- _onComplete(std::move(onComplete))
+ _latch(waitCnt)
{}
ResultHandlerBase::~ResultHandlerBase() = default;
-void
-ResultHandlerBase::countDown() {
- if (_countDown.fetch_sub(1) == 1) {
- _onComplete->onComplete(createResult());
- }
-}
-
class GenericResultHandler : public ResultHandlerBase, public IGenericResultHandler {
private:
Result _result;
public:
- explicit GenericResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete) :
- ResultHandlerBase(waitCnt, std::move(onComplete)),
+ explicit GenericResultHandler(uint32_t waitCnt) :
+ ResultHandlerBase(waitCnt),
_result()
{ }
~GenericResultHandler() override;
@@ -85,9 +70,9 @@ public:
_result = result;
}
}
- countDown();
+ _latch.countDown();
}
- Result::UP createResult() const override { return std::make_unique<Result>(_result); }
+ const Result &getResult() const { return _result; }
};
GenericResultHandler::~GenericResultHandler() = default;
@@ -108,13 +93,13 @@ public:
_bucketSet.insert(buckets[i]);
}
}
- std::unique_ptr<BucketIdListResult> getResult() const {
+ BucketIdListResult getResult() const {
BucketIdListResult::List buckets;
buckets.reserve(_bucketSet.size());
for (document::BucketId bucketId : _bucketSet) {
buckets.push_back(bucketId);
}
- return std::make_unique<BucketIdListResult>(buckets);
+ return BucketIdListResult(buckets);
}
};
@@ -125,8 +110,8 @@ class SynchronizedBucketIdListResultHandler : public ResultHandlerBase,
public BucketIdListResultHandler
{
public:
- explicit SynchronizedBucketIdListResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete)
- : ResultHandlerBase(waitCnt, std::move(onComplete)),
+ explicit SynchronizedBucketIdListResultHandler(uint32_t waitCnt)
+ : ResultHandlerBase(waitCnt),
BucketIdListResultHandler()
{ }
~SynchronizedBucketIdListResultHandler() override;
@@ -135,10 +120,7 @@ public:
std::lock_guard<std::mutex> guard(_lock);
BucketIdListResultHandler::handle(result);
}
- countDown();
- }
- Result::UP createResult() const override {
- return getResult();
+ _latch.countDown();
}
};
@@ -274,7 +256,7 @@ PersistenceEngine::listBuckets(BucketSpace bucketSpace) const
IPersistenceHandler *handler = snap.handlers().get();
handler->handleListBuckets(resultHandler);
}
- return *resultHandler.getResult();
+ return resultHandler.getResult();
}
@@ -284,16 +266,14 @@ PersistenceEngine::setClusterState(BucketSpace bucketSpace, const ClusterState &
ReadGuard rguard(_rwMutex);
saveClusterState(bucketSpace, calc);
HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace);
- auto catchResult = std::make_unique<storage::spi::CatchResult>();
- auto futureResult = catchResult->future_result();
- GenericResultHandler resultHandler(snap.size(), std::move(catchResult));
+ GenericResultHandler resultHandler(snap.size());
for (; snap.handlers().valid(); snap.handlers().next()) {
IPersistenceHandler *handler = snap.handlers().get();
handler->handleSetClusterState(calc, resultHandler);
}
- Result::UP result = futureResult.get();
+ resultHandler.await();
_owner.setClusterState(bucketSpace, calc);
- return *result;
+ return resultHandler.getResult();
}
@@ -303,14 +283,13 @@ PersistenceEngine::setActiveState(const Bucket& bucket,
{
ReadGuard rguard(_rwMutex);
HandlerSnapshot snap = getHandlerSnapshot(rguard, bucket.getBucketSpace());
- auto catchResult = std::make_unique<storage::spi::CatchResult>();
- auto futureResult = catchResult->future_result();
- GenericResultHandler resultHandler(snap.size(), std::move(catchResult));
+ GenericResultHandler resultHandler(snap.size());
for (; snap.handlers().valid(); snap.handlers().next()) {
IPersistenceHandler *handler = snap.handlers().get();
handler->handleSetActiveState(bucket, newState, resultHandler);
}
- return *futureResult.get();
+ resultHandler.await();
+ return resultHandler.getResult();
}
@@ -581,9 +560,7 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const
extraModifiedBuckets.swap(_extraModifiedBuckets[bucketSpace]);
}
HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace);
- auto catchResult = std::make_unique<storage::spi::CatchResult>();
- auto futureResult = catchResult->future_result();
- SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size(), std::move(catchResult));
+ SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size());
for (; snap.handlers().valid(); snap.handlers().next()) {
IPersistenceHandler *handler = snap.handlers().get();
handler->handleGetModifiedBuckets(resultHandler);
@@ -591,7 +568,8 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const
for (const auto & item : extraModifiedBuckets) {
resultHandler.handle(*item);
}
- return dynamic_cast<BucketIdListResult &>(*futureResult.get());
+ resultHandler.await();
+ return resultHandler.getResult();
}
@@ -683,11 +661,9 @@ PersistenceEngine::propagateSavedClusterState(BucketSpace bucketSpace, IPersiste
return;
// Propagate saved cluster state.
// TODO: Fix race with new cluster state setting.
- auto catchResult = std::make_unique<storage::spi::CatchResult>();
- auto futureResult = catchResult->future_result();
- GenericResultHandler resultHandler(1, std::move(catchResult));
+ GenericResultHandler resultHandler(1);
handler.handleSetClusterState(*clusterState, resultHandler);
- futureResult.get();
+ resultHandler.await();
}
void
@@ -695,7 +671,7 @@ PersistenceEngine::grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenc
{
BucketIdListResultHandler resultHandler;
handler.handleListBuckets(resultHandler);
- std::shared_ptr<BucketIdListResult> result(resultHandler.getResult());
+ auto result = std::make_shared<BucketIdListResult>(resultHandler.getResult());
std::lock_guard<std::mutex> guard(_lock);
_extraModifiedBuckets[bucketSpace].push_back(result);
}
@@ -749,11 +725,9 @@ PersistenceEngine::populateInitialBucketDB(const WriteGuard & guard, BucketSpace
buckets.push_back(item.first);
}
LOG(info, "Adding %zu active buckets (%zu flawed) to new bucket db", buckets.size(), flawed);
- auto catchResult = std::make_unique<storage::spi::CatchResult>();
- auto futureResult = catchResult->future_result();
- GenericResultHandler trHandler(1, std::move(catchResult));
+ GenericResultHandler trHandler(1);
targetHandler.handlePopulateActiveBuckets(buckets, trHandler);
- futureResult.get();
+ trHandler.await();
}
std::unique_lock<std::shared_mutex>