summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-10-15 12:02:54 +0200
committerTor Egge <Tor.Egge@online.no>2021-10-15 12:02:54 +0200
commitf74696aee9b10f9ae993c5cceda6db0cc0c5b334 (patch)
treedc5ac9b5f9b122fc8c04df63674c3e467287459e /storage
parent912d0cb4a321ebb3eb7a1cd0d73bd3371d9bec22 (diff)
Add class representing async state for applying bucket diff to local node.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/CMakeLists.txt2
-rw-r--r--storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp70
-rw-r--r--storage/src/tests/persistence/apply_bucket_diff_state_test.cpp150
-rw-r--r--storage/src/vespa/storage/persistence/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp73
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.h33
-rw-r--r--storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h18
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp30
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h12
9 files changed, 301 insertions, 88 deletions
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index 8c0e7fdb11c..f0deec90aae 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -2,7 +2,7 @@
vespa_add_executable(storage_persistence_gtest_runner_app TEST
SOURCES
- apply_bucket_diff_entry_result_test.cpp
+ apply_bucket_diff_state_test.cpp
bucketownershipnotifiertest.cpp
has_mask_remapper_test.cpp
mergehandlertest.cpp
diff --git a/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp
deleted file mode 100644
index a9f2acc0fdb..00000000000
--- a/storage/src/tests/persistence/apply_bucket_diff_entry_result_test.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/storage/persistence/apply_bucket_diff_entry_result.h>
-#include <vespa/document/base/documentid.h>
-#include <vespa/document/bucket/bucketid.h>
-#include <vespa/document/bucket/bucketidfactory.h>
-#include <vespa/document/test/make_document_bucket.h>
-#include <vespa/persistence/spi/result.h>
-#include <gtest/gtest.h>
-
-using document::DocumentId;
-using document::test::makeDocumentBucket;
-
-namespace storage {
-
-using ResultVector = std::vector<ApplyBucketDiffEntryResult>;
-
-namespace {
-
-spi::Result spi_result_ok;
-spi::Result spi_result_fail(spi::Result::ErrorType::RESOURCE_EXHAUSTED, "write blocked");
-document::BucketIdFactory bucket_id_factory;
-const char *test_op = "put";
-metrics::DoubleAverageMetric dummy_metric("dummy", metrics::DoubleAverageMetric::Tags(), "dummy desc");
-
-ApplyBucketDiffEntryResult
-make_result(spi::Result &spi_result, const DocumentId &doc_id)
-{
- std::promise<std::pair<std::unique_ptr<spi::Result>, double>> result_promise;
- result_promise.set_value(std::make_pair(std::make_unique<spi::Result>(spi_result), 0.1));
- spi::Bucket bucket(makeDocumentBucket(bucket_id_factory.getBucketId(doc_id)));
- return ApplyBucketDiffEntryResult(result_promise.get_future(), bucket, doc_id, test_op, dummy_metric);
-}
-
-void
-check_results(ResultVector results)
-{
- for (auto& result : results) {
- result.wait();
- }
- for (auto& result : results) {
- result.check_result();
- }
-}
-
-}
-
-TEST(ApplyBucketDiffEntryResultTest, ok_results_can_be_checked)
-{
- ResultVector results;
- results.push_back(make_result(spi_result_ok, DocumentId("id::test::0")));
- results.push_back(make_result(spi_result_ok, DocumentId("id::test::1")));
- check_results(std::move(results));
-}
-
-TEST(ApplyBucketDiffEntryResultTest, first_failed_result_throws_exception)
-{
- ResultVector results;
- results.push_back(make_result(spi_result_ok, DocumentId("id::test::0")));
- results.push_back(make_result(spi_result_fail, DocumentId("id::test::1")));
- results.push_back(make_result(spi_result_fail, DocumentId("id::test::2")));
- try {
- check_results(std::move(results));
- FAIL() << "Failed to throw exception for failed result";
- } catch (std::exception &e) {
- EXPECT_EQ("Failed put for id::test::1 in Bucket(0xeb4700c03842cac4): Result(5, write blocked)", std::string(e.what()));
- }
-}
-
-}
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
new file mode 100644
index 00000000000..188b541c1b8
--- /dev/null
+++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
@@ -0,0 +1,150 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/storage/persistence/apply_bucket_diff_entry_result.h>
+#include <vespa/storage/persistence/apply_bucket_diff_state.h>
+#include <vespa/storage/persistence/merge_bucket_info_syncer.h>
+#include <vespa/document/base/documentid.h>
+#include <vespa/document/bucket/bucketid.h>
+#include <vespa/document/bucket/bucketidfactory.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/persistence/spi/result.h>
+#include <gtest/gtest.h>
+
+using document::DocumentId;
+using document::test::makeDocumentBucket;
+
+namespace storage {
+
+namespace {
+
+spi::Result spi_result_ok;
+spi::Result spi_result_fail(spi::Result::ErrorType::RESOURCE_EXHAUSTED, "write blocked");
+document::BucketIdFactory bucket_id_factory;
+const char *test_op = "put";
+metrics::DoubleAverageMetric dummy_metric("dummy", metrics::DoubleAverageMetric::Tags(), "dummy desc");
+document::Bucket dummy_document_bucket(makeDocumentBucket(document::BucketId(0, 16)));
+
+class DummyMergeBucketInfoSyncer : public MergeBucketInfoSyncer
+{
+ uint32_t& _sync_count;
+public:
+ DummyMergeBucketInfoSyncer(uint32_t& sync_count)
+ : MergeBucketInfoSyncer(),
+ _sync_count(sync_count)
+ {
+ }
+ void sync_bucket_info(const spi::Bucket& bucket) const override {
+ EXPECT_EQ(bucket, spi::Bucket(dummy_document_bucket));
+ ++_sync_count;
+ }
+};
+
+ApplyBucketDiffEntryResult
+make_result(spi::Result &spi_result, const DocumentId &doc_id)
+{
+ std::promise<std::pair<std::unique_ptr<spi::Result>, double>> result_promise;
+ result_promise.set_value(std::make_pair(std::make_unique<spi::Result>(spi_result), 0.1));
+ spi::Bucket bucket(makeDocumentBucket(bucket_id_factory.getBucketId(doc_id)));
+ return ApplyBucketDiffEntryResult(result_promise.get_future(), bucket, doc_id, test_op, dummy_metric);
+}
+
+void push_ok(ApplyBucketDiffState &state)
+{
+ state.push_back(make_result(spi_result_ok, DocumentId("id::test::0")));
+ state.push_back(make_result(spi_result_ok, DocumentId("id::test::1")));
+}
+
+void push_bad(ApplyBucketDiffState &state)
+{
+ state.push_back(make_result(spi_result_ok, DocumentId("id::test::0")));
+ state.push_back(make_result(spi_result_fail, DocumentId("id::test::1")));
+ state.push_back(make_result(spi_result_fail, DocumentId("id::test::2")));
+}
+
+}
+
+class ApplyBucketDiffStateTestBase : public ::testing::Test
+{
+public:
+ uint32_t sync_count;
+ DummyMergeBucketInfoSyncer syncer;
+
+ ApplyBucketDiffStateTestBase()
+ : ::testing::Test(),
+ sync_count(0u),
+ syncer(sync_count)
+ {
+ }
+
+ std::unique_ptr<ApplyBucketDiffState> make_state() {
+ return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket));
+ }
+};
+
+class ApplyBucketDiffStateTest : public ApplyBucketDiffStateTestBase
+{
+public:
+ std::unique_ptr<ApplyBucketDiffState> state;
+
+ ApplyBucketDiffStateTest()
+ : ApplyBucketDiffStateTestBase(),
+ state(make_state())
+ {
+ }
+
+ void reset() {
+ state = make_state();
+ }
+
+ void check_failure() {
+ try {
+ state->check();
+ FAIL() << "Failed to throw exception for failed result";
+ } catch (std::exception &e) {
+ EXPECT_EQ("Failed put for id::test::1 in Bucket(0xeb4700c03842cac4): Result(5, write blocked)", std::string(e.what()));
+ }
+ }
+
+};
+
+TEST_F(ApplyBucketDiffStateTest, ok_results_can_be_checked)
+{
+ push_ok(*state);
+ state->check();
+}
+
+TEST_F(ApplyBucketDiffStateTest, failed_result_errors_ignored)
+{
+ push_bad(*state);
+}
+
+TEST_F(ApplyBucketDiffStateTest, first_failed_result_throws_exception)
+{
+ push_bad(*state);
+ ASSERT_NO_FATAL_FAILURE(check_failure());
+}
+
+TEST_F(ApplyBucketDiffStateTest, sync_bucket_info_if_needed_on_destruct)
+{
+ reset();
+ EXPECT_EQ(0, sync_count);
+ state->mark_stale_bucket_info();
+ EXPECT_EQ(0, sync_count);
+ reset();
+ EXPECT_EQ(1, sync_count);
+}
+
+TEST_F(ApplyBucketDiffStateTest, explicit_sync_bucket_info_works)
+{
+ state->sync_bucket_info();
+ EXPECT_EQ(0, sync_count);
+ state->mark_stale_bucket_info();
+ state->sync_bucket_info();
+ EXPECT_EQ(1, sync_count);
+ state->sync_bucket_info();
+ EXPECT_EQ(1, sync_count);
+ reset();
+ EXPECT_EQ(1, sync_count);
+}
+
+}
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index 083e0406ef0..4b41d3fa778 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -3,6 +3,7 @@ vespa_add_library(storage_spersistence OBJECT
SOURCES
apply_bucket_diff_entry_complete.cpp
apply_bucket_diff_entry_result.cpp
+ apply_bucket_diff_state.cpp
asynchandler.cpp
bucketownershipnotifier.cpp
bucketprocessor.cpp
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
new file mode 100644
index 00000000000..4fe24895c54
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
@@ -0,0 +1,73 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "apply_bucket_diff_state.h"
+#include "apply_bucket_diff_entry_result.h"
+#include "mergehandler.h"
+
+namespace storage {
+
+ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket)
+ : _async_results(),
+ _merge_bucket_info_syncer(merge_bucket_info_syncer),
+ _bucket(bucket),
+ _stale_bucket_info(false)
+{
+}
+
+ApplyBucketDiffState::~ApplyBucketDiffState()
+{
+ wait();
+ sync_bucket_info();
+}
+
+bool
+ApplyBucketDiffState::empty() const
+{
+ return _async_results.empty();
+}
+
+void
+ApplyBucketDiffState::wait()
+{
+ for (auto &result_to_check : _async_results) {
+ result_to_check.wait();
+ }
+}
+
+void
+ApplyBucketDiffState::check()
+{
+ wait();
+ try {
+ for (auto& result_to_check : _async_results) {
+ result_to_check.check_result();
+ }
+ } catch (std::exception&) {
+ _async_results.clear();
+ throw;
+ }
+ _async_results.clear();
+}
+
+void
+ApplyBucketDiffState::push_back(ApplyBucketDiffEntryResult&& result)
+{
+ _async_results.push_back(std::move(result));
+}
+
+void
+ApplyBucketDiffState::mark_stale_bucket_info()
+{
+ _stale_bucket_info = true;
+}
+
+void
+ApplyBucketDiffState::sync_bucket_info()
+{
+ if (_stale_bucket_info) {
+ _merge_bucket_info_syncer.sync_bucket_info(_bucket);
+ _stale_bucket_info = false;
+ }
+}
+
+}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
new file mode 100644
index 00000000000..489e75e4a72
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
@@ -0,0 +1,33 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/persistence/spi/bucket.h>
+#include <vector>
+
+namespace storage {
+
+class ApplyBucketDiffEntryResult;
+class MergeBucketInfoSyncer;
+
+/*
+ * State of all bucket diff entry spi operation (putAsync or removeAsync)
+ * for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply.
+ */
+class ApplyBucketDiffState {
+ std::vector<ApplyBucketDiffEntryResult> _async_results;
+ const MergeBucketInfoSyncer& _merge_bucket_info_syncer;
+ spi::Bucket _bucket;
+ bool _stale_bucket_info;
+public:
+ ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket);
+ ~ApplyBucketDiffState();
+ void push_back(ApplyBucketDiffEntryResult&& result);
+ bool empty() const;
+ void wait();
+ void check();
+ void mark_stale_bucket_info();
+ void sync_bucket_info();
+};
+
+}
diff --git a/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
new file mode 100644
index 00000000000..e05991ad9e3
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/merge_bucket_info_syncer.h
@@ -0,0 +1,18 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace storage::spi { class Bucket; }
+
+namespace storage {
+
+/*
+ * Interface class for syncing bucket info during merge.
+ */
+class MergeBucketInfoSyncer {
+public:
+ virtual ~MergeBucketInfoSyncer() = default;
+ virtual void sync_bucket_info(const spi::Bucket& bucket) const = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 18e1c3d8d5a..7693156ae30 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -4,6 +4,7 @@
#include "persistenceutil.h"
#include "apply_bucket_diff_entry_complete.h"
#include "apply_bucket_diff_entry_result.h"
+#include "apply_bucket_diff_state.h"
#include <vespa/storage/persistence/filestorage/mergestatus.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/vespalib/stllike/asciistream.h>
@@ -507,12 +508,13 @@ MergeHandler::applyDiffEntry(const spi::Bucket& bucket,
/**
* Apply the diffs needed locally.
*/
-api::BucketInfo
+void
MergeHandler::applyDiffLocally(
const spi::Bucket& bucket,
std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
uint8_t nodeIndex,
- spi::Context& context) const
+ spi::Context& context,
+ ApplyBucketDiffState& async_results) const
{
// Sort the data to apply by which file they should be added to
LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries",
@@ -522,8 +524,8 @@ MergeHandler::applyDiffLocally(
uint32_t byteCount = 0;
uint32_t addedCount = 0;
uint32_t notNeededByteCount = 0;
- std::vector<ApplyBucketDiffEntryResult> async_results;
+ async_results.mark_stale_bucket_info();
std::vector<spi::DocEntry::UP> entries;
populateMetaData(bucket, MAX_TIMESTAMP, entries, context);
@@ -609,13 +611,6 @@ MergeHandler::applyDiffLocally(
async_results.push_back(applyDiffEntry(bucket, e, context, repo));
byteCount += e._headerBlob.size() + e._bodyBlob.size();
}
- for (auto &result_to_check : async_results) {
- result_to_check.wait();
- }
- for (auto &result_to_check : async_results) {
- result_to_check.check_result();
- }
-
if (byteCount + notNeededByteCount != 0) {
_env._metrics.merge_handler_metrics.mergeAverageDataReceivedNeeded.addValue(
static_cast<double>(byteCount) / (byteCount + notNeededByteCount));
@@ -623,7 +618,11 @@ MergeHandler::applyDiffLocally(
_env._metrics.merge_handler_metrics.bytesMerged.inc(byteCount);
LOG(debug, "Merge(%s): Applied %u entries locally from ApplyBucketDiff.",
bucket.toString().c_str(), addedCount);
+}
+void
+MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
+{
spi::BucketInfoResult infoResult(_spi.getBucketInfo(bucket));
if (infoResult.getErrorCode() != spi::Result::ErrorType::NONE) {
LOG(warning, "Failed to get bucket info for %s: %s",
@@ -642,7 +641,6 @@ MergeHandler::applyDiffLocally(
tmpInfo.isActive());
_env.updateBucketDatabase(bucket.getBucket(), providerInfo);
- return providerInfo;
}
namespace {
@@ -1204,6 +1202,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
tracker->setMetric(_env._metrics.applyBucketDiff);
spi::Bucket bucket(cmd.getBucket());
+ ApplyBucketDiffState async_results(*this, bucket);
LOG(debug, "%s", cmd.toString().c_str());
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
@@ -1225,9 +1224,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
framework::MilliSecTimer startTime(_clock);
- (void) applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context());
+ applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results);
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(
startTime.getElapsedTimeAsDouble());
+ async_results.check();
+ async_results.sync_bucket_info();
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).",
bucket.toString().c_str(), _env._nodeIndex, index);
@@ -1285,6 +1286,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
{
_env._metrics.applyBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
+ ApplyBucketDiffState async_results(*this, bucket);
std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff());
LOG(debug, "%s", reply.toString().c_str());
@@ -1318,8 +1320,10 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
}
if (applyDiffHasLocallyNeededData(diff, index)) {
framework::MilliSecTimer startTime(_clock);
- (void) applyDiffLocally(bucket, diff, index, s.context);
+ applyDiffLocally(bucket, diff, index, s.context, async_results);
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble());
+ async_results.check();
+ async_results.sync_bucket_info();
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)",
bucket.toString().c_str(),
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 17b0ff29b4b..fa7e21dae78 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -14,6 +14,7 @@
#pragma once
#include "types.h"
+#include "merge_bucket_info_syncer.h"
#include <vespa/persistence/spi/bucket.h>
#include <vespa/persistence/spi/docentry.h>
#include <vespa/storageapi/message/bucket.h>
@@ -28,9 +29,11 @@ namespace spi {
}
class PersistenceUtil;
class ApplyBucketDiffEntryResult;
+class ApplyBucketDiffState;
class MergeStatus;
-class MergeHandler : public Types {
+class MergeHandler : public Types,
+ public MergeBucketInfoSyncer {
public:
enum StateFlag {
@@ -54,11 +57,12 @@ public:
std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
uint8_t nodeIndex,
spi::Context& context) const;
- api::BucketInfo applyDiffLocally(
- const spi::Bucket& bucket,
+ void applyDiffLocally(const spi::Bucket& bucket,
std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
uint8_t nodeIndex,
- spi::Context& context) const;
+ spi::Context& context,
+ ApplyBucketDiffState& async_results) const;
+ void sync_bucket_info(const spi::Bucket& bucket) const override;
MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const;
MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const;