Diffstat (limited to 'storage/src/tests/distributor/bucketdatabasetest.cpp')
-rw-r--r--  storage/src/tests/distributor/bucketdatabasetest.cpp  242
1 file changed, 195 insertions, 47 deletions
diff --git a/storage/src/tests/distributor/bucketdatabasetest.cpp b/storage/src/tests/distributor/bucketdatabasetest.cpp
index 92e0c534e31..cfb54edbe78 100644
--- a/storage/src/tests/distributor/bucketdatabasetest.cpp
+++ b/storage/src/tests/distributor/bucketdatabasetest.cpp
@@ -1,5 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucketdatabasetest.h"
+#include <vespa/vespalib/util/benchmark_timer.h>
+#include <chrono>
#include <iomanip>
#include <algorithm>
@@ -12,15 +14,25 @@ void BucketDatabaseTest::SetUp() {
}
namespace {
- BucketCopy BC(uint32_t nodeIdx) {
- return BucketCopy(0, nodeIdx, api::BucketInfo());
- }
- BucketInfo BI(uint32_t nodeIdx) {
- BucketInfo bi;
- bi.addNode(BC(nodeIdx), toVector<uint16_t>(0));
- return bi;
- }
+BucketCopy BC(uint32_t nodeIdx) {
+ return BucketCopy(0, nodeIdx, api::BucketInfo());
+}
+
+BucketInfo BI(uint32_t nodeIdx) {
+ BucketInfo bi;
+ bi.addNode(BC(nodeIdx), toVector<uint16_t>(0));
+ return bi;
+}
+
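+// Builds a BucketInfo with replicas on the three given nodes (used by the iteration benchmark below).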
+BucketInfo BI3(uint32_t node0, uint32_t node1, uint32_t node2) {
+ BucketInfo bi;
+ bi.addNode(BC(node0), toVector<uint16_t>(node0, node1, node2));
+ bi.addNode(BC(node1), toVector<uint16_t>(node0, node1, node2));
+ bi.addNode(BC(node2), toVector<uint16_t>(node0, node1, node2));
+ return bi;
+}
+
}
TEST_P(BucketDatabaseTest, testClear) {
@@ -63,34 +75,23 @@ TEST_P(BucketDatabaseTest, testUpdateGetAndRemove) {
namespace {
-struct ModifyProcessor : public BucketDatabase::MutableEntryProcessor
-{
- bool process(BucketDatabase::Entry& e) override {
- if (e.getBucketId() == document::BucketId(16, 0x0b)) {
- e.getBucketInfo() = BI(7);
- } else if (e.getBucketId() == document::BucketId(16, 0x2a)) {
- e->clear();
- e->addNode(BC(4), toVector<uint16_t>(0));
- e->addNode(BC(5), toVector<uint16_t>(0));
- }
-
- return true;
- }
-};
-
struct ListAllProcessor : public BucketDatabase::EntryProcessor {
std::ostringstream ost;
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
ost << e << "\n";
return true;
}
};
-struct DummyProcessor : public BucketDatabase::EntryProcessor {
- std::ostringstream ost;
+std::string dump_db(const BucketDatabase& db) {
+ ListAllProcessor proc;
+ db.forEach(proc, document::BucketId());
+ return proc.ost.str();
+}
- bool process(const BucketDatabase::Entry&) override {
+struct DummyProcessor : public BucketDatabase::EntryProcessor {
+ bool process(const BucketDatabase::ConstEntryRef&) override {
return true;
}
};
@@ -99,7 +100,7 @@ struct DummyProcessor : public BucketDatabase::EntryProcessor {
struct StoppingProcessor : public BucketDatabase::EntryProcessor {
std::ostringstream ost;
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
ost << e << "\n";
if (e.getBucketId() == document::BucketId(16, 0x2a)) {
@@ -156,25 +157,6 @@ TEST_P(BucketDatabaseTest, testIterating) {
"node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"),
proc.ost.str());
}
-
- {
- ModifyProcessor alterProc;
- db().forEach(alterProc, document::BucketId());
- // Verify content after altering
- ListAllProcessor proc;
- db().forEach(proc);
-
- EXPECT_EQ(
- std::string(
- "BucketId(0x4000000000000010) : "
- "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
- "BucketId(0x400000000000002a) : "
- "node(idx=4,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false), "
- "node(idx=5,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
- "BucketId(0x400000000000000b) : "
- "node(idx=7,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"),
- proc.ost.str());
- }
}
std::string
@@ -552,4 +534,170 @@ TEST_P(BucketDatabaseTest, testChildCount) {
EXPECT_EQ(0u, db().childCount(BucketId(3, 5)));
}
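+// Short-hand aliases for the BucketDatabase merge() API exercised by the tests below.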
+using Merger = BucketDatabase::Merger;
+using TrailingInserter = BucketDatabase::TrailingInserter;
+using Result = BucketDatabase::MergingProcessor::Result;
+
+namespace {
+
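+// Returns KeepUnchanged for every entry; the DB contents shall be left exactly as-is.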
+struct KeepUnchangedMergingProcessor : BucketDatabase::MergingProcessor {
+ Result merge(Merger&) override {
+ return Result::KeepUnchanged;
+ }
+};
+
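+// Returns Skip for the given bucket, which removes its entry from the DB.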
+struct SkipBucketMergingProcessor : BucketDatabase::MergingProcessor {
+ BucketId _skip_bucket;
+ explicit SkipBucketMergingProcessor(BucketId skip_bucket) : _skip_bucket(skip_bucket) {}
+
+ Result merge(Merger& m) override {
+ return (m.bucket_id() == _skip_bucket) ? Result::Skip : Result::KeepUnchanged;
+ }
+};
+
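+// Mutates the replica set of the given bucket and returns Update so the change is written back.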
+struct UpdateBucketMergingProcessor : BucketDatabase::MergingProcessor {
+ BucketId _update_bucket;
+ explicit UpdateBucketMergingProcessor(BucketId update_bucket) : _update_bucket(update_bucket) {}
+
+ Result merge(Merger& m) override {
+ if (m.bucket_id() == _update_bucket) {
+ auto& e = m.current_entry();
+ // Add a replica and alter the current one.
+ e->addNode(BucketCopy(123456, 0, api::BucketInfo(2, 3, 4)), toVector<uint16_t>(0));
+ e->addNode(BucketCopy(234567, 1, api::BucketInfo(3, 4, 5)), toVector<uint16_t>(1));
+ return Result::Update;
+ }
+ return Result::KeepUnchanged;
+ }
+};
+
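+// Inserts a new entry immediately before the given bucket (in key order) while merging.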
+struct InsertBeforeBucketMergingProcessor : BucketDatabase::MergingProcessor {
+ BucketId _before_bucket;
+ explicit InsertBeforeBucketMergingProcessor(BucketId before_bucket) : _before_bucket(before_bucket) {}
+
+ Result merge(Merger& m) override {
+ if (m.bucket_id() == _before_bucket) {
+ // Assumes _before_bucket is > the inserted bucket
+ m.insert_before_current(BucketDatabase::Entry(document::BucketId(16, 2), BI(2)));
+ }
+ return Result::KeepUnchanged;
+ }
+};
+
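+// Appends an extra entry after all existing entries have been processed.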
+struct InsertAtEndMergingProcessor : BucketDatabase::MergingProcessor {
+ Result merge(Merger&) override {
+ return Result::KeepUnchanged;
+ }
+
+ void insert_remaining_at_end(TrailingInserter& inserter) override {
+ inserter.insert_at_end(BucketDatabase::Entry(document::BucketId(16, 3), BI(3)));
+ }
+};
+
+}
+
+TEST_P(BucketDatabaseTest, merge_keep_unchanged_result_does_not_alter_db_contents) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+
+ KeepUnchangedMergingProcessor proc;
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_entry_skipping_removes_entry_from_db) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+ db().update(BucketDatabase::Entry(BucketId(16, 3), BI(3)));
+
+ SkipBucketMergingProcessor proc(BucketId(16, 2));
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000003) : "
+ "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_update_result_updates_entry_in_db) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+
+ UpdateBucketMergingProcessor proc(BucketId(16, 1));
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x3,docs=4/4,bytes=5/5,trusted=false,active=false,ready=false), "
+ "node(idx=0,crc=0x2,docs=3/3,bytes=4/4,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_can_insert_entry_before_current_bucket) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 3), BI(3)));
+
+ InsertBeforeBucketMergingProcessor proc(BucketId(16, 1));
+ db().merge(proc);
+
+ // Bucket (...)00002 is inserted by the merge processor
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000003) : "
+ "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_can_insert_entry_at_end) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+
+ InsertAtEndMergingProcessor proc;
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000003) : "
+ "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, DISABLED_benchmark_const_iteration) {
+ constexpr uint32_t superbuckets = 1u << 16u;
+ constexpr uint32_t sub_buckets = 14;
+ constexpr uint32_t n_buckets = superbuckets * sub_buckets;
+
+ std::vector<uint64_t> bucket_keys;
+ bucket_keys.reserve(n_buckets);
+
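+    // Pre-generate all bucket keys, then insert them in sorted key order.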
+ for (uint32_t sb = 0; sb < superbuckets; ++sb) {
+ for (uint64_t i = 0; i < sub_buckets; ++i) {
+ document::BucketId bucket(48, (i << 32ULL) | sb);
+ bucket_keys.emplace_back(bucket.toKey());
+ }
+ }
+ std::sort(bucket_keys.begin(), bucket_keys.end());
+ for (uint64_t k : bucket_keys) {
+ db().update(BucketDatabase::Entry(BucketId(BucketId::keyToBucketId(k)), BI3(0, 1, 2)));
+ }
+
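+    // Time a full read-only iteration over the populated database.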
+ auto elapsed = vespalib::BenchmarkTimer::benchmark([&] {
+ DummyProcessor proc;
+ db().forEach(proc, document::BucketId());
+ }, 5);
+ fprintf(stderr, "Full DB iteration of %s takes %g seconds\n",
+ db().toString(false).c_str(), elapsed);
+}
+
}