diff options
Diffstat (limited to 'storage/src/tests/distributor/bucketdatabasetest.cpp')
-rw-r--r-- | storage/src/tests/distributor/bucketdatabasetest.cpp | 242 |
1 files changed, 195 insertions, 47 deletions
diff --git a/storage/src/tests/distributor/bucketdatabasetest.cpp b/storage/src/tests/distributor/bucketdatabasetest.cpp index 92e0c534e31..cfb54edbe78 100644 --- a/storage/src/tests/distributor/bucketdatabasetest.cpp +++ b/storage/src/tests/distributor/bucketdatabasetest.cpp @@ -1,5 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bucketdatabasetest.h" +#include <vespa/vespalib/util/benchmark_timer.h> +#include <chrono> #include <iomanip> #include <algorithm> @@ -12,15 +14,25 @@ void BucketDatabaseTest::SetUp() { } namespace { - BucketCopy BC(uint32_t nodeIdx) { - return BucketCopy(0, nodeIdx, api::BucketInfo()); - } - BucketInfo BI(uint32_t nodeIdx) { - BucketInfo bi; - bi.addNode(BC(nodeIdx), toVector<uint16_t>(0)); - return bi; - } +BucketCopy BC(uint32_t nodeIdx) { + return BucketCopy(0, nodeIdx, api::BucketInfo()); +} + +BucketInfo BI(uint32_t nodeIdx) { + BucketInfo bi; + bi.addNode(BC(nodeIdx), toVector<uint16_t>(0)); + return bi; +} + +BucketInfo BI3(uint32_t node0, uint32_t node1, uint32_t node2) { + BucketInfo bi; + bi.addNode(BC(node0), toVector<uint16_t>(node0, node1, node2)); + bi.addNode(BC(node1), toVector<uint16_t>(node0, node1, node2)); + bi.addNode(BC(node2), toVector<uint16_t>(node0, node1, node2)); + return bi; +} + } TEST_P(BucketDatabaseTest, testClear) { @@ -63,34 +75,23 @@ TEST_P(BucketDatabaseTest, testUpdateGetAndRemove) { namespace { -struct ModifyProcessor : public BucketDatabase::MutableEntryProcessor -{ - bool process(BucketDatabase::Entry& e) override { - if (e.getBucketId() == document::BucketId(16, 0x0b)) { - e.getBucketInfo() = BI(7); - } else if (e.getBucketId() == document::BucketId(16, 0x2a)) { - e->clear(); - e->addNode(BC(4), toVector<uint16_t>(0)); - e->addNode(BC(5), toVector<uint16_t>(0)); - } - - return true; - } -}; - struct ListAllProcessor : public BucketDatabase::EntryProcessor { std::ostringstream ost; - bool process(const BucketDatabase::Entry& e) override { + bool process(const BucketDatabase::ConstEntryRef& e) override { ost << e << "\n"; return true; } }; -struct DummyProcessor : public BucketDatabase::EntryProcessor { - std::ostringstream ost; +std::string dump_db(const BucketDatabase& db) { + ListAllProcessor proc; + db.forEach(proc, document::BucketId()); + return proc.ost.str(); +} - bool process(const BucketDatabase::Entry&) override { +struct DummyProcessor : public BucketDatabase::EntryProcessor { + bool process(const BucketDatabase::ConstEntryRef&) override { return true; } }; @@ -99,7 +100,7 @@ struct DummyProcessor : public BucketDatabase::EntryProcessor { struct StoppingProcessor : public BucketDatabase::EntryProcessor { std::ostringstream ost; - bool process(const BucketDatabase::Entry& e) override { + bool process(const BucketDatabase::ConstEntryRef& e) override { ost << e << "\n"; if (e.getBucketId() == document::BucketId(16, 0x2a)) { @@ -156,25 +157,6 @@ TEST_P(BucketDatabaseTest, testIterating) { "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"), proc.ost.str()); } - - { - ModifyProcessor alterProc; - db().forEach(alterProc, document::BucketId()); - // Verify content after altering - ListAllProcessor proc; - db().forEach(proc); - - EXPECT_EQ( - std::string( - "BucketId(0x4000000000000010) : " - "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" - "BucketId(0x400000000000002a) : " - "node(idx=4,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false), " - "node(idx=5,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" - "BucketId(0x400000000000000b) : " - "node(idx=7,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"), - proc.ost.str()); - } } std::string @@ -552,4 +534,170 @@ TEST_P(BucketDatabaseTest, testChildCount) { EXPECT_EQ(0u, db().childCount(BucketId(3, 5))); } +using Merger = BucketDatabase::Merger; +using TrailingInserter = BucketDatabase::TrailingInserter; +using Result = BucketDatabase::MergingProcessor::Result; + +namespace { + +struct KeepUnchangedMergingProcessor : BucketDatabase::MergingProcessor { + Result merge(Merger&) override { + return Result::KeepUnchanged; + } +}; + +struct SkipBucketMergingProcessor : BucketDatabase::MergingProcessor { + BucketId _skip_bucket; + explicit SkipBucketMergingProcessor(BucketId skip_bucket) : _skip_bucket(skip_bucket) {} + + Result merge(Merger& m) override { + return (m.bucket_id() == _skip_bucket) ? Result::Skip : Result::KeepUnchanged; + } +}; + +struct UpdateBucketMergingProcessor : BucketDatabase::MergingProcessor { + BucketId _update_bucket; + explicit UpdateBucketMergingProcessor(BucketId update_bucket) : _update_bucket(update_bucket) {} + + Result merge(Merger& m) override { + if (m.bucket_id() == _update_bucket) { + auto& e = m.current_entry(); + // Add a replica and alter the current one. + e->addNode(BucketCopy(123456, 0, api::BucketInfo(2, 3, 4)), toVector<uint16_t>(0)); + e->addNode(BucketCopy(234567, 1, api::BucketInfo(3, 4, 5)), toVector<uint16_t>(1)); + return Result::Update; + } + return Result::KeepUnchanged; + } +}; + +struct InsertBeforeBucketMergingProcessor : BucketDatabase::MergingProcessor { + BucketId _before_bucket; + explicit InsertBeforeBucketMergingProcessor(BucketId before_bucket) : _before_bucket(before_bucket) {} + + Result merge(Merger& m) override { + if (m.bucket_id() == _before_bucket) { + // Assumes _before_bucket is > the inserted bucket + m.insert_before_current(BucketDatabase::Entry(document::BucketId(16, 2), BI(2))); + } + return Result::KeepUnchanged; + } +}; + +struct InsertAtEndMergingProcessor : BucketDatabase::MergingProcessor { + Result merge(Merger&) override { + return Result::KeepUnchanged; + } + + void insert_remaining_at_end(TrailingInserter& inserter) override { + inserter.insert_at_end(BucketDatabase::Entry(document::BucketId(16, 3), BI(3))); + } +}; + +} + +TEST_P(BucketDatabaseTest, merge_keep_unchanged_result_does_not_alter_db_contents) { + db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1))); + db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2))); + + KeepUnchangedMergingProcessor proc; + db().merge(proc); + + EXPECT_EQ(dump_db(db()), + "BucketId(0x4000000000000002) : " + "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"); +} + +TEST_P(BucketDatabaseTest, merge_entry_skipping_removes_entry_from_db) { + db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1))); + db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2))); + db().update(BucketDatabase::Entry(BucketId(16, 3), BI(3))); + + SkipBucketMergingProcessor proc(BucketId(16, 2)); + db().merge(proc); + + EXPECT_EQ(dump_db(db()), + "BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000003) : " + "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"); +} + +TEST_P(BucketDatabaseTest, merge_update_result_updates_entry_in_db) { + db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1))); + db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2))); + + UpdateBucketMergingProcessor proc(BucketId(16, 1)); + db().merge(proc); + + EXPECT_EQ(dump_db(db()), + "BucketId(0x4000000000000002) : " + "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x3,docs=4/4,bytes=5/5,trusted=false,active=false,ready=false), " + "node(idx=0,crc=0x2,docs=3/3,bytes=4/4,trusted=false,active=false,ready=false)\n"); +} + +TEST_P(BucketDatabaseTest, merge_can_insert_entry_before_current_bucket) { + db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1))); + db().update(BucketDatabase::Entry(BucketId(16, 3), BI(3))); + + InsertBeforeBucketMergingProcessor proc(BucketId(16, 1)); + db().merge(proc); + + // Bucket (...)00002 is inserted by the merge processor + EXPECT_EQ(dump_db(db()), + "BucketId(0x4000000000000002) : " + "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000003) : " + "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"); +} + +TEST_P(BucketDatabaseTest, merge_can_insert_entry_at_end) { + db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1))); + db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2))); + + InsertAtEndMergingProcessor proc; + db().merge(proc); + + EXPECT_EQ(dump_db(db()), + "BucketId(0x4000000000000002) : " + "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000003) : " + "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"); +} + +TEST_P(BucketDatabaseTest, DISABLED_benchmark_const_iteration) { + constexpr uint32_t superbuckets = 1u << 16u; + constexpr uint32_t sub_buckets = 14; + constexpr uint32_t n_buckets = superbuckets * sub_buckets; + + std::vector<uint64_t> bucket_keys; + bucket_keys.reserve(n_buckets); + + for (uint32_t sb = 0; sb < superbuckets; ++sb) { + for (uint64_t i = 0; i < sub_buckets; ++i) { + document::BucketId bucket(48, (i << 32ULL) | sb); + bucket_keys.emplace_back(bucket.toKey()); + } + } + std::sort(bucket_keys.begin(), bucket_keys.end()); + for (uint64_t k : bucket_keys) { + db().update(BucketDatabase::Entry(BucketId(BucketId::keyToBucketId(k)), BI3(0, 1, 2))); + } + + auto elapsed = vespalib::BenchmarkTimer::benchmark([&] { + DummyProcessor proc; + db().forEach(proc, document::BucketId()); + }, 5); + fprintf(stderr, "Full DB iteration of %s takes %g seconds\n", + db().toString(false).c_str(), elapsed); +} + } |