diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2020-11-02 15:05:51 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2020-11-02 15:05:51 +0100 |
commit | b7713d8cccb1d5a2d852f4910706642029c02c4e (patch) | |
tree | 8e34dcd2ffe29e79b8182b95817bc4d600b7f25f /storage | |
parent | e6a1e21f1f7e3c54d28723af08b90aa0a76a8028 (diff) |
Move wait out of check_result.
Pass elapsed time along with result and update metric after having
checked the result.
Diffstat (limited to 'storage')
6 files changed, 39 insertions, 24 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp index 1eb5f614950..4bca987152e 100644 --- a/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp +++ b/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp @@ -21,20 +21,24 @@ spi::Result spi_result_ok; spi::Result spi_result_fail(spi::Result::ErrorType::RESOURCE_EXHAUSTED, "write blocked"); document::BucketIdFactory bucket_id_factory; const char *test_op = "put"; +metrics::DoubleAverageMetric dummy_metric("dummy", metrics::DoubleAverageMetric::Tags(), "dummy desc"); ApplyBucketDiffEntryResult make_result(spi::Result &spi_result, const DocumentId &doc_id) { - std::promise<std::unique_ptr<spi::Result>> result_promise; - result_promise.set_value(std::make_unique<spi::Result>(spi_result)); + std::promise<std::pair<std::unique_ptr<spi::Result>, double>> result_promise; + result_promise.set_value(std::make_pair(std::make_unique<spi::Result>(spi_result), 0.1)); spi::Bucket bucket(makeDocumentBucket(bucket_id_factory.getBucketId(doc_id))); - return ApplyBucketDiffEntryResult(result_promise.get_future(), bucket, doc_id, test_op); + return ApplyBucketDiffEntryResult(result_promise.get_future(), bucket, doc_id, test_op, dummy_metric); } void check_results(ResultVector results) { for (auto& result : results) { + result.wait(); + } + for (auto& result : results) { result.check_result(); } } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp index 63700b30ea5..7f1665a92bb 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp @@ -1,15 +1,15 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "apply_bucket_diff_entry_complete.h" +#include <vespa/persistence/spi/result.h> #include <cassert> namespace storage { -ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric) +ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock) : _result_handler(nullptr), _result_promise(std::move(result_promise)), - _start_time(clock), - _latency_metric(latency_metric) + _start_time(clock) { } @@ -21,8 +21,7 @@ ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result) if (_result_handler != nullptr) { _result_handler->handle(*result); } - _result_promise.set_value(std::move(result)); - _latency_metric.addValue(_start_time.getElapsedTimeAsDouble()); + _result_promise.set_value(std::make_pair(std::move(result), _start_time.getElapsedTimeAsDouble())); } void diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h index f1e1d184665..f492727b0e6 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h @@ -4,7 +4,6 @@ #include <vespa/persistence/spi/operationcomplete.h> #include <vespa/storageframework/generic/clock/timer.h> -#include <vespa/metrics/valuemetric.h> #include <future> namespace storage { @@ -15,13 +14,12 @@ namespace storage { */ class ApplyBucketDiffEntryComplete : public spi::OperationComplete { - using ResultPromise = std::promise<std::unique_ptr<spi::Result>>; + using ResultPromise = std::promise<std::pair<std::unique_ptr<spi::Result>, double>>; const spi::ResultHandler* _result_handler; ResultPromise _result_promise; framework::MilliSecTimer _start_time; - metrics::DoubleAverageMetric& _latency_metric; public: - ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric); + ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock); ~ApplyBucketDiffEntryComplete(); void onComplete(std::unique_ptr<spi::Result> result) override; void addResultHandler(const spi::ResultHandler* resultHandler) override; diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp index 8e81273e617..d582168fcf9 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp @@ -7,11 +7,12 @@ namespace storage { -ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op) +ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op, metrics::DoubleAverageMetric& latency_metric) : _future_result(std::move(future_result)), _bucket(bucket), _doc_id(std::move(doc_id)), - _op(op) + _op(op), + _latency_metric(latency_metric) { } @@ -20,19 +21,26 @@ ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResul ApplyBucketDiffEntryResult::~ApplyBucketDiffEntryResult() = default; void -ApplyBucketDiffEntryResult::check_result() +ApplyBucketDiffEntryResult::wait() { assert(_future_result.valid()); _future_result.wait(); +} + +void +ApplyBucketDiffEntryResult::check_result() +{ + assert(_future_result.valid()); auto result = _future_result.get(); - if (result->hasError()) { + if (result.first->hasError()) { vespalib::asciistream ss; ss << "Failed " << _op << " for " << _doc_id.toString() << " in " << _bucket - << ": " << result->toString(); + << ": " << result.first->toString(); throw std::runtime_error(ss.str()); } + _latency_metric.addValue(result.second); } } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h index f2308ef23a4..f7653cd35a5 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h @@ -4,6 +4,7 @@ #include <vespa/document/base/documentid.h> #include <vespa/persistence/spi/bucket.h> +#include <vespa/metrics/valuemetric.h> #include <future> namespace storage::spi { class Result; } @@ -14,16 +15,18 @@ namespace storage { * Result of a bucket diff entry spi operation (putAsync or removeAsync) */ class ApplyBucketDiffEntryResult { - using FutureResult = std::future<std::unique_ptr<spi::Result>>; + using FutureResult = std::future<std::pair<std::unique_ptr<spi::Result>, double>>; FutureResult _future_result; spi::Bucket _bucket; document::DocumentId _doc_id; const char* _op; + metrics::DoubleAverageMetric& _latency_metric; public: - ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op); + ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op, metrics::DoubleAverageMetric& latency_metric); ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResult &&rhs); ~ApplyBucketDiffEntryResult(); + void wait(); void check_result(); }; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7b3fb849f52..cab35e77bac 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -486,21 +486,21 @@ MergeHandler::applyDiffEntry(const spi::Bucket& bucket, spi::Context& context, const document::DocumentTypeRepo& repo) const { - std::promise<std::unique_ptr<spi::Result>> result_promise; + std::promise<std::pair<std::unique_ptr<spi::Result>, double>> result_promise; auto future_result = result_promise.get_future(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { // Regular put entry Document::SP doc(deserializeDiffDocument(e, repo)); DocumentId docId = doc->getId(); - auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock, _env._metrics.merge_handler_metrics.put_latency); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock); _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete)); - return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "put"); + return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "put", _env._metrics.merge_handler_metrics.put_latency); } else { DocumentId docId(e._docName); - auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock, _env._metrics.merge_handler_metrics.remove_latency); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock); _spi.removeAsync(bucket, timestamp, docId, context, std::move(complete)); - return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "remove"); + return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "remove", _env._metrics.merge_handler_metrics.remove_latency); } } @@ -611,6 +611,9 @@ MergeHandler::applyDiffLocally( byteCount += e._headerBlob.size() + e._bodyBlob.size(); } for (auto &result_to_check : async_results) { + result_to_check.wait(); + } + for (auto &result_to_check : async_results) { result_to_check.check_result(); } |