diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2020-11-02 11:27:48 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2020-11-02 11:27:48 +0100 |
commit | e6a1e21f1f7e3c54d28723af08b90aa0a76a8028 (patch) | |
tree | c88427ed33d29d1e74066df99231c3508e967121 | |
parent | ac8b4ebae4796b275ff71cc15eb259a22797a913 (diff) |
Apply diff entries using async spi methods during bucket merge.
10 files changed, 227 insertions, 31 deletions
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt index f922689b941..971d7b8f410 100644 --- a/storage/src/tests/persistence/CMakeLists.txt +++ b/storage/src/tests/persistence/CMakeLists.txt @@ -2,6 +2,7 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST SOURCES + apply_bucket_diff_entry_result_test.cpp bucketownershipnotifiertest.cpp has_mask_remapper_test.cpp mergehandlertest.cpp diff --git a/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp new file mode 100644 index 00000000000..1eb5f614950 --- /dev/null +++ b/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp @@ -0,0 +1,66 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storage/persistence/apply_bucket_diff_entry_result.h> +#include <vespa/document/base/documentid.h> +#include <vespa/document/bucket/bucketid.h> +#include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/persistence/spi/result.h> +#include <gtest/gtest.h> + +using document::DocumentId; +using document::test::makeDocumentBucket; + +namespace storage { + +using ResultVector = std::vector<ApplyBucketDiffEntryResult>; + +namespace { + +spi::Result spi_result_ok; +spi::Result spi_result_fail(spi::Result::ErrorType::RESOURCE_EXHAUSTED, "write blocked"); +document::BucketIdFactory bucket_id_factory; +const char *test_op = "put"; + +ApplyBucketDiffEntryResult +make_result(spi::Result &spi_result, const DocumentId &doc_id) +{ + std::promise<std::unique_ptr<spi::Result>> result_promise; + result_promise.set_value(std::make_unique<spi::Result>(spi_result)); + spi::Bucket bucket(makeDocumentBucket(bucket_id_factory.getBucketId(doc_id))); + return ApplyBucketDiffEntryResult(result_promise.get_future(), bucket, doc_id, test_op); +} + +void +check_results(ResultVector results) +{ + for (auto& result : results) { + result.check_result(); + } +} + +} + +TEST(ApplyBucketDiffEntryResultTest, ok_results_can_be_checked) +{ + ResultVector results; + results.push_back(make_result(spi_result_ok, DocumentId("id::test::0"))); + results.push_back(make_result(spi_result_ok, DocumentId("id::test::1"))); + check_results(std::move(results)); +} + +TEST(ApplyBucketDiffEntryResultTest, first_failed_result_throws_exception) +{ + ResultVector results; + results.push_back(make_result(spi_result_ok, DocumentId("id::test::0"))); + results.push_back(make_result(spi_result_fail, DocumentId("id::test::1"))); + results.push_back(make_result(spi_result_fail, DocumentId("id::test::2"))); + try { + check_results(std::move(results)); + FAIL() << "Failed to throw exception for failed result"; + } catch (std::exception &e) { + EXPECT_EQ("Failed put for id::test::1 in Bucket(0xeb4700c03842cac4): Result(5, write blocked)", std::string(e.what())); + } +} + +} diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 335863322d9..81f98136575 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -896,7 +896,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { ExpectedExceptionSpec exceptions[] = { { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" }, { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" }, - { PersistenceProviderWrapper::FAIL_PUT, "Failed put" }, + { PersistenceProviderWrapper::FAIL_PUT | PersistenceProviderWrapper::FAIL_REMOVE, "Failed put" }, { PersistenceProviderWrapper::FAIL_REMOVE, "Failed remove" }, }; diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index ff8d29f7f45..647d7fa1098 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -1,6 +1,8 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(storage_spersistence OBJECT SOURCES + apply_bucket_diff_entry_complete.cpp + apply_bucket_diff_entry_result.cpp asynchandler.cpp bucketownershipnotifier.cpp bucketprocessor.cpp diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp new file mode 100644 index 00000000000..63700b30ea5 --- /dev/null +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp @@ -0,0 +1,35 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "apply_bucket_diff_entry_complete.h" +#include <cassert> + +namespace storage { + +ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric) + : _result_handler(nullptr), + _result_promise(std::move(result_promise)), + _start_time(clock), + _latency_metric(latency_metric) +{ +} + +ApplyBucketDiffEntryComplete::~ApplyBucketDiffEntryComplete() = default; + +void +ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result) +{ + if (_result_handler != nullptr) { + _result_handler->handle(*result); + } + _result_promise.set_value(std::move(result)); + _latency_metric.addValue(_start_time.getElapsedTimeAsDouble()); +} + +void +ApplyBucketDiffEntryComplete::addResultHandler(const spi::ResultHandler* resultHandler) +{ + assert(_result_handler == nullptr); + _result_handler = resultHandler; +} + +} diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h new file mode 100644 index 00000000000..f1e1d184665 --- /dev/null +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h @@ -0,0 +1,30 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/persistence/spi/operationcomplete.h> +#include <vespa/storageframework/generic/clock/timer.h> +#include <vespa/metrics/valuemetric.h> +#include <future> + +namespace storage { + +/* + * Complete handler for a bucket diff entry spi operation (putAsync + * or removeAsync) + */ +class ApplyBucketDiffEntryComplete : public spi::OperationComplete +{ + using ResultPromise = std::promise<std::unique_ptr<spi::Result>>; + const spi::ResultHandler* _result_handler; + ResultPromise _result_promise; + framework::MilliSecTimer _start_time; + metrics::DoubleAverageMetric& _latency_metric; +public: + ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric); + ~ApplyBucketDiffEntryComplete(); + void onComplete(std::unique_ptr<spi::Result> result) override; + void addResultHandler(const spi::ResultHandler* resultHandler) override; +}; + +} diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp new file mode 100644 index 00000000000..8e81273e617 --- /dev/null +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp @@ -0,0 +1,38 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "apply_bucket_diff_entry_result.h" +#include <vespa/persistence/spi/result.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <cassert> + +namespace storage { + +ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op) + : _future_result(std::move(future_result)), + _bucket(bucket), + _doc_id(std::move(doc_id)), + _op(op) +{ +} + +ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResult &&rhs) = default; + +ApplyBucketDiffEntryResult::~ApplyBucketDiffEntryResult() = default; + +void +ApplyBucketDiffEntryResult::check_result() +{ + assert(_future_result.valid()); + _future_result.wait(); + auto result = _future_result.get(); + if (result->hasError()) { + vespalib::asciistream ss; + ss << "Failed " << _op + << " for " << _doc_id.toString() + << " in " << _bucket + << ": " << result->toString(); + throw std::runtime_error(ss.str()); + } +} + +} diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h new file mode 100644 index 00000000000..f2308ef23a4 --- /dev/null +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h @@ -0,0 +1,30 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/document/base/documentid.h> +#include <vespa/persistence/spi/bucket.h> +#include <future> + +namespace storage::spi { class Result; } + +namespace storage { + +/* + * Result of a bucket diff entry spi operation (putAsync or removeAsync) + */ +class ApplyBucketDiffEntryResult { + using FutureResult = std::future<std::unique_ptr<spi::Result>>; + FutureResult _future_result; + spi::Bucket _bucket; + document::DocumentId _doc_id; + const char* _op; + +public: + ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op); + ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResult &&rhs); + ~ApplyBucketDiffEntryResult(); + void check_result(); +}; + +} diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 51b575548d8..7b3fb849f52 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -2,12 +2,15 @@ #include "mergehandler.h" #include "persistenceutil.h" +#include "apply_bucket_diff_entry_complete.h" +#include "apply_bucket_diff_entry_result.h" #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> #include <algorithm> +#include <future> #include <vespa/log/log.h> LOG_SETUP(".persistence.mergehandler"); @@ -38,22 +41,6 @@ constexpr int getDeleteFlag() { * Throws std::runtime_error if result has an error. */ void -checkResult(const spi::Result& result, - const spi::Bucket& bucket, - const document::DocumentId& docId, - const char* op) -{ - if (result.hasError()) { - vespalib::asciistream ss; - ss << "Failed " << op - << " for " << docId.toString() - << " in " << bucket - << ": " << result.toString(); - throw std::runtime_error(ss.str()); - } -} - -void checkResult(const spi::Result& result, const spi::Bucket& bucket, const char* op) { if (result.hasError()) { @@ -493,25 +480,27 @@ MergeHandler::deserializeDiffDocument( return doc; } -void +ApplyBucketDiffEntryResult MergeHandler::applyDiffEntry(const spi::Bucket& bucket, const api::ApplyBucketDiffCommand::Entry& e, spi::Context& context, const document::DocumentTypeRepo& repo) const { + std::promise<std::unique_ptr<spi::Result>> result_promise; + auto future_result = result_promise.get_future(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { // Regular put entry Document::SP doc(deserializeDiffDocument(e, repo)); DocumentId docId = doc->getId(); - framework::MilliSecTimer start_time(_clock); - checkResult(_spi.put(bucket, timestamp, std::move(doc), context), bucket, docId, "put"); - _env._metrics.merge_handler_metrics.put_latency.addValue(start_time.getElapsedTimeAsDouble()); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock, _env._metrics.merge_handler_metrics.put_latency); + _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete)); + return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "put"); } else { DocumentId docId(e._docName); - framework::MilliSecTimer start_time(_clock); - checkResult(_spi.remove(bucket, timestamp, docId, context), bucket, docId, "remove"); - _env._metrics.merge_handler_metrics.remove_latency.addValue(start_time.getElapsedTimeAsDouble()); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(result_promise), _clock, _env._metrics.merge_handler_metrics.remove_latency); + _spi.removeAsync(bucket, timestamp, docId, context, std::move(complete)); + return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "remove"); } } @@ -534,6 +523,7 @@ MergeHandler::applyDiffLocally( uint32_t byteCount = 0; uint32_t addedCount = 0; uint32_t notNeededByteCount = 0; + std::vector<ApplyBucketDiffEntryResult> async_results; std::vector<spi::DocEntry::UP> entries; populateMetaData(bucket, MAX_TIMESTAMP, entries, context); @@ -572,7 +562,7 @@ MergeHandler::applyDiffLocally( ++i; LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - applyDiffEntry(bucket, e, context, repo); + async_results.push_back(applyDiffEntry(bucket, e, context, repo)); } else { assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp()); // Diffing for existing timestamp; should either both be put @@ -585,7 +575,7 @@ MergeHandler::applyDiffLocally( "timestamp in %s. Diff slot: %s. Existing slot: %s", bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); - applyDiffEntry(bucket, e, context, repo); + async_results.push_back(applyDiffEntry(bucket, e, context, repo)); } else { // Duplicate put, just ignore it. LOG(debug, "During diff apply, attempting to add slot " @@ -617,9 +607,12 @@ MergeHandler::applyDiffLocally( LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - applyDiffEntry(bucket, e, context, repo); + async_results.push_back(applyDiffEntry(bucket, e, context, repo)); byteCount += e._headerBlob.size() + e._bodyBlob.size(); } + for (auto &result_to_check : async_results) { + result_to_check.check_result(); + } if (byteCount + notNeededByteCount != 0) { _env._metrics.merge_handler_metrics.mergeAverageDataReceivedNeeded.addValue( diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 5e65e1a39ec..64b1448577a 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -24,6 +24,7 @@ namespace storage { namespace spi { struct PersistenceProvider; } class PersistenceUtil; +class ApplyBucketDiffEntryResult; class MergeHandler : public Types { @@ -82,10 +83,10 @@ private: * Invoke either put, remove or unrevertable remove on the SPI * depending on the flags in the diff entry. */ - void applyDiffEntry(const spi::Bucket&, - const api::ApplyBucketDiffCommand::Entry&, - spi::Context& context, - const document::DocumentTypeRepo& repo) const; + ApplyBucketDiffEntryResult applyDiffEntry(const spi::Bucket&, + const api::ApplyBucketDiffCommand::Entry&, + spi::Context& context, + const document::DocumentTypeRepo& repo) const; /** * Fill entries-vector with metadata for bucket up to maxTimestamp, |