aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-10-16 00:19:52 +0200
committerGitHub <noreply@github.com>2021-10-16 00:19:52 +0200
commit4d60d2845dba1fcfb2b68bd64a616d50578bf88c (patch)
treee1bc96e148b3a9d7a12963c3f3d1cf9dabefac94
parentbdf432f73eadcdc4b21570ad52d5da3bd3a68fc1 (diff)
parent7abe325aaf70ea093d81cebbd5828a502691cd18 (diff)
Merge pull request #19597 from vespa-engine/balder/factor-out-catchresultv7.484.47
Factor out CatchResult
-rw-r--r--persistence/src/vespa/persistence/spi/CMakeLists.txt1
-rw-r--r--persistence/src/vespa/persistence/spi/catchresult.cpp25
-rw-r--r--persistence/src/vespa/persistence/spi/catchresult.h22
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.cpp20
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp103
5 files changed, 118 insertions, 53 deletions
diff --git a/persistence/src/vespa/persistence/spi/CMakeLists.txt b/persistence/src/vespa/persistence/spi/CMakeLists.txt
index 14550ef97a6..eda3ffb228f 100644
--- a/persistence/src/vespa/persistence/spi/CMakeLists.txt
+++ b/persistence/src/vespa/persistence/spi/CMakeLists.txt
@@ -5,6 +5,7 @@ vespa_add_library(persistence_spi OBJECT
attribute_resource_usage.cpp
bucket.cpp
bucketinfo.cpp
+ catchresult.cpp
clusterstate.cpp
context.cpp
docentry.cpp
diff --git a/persistence/src/vespa/persistence/spi/catchresult.cpp b/persistence/src/vespa/persistence/spi/catchresult.cpp
new file mode 100644
index 00000000000..3dbe8cfdf7e
--- /dev/null
+++ b/persistence/src/vespa/persistence/spi/catchresult.cpp
@@ -0,0 +1,25 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "catchresult.h"
+#include "result.h"
+#include <cassert>
+
+namespace storage::spi {
+
+CatchResult::CatchResult()
+ : _promisedResult(),
+ _resulthandler(nullptr)
+{}
+CatchResult::~CatchResult() = default;
+
+void
+CatchResult::onComplete(std::unique_ptr<Result> result) {
+ _promisedResult.set_value(std::move(result));
+}
+void
+CatchResult::addResultHandler(const ResultHandler * resultHandler) {
+ assert(_resulthandler == nullptr);
+ _resulthandler = resultHandler;
+}
+
+}
diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h
new file mode 100644
index 00000000000..80d4f863971
--- /dev/null
+++ b/persistence/src/vespa/persistence/spi/catchresult.h
@@ -0,0 +1,22 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "operationcomplete.h"
+#include <future>
+
+namespace storage::spi {
+
+class CatchResult : public OperationComplete {
+public:
+ CatchResult();
+ ~CatchResult() override;
+ std::future<std::unique_ptr<Result>> future_result() {
+ return _promisedResult.get_future();
+ }
+ void onComplete(std::unique_ptr<Result> result) override;
+ void addResultHandler(const ResultHandler * resultHandler) override;
+private:
+ std::promise<std::unique_ptr<Result>> _promisedResult;
+ const ResultHandler *_resulthandler;
+};
+
+}
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index 9d40a494ca6..575a95269c5 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -1,31 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "persistenceprovider.h"
+#include "catchresult.h"
#include <future>
-#include <cassert>
namespace storage::spi {
PersistenceProvider::~PersistenceProvider() = default;
-class CatchResult : public OperationComplete {
-public:
- CatchResult() : _promisedResult(), _resulthandler(nullptr) {}
- std::future<Result::UP> future_result() {
- return _promisedResult.get_future();
- }
- void onComplete(Result::UP result) override {
- _promisedResult.set_value(std::move(result));
- }
- void addResultHandler(const ResultHandler * resultHandler) override {
- assert(_resulthandler == nullptr);
- _resulthandler = resultHandler;
- }
-private:
- std::promise<Result::UP> _promisedResult;
- const ResultHandler *_resulthandler;
-};
-
Result
PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) {
auto catcher = std::make_unique<CatchResult>();
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 42720e83917..136d95a068b 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -4,6 +4,7 @@
#include "ipersistenceengineowner.h"
#include "transport_latch.h"
#include <vespa/persistence/spi/bucketexecutor.h>
+#include <vespa/persistence/spi/catchresult.h>
#include <vespa/vespalib/stllike/hash_set.h>
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/datatype/documenttype.h>
@@ -25,6 +26,7 @@ using storage::spi::BucketInfo;
using storage::spi::BucketInfoResult;
using storage::spi::IncludedVersions;
using storage::spi::Result;
+using storage::spi::OperationComplete;
using vespalib::IllegalStateException;
using vespalib::Sequence;
using vespalib::make_string;
@@ -37,29 +39,47 @@ namespace proton {
namespace {
class ResultHandlerBase {
+private:
+ virtual Result::UP createResult() const = 0;
protected:
- std::mutex _lock;
- vespalib::CountDownLatch _latch;
+ std::mutex _lock;
+ std::atomic<uint32_t> _countDown;
+ OperationComplete::UP _onComplete;
+ void countDown();
+ void complete() {
+ _onComplete->onComplete(createResult());
+ }
public:
- explicit ResultHandlerBase(uint32_t waitCnt);
- ~ResultHandlerBase();
- void await() { _latch.await(); }
+ ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete);
+ virtual ~ResultHandlerBase();
};
-ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt)
+ResultHandlerBase::ResultHandlerBase(uint32_t waitCnt, OperationComplete::UP onComplete)
: _lock(),
- _latch(waitCnt)
+ _countDown(waitCnt),
+ _onComplete(std::move(onComplete))
{}
ResultHandlerBase::~ResultHandlerBase() = default;
-class GenericResultHandler : public ResultHandlerBase, public IGenericResultHandler {
+void
+ResultHandlerBase::countDown() {
+ if (_countDown.fetch_sub(1) == 1) {
+ complete();
+ }
+}
+
+class GenericResultHandler final : public ResultHandlerBase, public IGenericResultHandler {
private:
Result _result;
public:
- explicit GenericResultHandler(uint32_t waitCnt) :
- ResultHandlerBase(waitCnt),
+ GenericResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete) :
+ ResultHandlerBase(waitCnt, std::move(onComplete)),
_result()
- { }
+ {
+ if (waitCnt == 0) {
+ complete();
+ }
+ }
~GenericResultHandler() override;
void handle(const Result &result) override {
if (result.hasError()) {
@@ -70,9 +90,9 @@ public:
_result = result;
}
}
- _latch.countDown();
+ countDown();
}
- const Result &getResult() const { return _result; }
+ Result::UP createResult() const override { return std::make_unique<Result>(_result); }
};
GenericResultHandler::~GenericResultHandler() = default;
@@ -93,13 +113,13 @@ public:
_bucketSet.insert(buckets[i]);
}
}
- BucketIdListResult getResult() const {
+ std::unique_ptr<BucketIdListResult> getResult() const {
BucketIdListResult::List buckets;
buckets.reserve(_bucketSet.size());
for (document::BucketId bucketId : _bucketSet) {
buckets.push_back(bucketId);
}
- return BucketIdListResult(buckets);
+ return std::make_unique<BucketIdListResult>(buckets);
}
};
@@ -110,17 +130,24 @@ class SynchronizedBucketIdListResultHandler : public ResultHandlerBase,
public BucketIdListResultHandler
{
public:
- explicit SynchronizedBucketIdListResultHandler(uint32_t waitCnt)
- : ResultHandlerBase(waitCnt),
+ SynchronizedBucketIdListResultHandler(uint32_t waitCnt, OperationComplete::UP onComplete)
+ : ResultHandlerBase(waitCnt, std::move(onComplete)),
BucketIdListResultHandler()
- { }
+ {
+ if (waitCnt == 0) {
+ complete();
+ }
+ }
~SynchronizedBucketIdListResultHandler() override;
void handle(const BucketIdListResult &result) override {
{
std::lock_guard<std::mutex> guard(_lock);
BucketIdListResultHandler::handle(result);
}
- _latch.countDown();
+ countDown();
+ }
+ Result::UP createResult() const override {
+ return getResult();
}
};
@@ -256,7 +283,7 @@ PersistenceEngine::listBuckets(BucketSpace bucketSpace) const
IPersistenceHandler *handler = snap.handlers().get();
handler->handleListBuckets(resultHandler);
}
- return resultHandler.getResult();
+ return *resultHandler.getResult();
}
@@ -266,14 +293,16 @@ PersistenceEngine::setClusterState(BucketSpace bucketSpace, const ClusterState &
ReadGuard rguard(_rwMutex);
saveClusterState(bucketSpace, calc);
HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace);
- GenericResultHandler resultHandler(snap.size());
+ auto catchResult = std::make_unique<storage::spi::CatchResult>();
+ auto futureResult = catchResult->future_result();
+ GenericResultHandler resultHandler(snap.size(), std::move(catchResult));
for (; snap.handlers().valid(); snap.handlers().next()) {
IPersistenceHandler *handler = snap.handlers().get();
handler->handleSetClusterState(calc, resultHandler);
}
- resultHandler.await();
+ Result::UP result = futureResult.get();
_owner.setClusterState(bucketSpace, calc);
- return resultHandler.getResult();
+ return *result;
}
@@ -283,13 +312,14 @@ PersistenceEngine::setActiveState(const Bucket& bucket,
{
ReadGuard rguard(_rwMutex);
HandlerSnapshot snap = getHandlerSnapshot(rguard, bucket.getBucketSpace());
- GenericResultHandler resultHandler(snap.size());
+ auto catchResult = std::make_unique<storage::spi::CatchResult>();
+ auto futureResult = catchResult->future_result();
+ GenericResultHandler resultHandler(snap.size(), std::move(catchResult));
for (; snap.handlers().valid(); snap.handlers().next()) {
IPersistenceHandler *handler = snap.handlers().get();
handler->handleSetActiveState(bucket, newState, resultHandler);
}
- resultHandler.await();
- return resultHandler.getResult();
+ return *futureResult.get();
}
@@ -560,7 +590,9 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const
extraModifiedBuckets.swap(_extraModifiedBuckets[bucketSpace]);
}
HandlerSnapshot snap = getHandlerSnapshot(rguard, bucketSpace);
- SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size());
+ auto catchResult = std::make_unique<storage::spi::CatchResult>();
+ auto futureResult = catchResult->future_result();
+ SynchronizedBucketIdListResultHandler resultHandler(snap.size() + extraModifiedBuckets.size(), std::move(catchResult));
for (; snap.handlers().valid(); snap.handlers().next()) {
IPersistenceHandler *handler = snap.handlers().get();
handler->handleGetModifiedBuckets(resultHandler);
@@ -568,8 +600,7 @@ PersistenceEngine::getModifiedBuckets(BucketSpace bucketSpace) const
for (const auto & item : extraModifiedBuckets) {
resultHandler.handle(*item);
}
- resultHandler.await();
- return resultHandler.getResult();
+ return dynamic_cast<BucketIdListResult &>(*futureResult.get());
}
@@ -661,9 +692,11 @@ PersistenceEngine::propagateSavedClusterState(BucketSpace bucketSpace, IPersiste
return;
// Propagate saved cluster state.
// TODO: Fix race with new cluster state setting.
- GenericResultHandler resultHandler(1);
+ auto catchResult = std::make_unique<storage::spi::CatchResult>();
+ auto futureResult = catchResult->future_result();
+ GenericResultHandler resultHandler(1, std::move(catchResult));
handler.handleSetClusterState(*clusterState, resultHandler);
- resultHandler.await();
+ futureResult.get();
}
void
@@ -671,7 +704,7 @@ PersistenceEngine::grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenc
{
BucketIdListResultHandler resultHandler;
handler.handleListBuckets(resultHandler);
- auto result = std::make_shared<BucketIdListResult>(resultHandler.getResult());
+ std::shared_ptr<BucketIdListResult> result(resultHandler.getResult());
std::lock_guard<std::mutex> guard(_lock);
_extraModifiedBuckets[bucketSpace].push_back(result);
}
@@ -725,9 +758,11 @@ PersistenceEngine::populateInitialBucketDB(const WriteGuard & guard, BucketSpace
buckets.push_back(item.first);
}
LOG(info, "Adding %zu active buckets (%zu flawed) to new bucket db", buckets.size(), flawed);
- GenericResultHandler trHandler(1);
+ auto catchResult = std::make_unique<storage::spi::CatchResult>();
+ auto futureResult = catchResult->future_result();
+ GenericResultHandler trHandler(1, std::move(catchResult));
targetHandler.handlePopulateActiveBuckets(buckets, trHandler);
- trHandler.await();
+ futureResult.get();
}
std::unique_lock<std::shared_mutex>