diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-10-15 12:02:54 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-10-15 12:02:54 +0200 |
commit | f74696aee9b10f9ae993c5cceda6db0cc0c5b334 (patch) | |
tree | dc5ac9b5f9b122fc8c04df63674c3e467287459e /storage | |
parent | 912d0cb4a321ebb3eb7a1cd0d73bd3371d9bec22 (diff) |
Add class representing async state for applying bucket diff to local node.
Diffstat (limited to 'storage')
9 files changed, 301 insertions, 88 deletions
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt index 8c0e7fdb11c..f0deec90aae 100644 --- a/storage/src/tests/persistence/CMakeLists.txt +++ b/storage/src/tests/persistence/CMakeLists.txt @@ -2,7 +2,7 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST SOURCES - apply_bucket_diff_entry_result_test.cpp + apply_bucket_diff_state_test.cpp bucketownershipnotifiertest.cpp has_mask_remapper_test.cpp mergehandlertest.cpp diff --git a/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp deleted file mode 100644 index a9f2acc0fdb..00000000000 --- a/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/storage/persistence/apply_bucket_diff_entry_result.h> -#include <vespa/document/base/documentid.h> -#include <vespa/document/bucket/bucketid.h> -#include <vespa/document/bucket/bucketidfactory.h> -#include <vespa/document/test/make_document_bucket.h> -#include <vespa/persistence/spi/result.h> -#include <gtest/gtest.h> - -using document::DocumentId; -using document::test::makeDocumentBucket; - -namespace storage { - -using ResultVector = std::vector<ApplyBucketDiffEntryResult>; - -namespace { - -spi::Result spi_result_ok; -spi::Result spi_result_fail(spi::Result::ErrorType::RESOURCE_EXHAUSTED, "write blocked"); -document::BucketIdFactory bucket_id_factory; -const char *test_op = "put"; -metrics::DoubleAverageMetric dummy_metric("dummy", metrics::DoubleAverageMetric::Tags(), "dummy desc"); - -ApplyBucketDiffEntryResult -make_result(spi::Result &spi_result, const DocumentId &doc_id) -{ - std::promise<std::pair<std::unique_ptr<spi::Result>, double>> result_promise; - result_promise.set_value(std::make_pair(std::make_unique<spi::Result>(spi_result), 0.1)); - spi::Bucket bucket(makeDocumentBucket(bucket_id_factory.getBucketId(doc_id))); - return ApplyBucketDiffEntryResult(result_promise.get_future(), bucket, doc_id, test_op, dummy_metric); -} - -void -check_results(ResultVector results) -{ - for (auto& result : results) { - result.wait(); - } - for (auto& result : results) { - result.check_result(); - } -} - -} - -TEST(ApplyBucketDiffEntryResultTest, ok_results_can_be_checked) -{ - ResultVector results; - results.push_back(make_result(spi_result_ok, DocumentId("id::test::0"))); - results.push_back(make_result(spi_result_ok, DocumentId("id::test::1"))); - check_results(std::move(results)); -} - -TEST(ApplyBucketDiffEntryResultTest, first_failed_result_throws_exception) -{ - ResultVector results; - results.push_back(make_result(spi_result_ok, DocumentId("id::test::0"))); - results.push_back(make_result(spi_result_fail, DocumentId("id::test::1"))); - results.push_back(make_result(spi_result_fail, DocumentId("id::test::2"))); - try { - check_results(std::move(results)); - FAIL() << "Failed to throw exception for failed result"; - } catch (std::exception &e) { - EXPECT_EQ("Failed put for id::test::1 in Bucket(0xeb4700c03842cac4): Result(5, write blocked)", std::string(e.what())); - } -} - -} diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp new file mode 100644 index 00000000000..188b541c1b8 --- /dev/null +++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp @@ -0,0 +1,150 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storage/persistence/apply_bucket_diff_entry_result.h> +#include <vespa/storage/persistence/apply_bucket_diff_state.h> +#include <vespa/storage/persistence/merge_bucket_info_syncer.h> +#include <vespa/document/base/documentid.h> +#include <vespa/document/bucket/bucketid.h> +#include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/persistence/spi/result.h> +#include <gtest/gtest.h> + +using document::DocumentId; +using document::test::makeDocumentBucket; + +namespace storage { + +namespace { + +spi::Result spi_result_ok; +spi::Result spi_result_fail(spi::Result::ErrorType::RESOURCE_EXHAUSTED, "write blocked"); +document::BucketIdFactory bucket_id_factory; +const char *test_op = "put"; +metrics::DoubleAverageMetric dummy_metric("dummy", metrics::DoubleAverageMetric::Tags(), "dummy desc"); +document::Bucket dummy_document_bucket(makeDocumentBucket(document::BucketId(0, 16))); + +class DummyMergeBucketInfoSyncer : public MergeBucketInfoSyncer +{ + uint32_t& _sync_count; +public: + DummyMergeBucketInfoSyncer(uint32_t& sync_count) + : MergeBucketInfoSyncer(), + _sync_count(sync_count) + { + } + void sync_bucket_info(const spi::Bucket& bucket) const override { + EXPECT_EQ(bucket, spi::Bucket(dummy_document_bucket)); + ++_sync_count; + } +}; + +ApplyBucketDiffEntryResult +make_result(spi::Result &spi_result, const DocumentId &doc_id) +{ + std::promise<std::pair<std::unique_ptr<spi::Result>, double>> result_promise; + result_promise.set_value(std::make_pair(std::make_unique<spi::Result>(spi_result), 0.1)); + spi::Bucket bucket(makeDocumentBucket(bucket_id_factory.getBucketId(doc_id))); + return ApplyBucketDiffEntryResult(result_promise.get_future(), bucket, doc_id, test_op, dummy_metric); +} + +void push_ok(ApplyBucketDiffState &state) +{ + state.push_back(make_result(spi_result_ok, DocumentId("id::test::0"))); + state.push_back(make_result(spi_result_ok, DocumentId("id::test::1"))); +} + +void push_bad(ApplyBucketDiffState &state) +{ + state.push_back(make_result(spi_result_ok, DocumentId("id::test::0"))); + state.push_back(make_result(spi_result_fail, DocumentId("id::test::1"))); + state.push_back(make_result(spi_result_fail, DocumentId("id::test::2"))); +} + +} + +class ApplyBucketDiffStateTestBase : public ::testing::Test +{ +public: + uint32_t sync_count; + DummyMergeBucketInfoSyncer syncer; + + ApplyBucketDiffStateTestBase() + : ::testing::Test(), + sync_count(0u), + syncer(sync_count) + { + } + + std::unique_ptr<ApplyBucketDiffState> make_state() { + return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket)); + } +}; + +class ApplyBucketDiffStateTest : public ApplyBucketDiffStateTestBase +{ +public: + std::unique_ptr<ApplyBucketDiffState> state; + + ApplyBucketDiffStateTest() + : ApplyBucketDiffStateTestBase(), + state(make_state()) + { + } + + void reset() { + state = make_state(); + } + + void check_failure() { + try { + state->check(); + FAIL() << "Failed to throw exception for failed result"; + } catch (std::exception &e) { + EXPECT_EQ("Failed put for id::test::1 in Bucket(0xeb4700c03842cac4): Result(5, write blocked)", std::string(e.what())); + } + } + +}; + +TEST_F(ApplyBucketDiffStateTest, ok_results_can_be_checked) +{ + push_ok(*state); + state->check(); +} + +TEST_F(ApplyBucketDiffStateTest, failed_result_errors_ignored) +{ + push_bad(*state); +} + +TEST_F(ApplyBucketDiffStateTest, first_failed_result_throws_exception) +{ + push_bad(*state); + ASSERT_NO_FATAL_FAILURE(check_failure()); +} + +TEST_F(ApplyBucketDiffStateTest, sync_bucket_info_if_needed_on_destruct) +{ + reset(); + EXPECT_EQ(0, sync_count); + state->mark_stale_bucket_info(); + EXPECT_EQ(0, sync_count); + reset(); + EXPECT_EQ(1, sync_count); +} + +TEST_F(ApplyBucketDiffStateTest, explicit_sync_bucket_info_works) +{ + state->sync_bucket_info(); + EXPECT_EQ(0, sync_count); + state->mark_stale_bucket_info(); + state->sync_bucket_info(); + EXPECT_EQ(1, sync_count); + state->sync_bucket_info(); + EXPECT_EQ(1, sync_count); + reset(); + EXPECT_EQ(1, sync_count); +} + +} diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index 083e0406ef0..4b41d3fa778 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_library(storage_spersistence OBJECT SOURCES apply_bucket_diff_entry_complete.cpp apply_bucket_diff_entry_result.cpp + apply_bucket_diff_state.cpp asynchandler.cpp bucketownershipnotifier.cpp bucketprocessor.cpp diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp new file mode 100644 index 00000000000..4fe24895c54 --- /dev/null +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -0,0 +1,73 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "apply_bucket_diff_state.h" +#include "apply_bucket_diff_entry_result.h" +#include "mergehandler.h" + +namespace storage { + +ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket) + : _async_results(), + _merge_bucket_info_syncer(merge_bucket_info_syncer), + _bucket(bucket), + _stale_bucket_info(false) +{ +} + +ApplyBucketDiffState::~ApplyBucketDiffState() +{ + wait(); + sync_bucket_info(); +} + +bool +ApplyBucketDiffState::empty() const +{ + return _async_results.empty(); +} + +void +ApplyBucketDiffState::wait() +{ + for (auto &result_to_check : _async_results) { + result_to_check.wait(); + } +} + +void +ApplyBucketDiffState::check() +{ + wait(); + try { + for (auto& result_to_check : _async_results) { + result_to_check.check_result(); + } + } catch (std::exception&) { + _async_results.clear(); + throw; + } + _async_results.clear(); +} + +void +ApplyBucketDiffState::push_back(ApplyBucketDiffEntryResult&& result) +{ + _async_results.push_back(std::move(result)); +} + +void +ApplyBucketDiffState::mark_stale_bucket_info() +{ + _stale_bucket_info = true; +} + +void +ApplyBucketDiffState::sync_bucket_info() +{ + if (_stale_bucket_info) { + _merge_bucket_info_syncer.sync_bucket_info(_bucket); + _stale_bucket_info = false; + } +} + +} diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h new file mode 100644 index 00000000000..489e75e4a72 --- /dev/null +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -0,0 +1,33 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/persistence/spi/bucket.h> +#include <vector> + +namespace storage { + +class ApplyBucketDiffEntryResult; +class MergeBucketInfoSyncer; + +/* + * State of all bucket diff entry spi operation (putAsync or removeAsync) + * for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply. + */ +class ApplyBucketDiffState { + std::vector<ApplyBucketDiffEntryResult> _async_results; + const MergeBucketInfoSyncer& _merge_bucket_info_syncer; + spi::Bucket _bucket; + bool _stale_bucket_info; +public: + ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket); + ~ApplyBucketDiffState(); + void push_back(ApplyBucketDiffEntryResult&& result); + bool empty() const; + void wait(); + void check(); + void mark_stale_bucket_info(); + void sync_bucket_info(); +}; + +} diff --git a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h new file mode 100644 index 00000000000..e05991ad9e3 --- /dev/null +++ b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h @@ -0,0 +1,18 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace storage::spi { class Bucket; } + +namespace storage { + +/* + * Interface class for syncing bucket info during merge. + */ +class MergeBucketInfoSyncer { +public: + virtual ~MergeBucketInfoSyncer() = default; + virtual void sync_bucket_info(const spi::Bucket& bucket) const = 0; +}; + +} diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 18e1c3d8d5a..7693156ae30 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -4,6 +4,7 @@ #include "persistenceutil.h" #include "apply_bucket_diff_entry_complete.h" #include "apply_bucket_diff_entry_result.h" +#include "apply_bucket_diff_state.h" #include <vespa/storage/persistence/filestorage/mergestatus.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/vespalib/stllike/asciistream.h> @@ -507,12 +508,13 @@ MergeHandler::applyDiffEntry(const spi::Bucket& bucket, /** * Apply the diffs needed locally. */ -api::BucketInfo +void MergeHandler::applyDiffLocally( const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, - spi::Context& context) const + spi::Context& context, + ApplyBucketDiffState& async_results) const { // Sort the data to apply by which file they should be added to LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries", @@ -522,8 +524,8 @@ MergeHandler::applyDiffLocally( uint32_t byteCount = 0; uint32_t addedCount = 0; uint32_t notNeededByteCount = 0; - std::vector<ApplyBucketDiffEntryResult> async_results; + async_results.mark_stale_bucket_info(); std::vector<spi::DocEntry::UP> entries; populateMetaData(bucket, MAX_TIMESTAMP, entries, context); @@ -609,13 +611,6 @@ MergeHandler::applyDiffLocally( async_results.push_back(applyDiffEntry(bucket, e, context, repo)); byteCount += e._headerBlob.size() + e._bodyBlob.size(); } - for (auto &result_to_check : async_results) { - result_to_check.wait(); - } - for (auto &result_to_check : async_results) { - result_to_check.check_result(); - } - if (byteCount + notNeededByteCount != 0) { _env._metrics.merge_handler_metrics.mergeAverageDataReceivedNeeded.addValue( static_cast<double>(byteCount) / (byteCount + notNeededByteCount)); @@ -623,7 +618,11 @@ MergeHandler::applyDiffLocally( _env._metrics.merge_handler_metrics.bytesMerged.inc(byteCount); LOG(debug, "Merge(%s): Applied %u entries locally from ApplyBucketDiff.", bucket.toString().c_str(), addedCount); +} +void +MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const +{ spi::BucketInfoResult infoResult(_spi.getBucketInfo(bucket)); if (infoResult.getErrorCode() != spi::Result::ErrorType::NONE) { LOG(warning, "Failed to get bucket info for %s: %s", @@ -642,7 +641,6 @@ MergeHandler::applyDiffLocally( tmpInfo.isActive()); _env.updateBucketDatabase(bucket.getBucket(), providerInfo); - return providerInfo; } namespace { @@ -1204,6 +1202,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra tracker->setMetric(_env._metrics.applyBucketDiff); spi::Bucket bucket(cmd.getBucket()); + ApplyBucketDiffState async_results(*this, bucket); LOG(debug, "%s", cmd.toString().c_str()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { @@ -1225,9 +1224,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { framework::MilliSecTimer startTime(_clock); - (void) applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context()); + applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); + async_results.check(); + async_results.sync_bucket_info(); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).", bucket.toString().c_str(), _env._nodeIndex, index); @@ -1285,6 +1286,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag { _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); + ApplyBucketDiffState async_results(*this, bucket); std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff()); LOG(debug, "%s", reply.toString().c_str()); @@ -1318,8 +1320,10 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); - (void) applyDiffLocally(bucket, diff, index, s.context); + applyDiffLocally(bucket, diff, index, s.context, async_results); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); + async_results.check(); + async_results.sync_bucket_info(); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)", bucket.toString().c_str(), diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 17b0ff29b4b..fa7e21dae78 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -14,6 +14,7 @@ #pragma once #include "types.h" +#include "merge_bucket_info_syncer.h" #include <vespa/persistence/spi/bucket.h> #include <vespa/persistence/spi/docentry.h> #include <vespa/storageapi/message/bucket.h> @@ -28,9 +29,11 @@ namespace spi { } class PersistenceUtil; class ApplyBucketDiffEntryResult; +class ApplyBucketDiffState; class MergeStatus; -class MergeHandler : public Types { +class MergeHandler : public Types, + public MergeBucketInfoSyncer { public: enum StateFlag { @@ -54,11 +57,12 @@ public: std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, spi::Context& context) const; - api::BucketInfo applyDiffLocally( - const spi::Bucket& bucket, + void applyDiffLocally(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, - spi::Context& context) const; + spi::Context& context, + ApplyBucketDiffState& async_results) const; + void sync_bucket_info(const spi::Bucket& bucket) const override; MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const; MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const; |