From f19fc4ebc9010db3bd35f53923866c62e2faefa9 Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Wed, 20 Oct 2021 12:41:11 +0200 Subject: Eliminate ApplyBucketDiffEntryResult. --- .../persistence/apply_bucket_diff_state_test.cpp | 60 ++++++++++++------- .../src/vespa/storage/persistence/CMakeLists.txt | 1 - .../apply_bucket_diff_entry_complete.cpp | 9 ++- .../persistence/apply_bucket_diff_entry_complete.h | 16 +++-- .../persistence/apply_bucket_diff_entry_result.cpp | 44 -------------- .../persistence/apply_bucket_diff_entry_result.h | 32 ---------- .../persistence/apply_bucket_diff_state.cpp | 69 ++++++++++------------ .../storage/persistence/apply_bucket_diff_state.h | 14 ++++- .../src/vespa/storage/persistence/mergehandler.cpp | 45 +++++++------- .../src/vespa/storage/persistence/mergehandler.h | 11 ++-- 10 files changed, 128 insertions(+), 173 deletions(-) delete mode 100644 storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp delete mode 100644 storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h (limited to 'storage') diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp index 6e1e62d2080..fe77477fa77 100644 --- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp +++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp @@ -1,6 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include #include #include #include @@ -26,38 +25,44 @@ document::Bucket dummy_document_bucket(makeDocumentBucket(document::BucketId(0, class DummyMergeBucketInfoSyncer : public MergeBucketInfoSyncer { uint32_t& _sync_count; + vespalib::string _fail; public: DummyMergeBucketInfoSyncer(uint32_t& sync_count) : MergeBucketInfoSyncer(), - _sync_count(sync_count) + _sync_count(sync_count), + _fail() { } + ~DummyMergeBucketInfoSyncer(); void sync_bucket_info(const spi::Bucket& bucket) const override { EXPECT_EQ(bucket, spi::Bucket(dummy_document_bucket)); ++_sync_count; + if (!_fail.empty()) { + throw std::runtime_error(_fail); + } } + void set_fail(vespalib::string fail) { _fail = std::move(fail); } }; -ApplyBucketDiffEntryResult -make_result(spi::Result &spi_result, const DocumentId &doc_id) +DummyMergeBucketInfoSyncer::~DummyMergeBucketInfoSyncer() = default; + +void +make_result(ApplyBucketDiffState& state, spi::Result &spi_result, const DocumentId &doc_id) { - std::promise> result_promise; - result_promise.set_value(std::make_unique(spi_result)); - spi::Bucket bucket(makeDocumentBucket(bucket_id_factory.getBucketId(doc_id))); - return ApplyBucketDiffEntryResult(result_promise.get_future(), bucket, doc_id, test_op); + state.on_entry_complete(std::make_unique(spi_result), doc_id, test_op); } void push_ok(ApplyBucketDiffState &state) { - state.push_back(make_result(spi_result_ok, DocumentId("id::test::0"))); - state.push_back(make_result(spi_result_ok, DocumentId("id::test::1"))); + make_result(state, spi_result_ok, DocumentId("id::test::0")); + make_result(state, spi_result_ok, DocumentId("id::test::1")); } void push_bad(ApplyBucketDiffState &state) { - state.push_back(make_result(spi_result_ok, DocumentId("id::test::0"))); - state.push_back(make_result(spi_result_fail, DocumentId("id::test::1"))); - state.push_back(make_result(spi_result_fail, DocumentId("id::test::2"))); + make_result(state, spi_result_ok, DocumentId("id::test::0")); + make_result(state, spi_result_fail, DocumentId("id::test::1")); + make_result(state, spi_result_fail, DocumentId("id::test::2")); } } @@ -75,15 +80,19 @@ public: { } + ~ApplyBucketDiffStateTestBase(); + std::unique_ptr make_state() { return std::make_unique(syncer, spi::Bucket(dummy_document_bucket)); } }; +ApplyBucketDiffStateTestBase::~ApplyBucketDiffStateTestBase() = default; + class ApplyBucketDiffStateTest : public ApplyBucketDiffStateTestBase { public: - std::unique_ptr state; + std::shared_ptr state; ApplyBucketDiffStateTest() : ApplyBucketDiffStateTestBase(), @@ -95,13 +104,14 @@ public: state = make_state(); } + void check_failure(std::string expected) { + auto future = state->get_future(); + state.reset(); + std::string fail_message = future.get(); + EXPECT_EQ(expected, fail_message); + } void check_failure() { - try { - state->check(); - FAIL() << "Failed to throw exception for failed result"; - } catch (std::exception &e) { - EXPECT_EQ("Failed put for id::test::1 in Bucket(0xeb4700c03842cac4): Result(5, write blocked)", std::string(e.what())); - } + check_failure("Failed put for id::test::1 in Bucket(0x0000000000000010): Result(5, write blocked)"); } }; @@ -109,7 +119,7 @@ public: TEST_F(ApplyBucketDiffStateTest, ok_results_can_be_checked) { push_ok(*state); - state->check(); + check_failure(""); } TEST_F(ApplyBucketDiffStateTest, failed_result_errors_ignored) @@ -146,4 +156,12 @@ TEST_F(ApplyBucketDiffStateTest, explicit_sync_bucket_info_works) EXPECT_EQ(1, sync_count); } +TEST_F(ApplyBucketDiffStateTest, failed_sync_bucket_info_is_detected) +{ + vespalib::string fail("sync bucket failed"); + syncer.set_fail(fail); + state->mark_stale_bucket_info(); + check_failure(fail); +} + } diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index 4b41d3fa778..c737d2bed28 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -2,7 +2,6 @@ vespa_add_library(storage_spersistence OBJECT SOURCES apply_bucket_diff_entry_complete.cpp - apply_bucket_diff_entry_result.cpp apply_bucket_diff_state.cpp asynchandler.cpp bucketownershipnotifier.cpp diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp index 034089d4eca..1fbe155b16d 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp @@ -1,14 +1,17 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "apply_bucket_diff_entry_complete.h" +#include "apply_bucket_diff_state.h" #include #include namespace storage { -ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric) +ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric) : _result_handler(nullptr), - _result_promise(std::move(result_promise)), + _state(std::move(state)), + _doc_id(std::move(doc_id)), + _op(op), _start_time(clock), _latency_metric(latency_metric) { @@ -24,7 +27,7 @@ ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr result) } double elapsed = _start_time.getElapsedTimeAsDouble(); _latency_metric.addValue(elapsed); - _result_promise.set_value(std::move(result)); + _state->on_entry_complete(std::move(result), _doc_id, _op); } void diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h index 6f05ed82fa0..dd2346d9dee 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h @@ -2,6 +2,7 @@ #pragma once +#include #include #include #include @@ -9,19 +10,22 @@ namespace storage { +class ApplyBucketDiffState; + /* * Complete handler for a bucket diff entry spi operation (putAsync * or removeAsync) */ class ApplyBucketDiffEntryComplete : public spi::OperationComplete { - using ResultPromise = std::promise>; - const spi::ResultHandler* _result_handler; - ResultPromise _result_promise; - framework::MilliSecTimer _start_time; - metrics::DoubleAverageMetric& _latency_metric; + const spi::ResultHandler* _result_handler; + std::shared_ptr _state; + document::DocumentId _doc_id; + const char* _op; + framework::MilliSecTimer _start_time; + metrics::DoubleAverageMetric& _latency_metric; public: - ApplyBucketDiffEntryComplete(ResultPromise result_promise, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric); + ApplyBucketDiffEntryComplete(std::shared_ptr state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric); ~ApplyBucketDiffEntryComplete(); void onComplete(std::unique_ptr result) override; void addResultHandler(const spi::ResultHandler* resultHandler) override; diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp deleted file mode 100644 index 9cb5bb1bfd0..00000000000 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.cpp +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "apply_bucket_diff_entry_result.h" -#include -#include -#include - -namespace storage { - -ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op) - : _future_result(std::move(future_result)), - _bucket(bucket), - _doc_id(std::move(doc_id)), - _op(op) -{ -} - -ApplyBucketDiffEntryResult::ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResult &&rhs) = default; - -ApplyBucketDiffEntryResult::~ApplyBucketDiffEntryResult() = default; - -void -ApplyBucketDiffEntryResult::wait() -{ - assert(_future_result.valid()); - _future_result.wait(); -} - -void -ApplyBucketDiffEntryResult::check_result() -{ - assert(_future_result.valid()); - auto result = _future_result.get(); - if (result->hasError()) { - vespalib::asciistream ss; - ss << "Failed " << _op - << " for " << _doc_id.toString() - << " in " << _bucket - << ": " << result->toString(); - throw std::runtime_error(ss.str()); - } -} - -} diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h deleted file mode 100644 index 662656f2dbc..00000000000 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_result.h +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include -#include -#include -#include - -namespace storage::spi { class Result; } - -namespace storage { - -/* - * Result of a bucket diff entry spi operation (putAsync or removeAsync) - */ -class ApplyBucketDiffEntryResult { - using FutureResult = std::future>; - FutureResult _future_result; - spi::Bucket _bucket; - document::DocumentId _doc_id; - const char* _op; - -public: - ApplyBucketDiffEntryResult(FutureResult future_result, spi::Bucket bucket, document::DocumentId doc_id, const char *op); - ApplyBucketDiffEntryResult(ApplyBucketDiffEntryResult &&rhs); - ~ApplyBucketDiffEntryResult(); - void wait(); - void check_result(); -}; - -} diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp index c754796ba51..eb7a5ef5bc6 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -1,64 +1,50 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "apply_bucket_diff_state.h" -#include "apply_bucket_diff_entry_result.h" #include "mergehandler.h" +#include +#include +#include + +using storage::spi::Result; namespace storage { ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket) - : _async_results(), - _merge_bucket_info_syncer(merge_bucket_info_syncer), + : _merge_bucket_info_syncer(merge_bucket_info_syncer), _bucket(bucket), - _stale_bucket_info(false) + _fail_message(), + _failed_flag(), + _stale_bucket_info(false), + _promise() { } ApplyBucketDiffState::~ApplyBucketDiffState() { - wait(); try { sync_bucket_info(); - } catch (std::exception&) { - } -} - -bool -ApplyBucketDiffState::empty() const -{ - return _async_results.empty(); -} - -void -ApplyBucketDiffState::wait() -{ - if (!_async_results.empty()) { - _async_results.back().wait(); + } catch (std::exception& e) { + if (_fail_message.empty()) { + _fail_message = e.what(); + } } - for (auto &result_to_check : _async_results) { - result_to_check.wait(); + if (_promise.has_value()) { + _promise.value().set_value(_fail_message); } } void -ApplyBucketDiffState::check() +ApplyBucketDiffState::on_entry_complete(std::unique_ptr result, const document::DocumentId &doc_id, const char *op) { - wait(); - try { - for (auto& result_to_check : _async_results) { - result_to_check.check_result(); - } - } catch (std::exception&) { - _async_results.clear(); - throw; + if (result->hasError() && !_failed_flag.test_and_set()) { + vespalib::asciistream ss; + ss << "Failed " << op + << " for " << doc_id.toString() + << " in " << _bucket + << ": " << result->toString(); + _fail_message = ss.str(); } - _async_results.clear(); -} - -void -ApplyBucketDiffState::push_back(ApplyBucketDiffEntryResult&& result) -{ - _async_results.push_back(std::move(result)); } void @@ -76,4 +62,11 @@ ApplyBucketDiffState::sync_bucket_info() } } +std::future +ApplyBucketDiffState::get_future() +{ + _promise = std::promise(); + return _promise.value().get_future(); +} + } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h index 489e75e4a72..af4174b06d6 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -3,8 +3,13 @@ #pragma once #include +#include +#include #include +namespace document { class DocumentId; } +namespace storage::spi { class Result; } + namespace storage { class ApplyBucketDiffEntryResult; @@ -15,19 +20,22 @@ class MergeBucketInfoSyncer; * for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply. */ class ApplyBucketDiffState { - std::vector _async_results; const MergeBucketInfoSyncer& _merge_bucket_info_syncer; spi::Bucket _bucket; + vespalib::string _fail_message; + std::atomic_flag _failed_flag; bool _stale_bucket_info; + std::optional> _promise; + public: ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket); ~ApplyBucketDiffState(); - void push_back(ApplyBucketDiffEntryResult&& result); - bool empty() const; + void on_entry_complete(std::unique_ptr result, const document::DocumentId &doc_id, const char *op); void wait(); void check(); void mark_stale_bucket_info(); void sync_bucket_info(); + std::future get_future(); }; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 274aba303fb..d2737089bba 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -3,7 +3,6 @@ #include "mergehandler.h" #include "persistenceutil.h" #include "apply_bucket_diff_entry_complete.h" -#include "apply_bucket_diff_entry_result.h" #include "apply_bucket_diff_state.h" #include #include @@ -96,6 +95,17 @@ struct DiffEntryTimestampPredicate { } }; + +void check_apply_diff_sync(std::shared_ptr async_results) { + auto future = async_results->get_future(); + async_results.reset(); + future.wait(); + auto fail_message = future.get(); + if (!fail_message.empty()) { + throw std::runtime_error(fail_message); + } +} + } // anonymous namespace void @@ -483,27 +493,24 @@ MergeHandler::deserializeDiffDocument( return doc; } -ApplyBucketDiffEntryResult -MergeHandler::applyDiffEntry(const spi::Bucket& bucket, +void +MergeHandler::applyDiffEntry(std::shared_ptr async_results, + const spi::Bucket& bucket, const api::ApplyBucketDiffCommand::Entry& e, spi::Context& context, const document::DocumentTypeRepo& repo) const { - std::promise> result_promise; - auto future_result = result_promise.get_future(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { // Regular put entry Document::SP doc(deserializeDiffDocument(e, repo)); DocumentId docId = doc->getId(); - auto complete = std::make_unique(std::move(result_promise), _clock, _env._metrics.merge_handler_metrics.put_latency); + auto complete = std::make_unique(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency); _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete)); - return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "put"); } else { DocumentId docId(e._docName); - auto complete = std::make_unique(std::move(result_promise), _clock, _env._metrics.merge_handler_metrics.remove_latency); + auto complete = std::make_unique(std::move(async_results), std::move(docId), "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency); _spi.removeAsync(bucket, timestamp, docId, context, std::move(complete)); - return ApplyBucketDiffEntryResult(std::move(future_result), bucket, std::move(docId), "remove"); } } @@ -516,7 +523,7 @@ MergeHandler::applyDiffLocally( std::vector& diff, uint8_t nodeIndex, spi::Context& context, - ApplyBucketDiffState& async_results) const + std::shared_ptr async_results) const { // Sort the data to apply by which file they should be added to LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries", @@ -527,7 +534,7 @@ MergeHandler::applyDiffLocally( uint32_t addedCount = 0; uint32_t notNeededByteCount = 0; - async_results.mark_stale_bucket_info(); + async_results->mark_stale_bucket_info(); std::vector entries; populateMetaData(bucket, MAX_TIMESTAMP, entries, context); @@ -565,7 +572,7 @@ MergeHandler::applyDiffLocally( ++i; LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - async_results.push_back(applyDiffEntry(bucket, e, context, repo)); + applyDiffEntry(async_results, bucket, e, context, repo); } else { assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp()); // Diffing for existing timestamp; should either both be put @@ -578,7 +585,7 @@ MergeHandler::applyDiffLocally( "timestamp in %s. Diff slot: %s. Existing slot: %s", bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); - async_results.push_back(applyDiffEntry(bucket, e, context, repo)); + applyDiffEntry(async_results, bucket, e, context, repo); } else { // Duplicate put, just ignore it. LOG(debug, "During diff apply, attempting to add slot " @@ -610,7 +617,7 @@ MergeHandler::applyDiffLocally( LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - async_results.push_back(applyDiffEntry(bucket, e, context, repo)); + applyDiffEntry(async_results, bucket, e, context, repo); byteCount += e._headerBlob.size() + e._bodyBlob.size(); } if (byteCount + notNeededByteCount != 0) { @@ -1204,7 +1211,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra tracker->setMetric(_env._metrics.applyBucketDiff); spi::Bucket bucket(cmd.getBucket()); - ApplyBucketDiffState async_results(*this, bucket); + auto async_results = std::make_shared(*this, bucket); LOG(debug, "%s", cmd.toString().c_str()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { @@ -1229,8 +1236,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); - async_results.check(); - async_results.sync_bucket_info(); + check_apply_diff_sync(std::move(async_results)); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).", bucket.toString().c_str(), _env._nodeIndex, index); @@ -1289,7 +1295,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag (void) tracker; _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); - ApplyBucketDiffState async_results(*this, bucket); + auto async_results = std::make_shared(*this, bucket); std::vector& diff(reply.getDiff()); LOG(debug, "%s", reply.toString().c_str()); @@ -1325,8 +1331,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag framework::MilliSecTimer startTime(_clock); applyDiffLocally(bucket, diff, index, s->context, async_results); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); - async_results.check(); - async_results.sync_bucket_info(); + check_apply_diff_sync(std::move(async_results)); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)", bucket.toString().c_str(), diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 0ff8f3c0ef8..f6e8ddcf306 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -62,7 +62,7 @@ public: std::vector& diff, uint8_t nodeIndex, spi::Context& context, - ApplyBucketDiffState& async_results) const; + std::shared_ptr async_results) const; void sync_bucket_info(const spi::Bucket& bucket) const override; MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const; @@ -90,10 +90,11 @@ private: * Invoke either put, remove or unrevertable remove on the SPI * depending on the flags in the diff entry. */ - ApplyBucketDiffEntryResult applyDiffEntry(const spi::Bucket&, - const api::ApplyBucketDiffCommand::Entry&, - spi::Context& context, - const document::DocumentTypeRepo& repo) const; + void applyDiffEntry(std::shared_ptr async_results, + const spi::Bucket&, + const api::ApplyBucketDiffCommand::Entry&, + spi::Context& context, + const document::DocumentTypeRepo& repo) const; /** * Fill entries-vector with metadata for bucket up to maxTimestamp, -- cgit v1.2.3