diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-06-05 11:04:50 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-06-05 11:04:50 +0200 |
commit | 2d10eff40215c7b11ae3b01a2fba45ad8135cc61 (patch) | |
tree | 1a9aa6d2318b226c4787802bd06fe3f6291fa1c5 /storage | |
parent | 80a3f22ab612d9cd1e6f0b249605b4342bebaead (diff) | |
parent | 3ec818fe80674e16b282534570a7c28ce80f5b84 (diff) |
Merge pull request #9657 from vespa-engine/vekterli/more-efficient-bucket-db-bulk-apis
Add new DB merging API to distributor BucketDatabase
Diffstat (limited to 'storage')
21 files changed, 1119 insertions, 604 deletions
diff --git a/storage/src/tests/distributor/bucketdatabasetest.cpp b/storage/src/tests/distributor/bucketdatabasetest.cpp index 92e0c534e31..cfb54edbe78 100644 --- a/storage/src/tests/distributor/bucketdatabasetest.cpp +++ b/storage/src/tests/distributor/bucketdatabasetest.cpp @@ -1,5 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bucketdatabasetest.h" +#include <vespa/vespalib/util/benchmark_timer.h> +#include <chrono> #include <iomanip> #include <algorithm> @@ -12,15 +14,25 @@ void BucketDatabaseTest::SetUp() { } namespace { - BucketCopy BC(uint32_t nodeIdx) { - return BucketCopy(0, nodeIdx, api::BucketInfo()); - } - BucketInfo BI(uint32_t nodeIdx) { - BucketInfo bi; - bi.addNode(BC(nodeIdx), toVector<uint16_t>(0)); - return bi; - } +BucketCopy BC(uint32_t nodeIdx) { + return BucketCopy(0, nodeIdx, api::BucketInfo()); +} + +BucketInfo BI(uint32_t nodeIdx) { + BucketInfo bi; + bi.addNode(BC(nodeIdx), toVector<uint16_t>(0)); + return bi; +} + +BucketInfo BI3(uint32_t node0, uint32_t node1, uint32_t node2) { + BucketInfo bi; + bi.addNode(BC(node0), toVector<uint16_t>(node0, node1, node2)); + bi.addNode(BC(node1), toVector<uint16_t>(node0, node1, node2)); + bi.addNode(BC(node2), toVector<uint16_t>(node0, node1, node2)); + return bi; +} + } TEST_P(BucketDatabaseTest, testClear) { @@ -63,34 +75,23 @@ TEST_P(BucketDatabaseTest, testUpdateGetAndRemove) { namespace { -struct ModifyProcessor : public BucketDatabase::MutableEntryProcessor -{ - bool process(BucketDatabase::Entry& e) override { - if (e.getBucketId() == document::BucketId(16, 0x0b)) { - e.getBucketInfo() = BI(7); - } else if (e.getBucketId() == document::BucketId(16, 0x2a)) { - e->clear(); - e->addNode(BC(4), toVector<uint16_t>(0)); - e->addNode(BC(5), toVector<uint16_t>(0)); - } - - return true; - } -}; - struct ListAllProcessor : public BucketDatabase::EntryProcessor { std::ostringstream ost; - bool process(const BucketDatabase::Entry& e) override { + bool process(const BucketDatabase::ConstEntryRef& e) override { ost << e << "\n"; return true; } }; -struct DummyProcessor : public BucketDatabase::EntryProcessor { - std::ostringstream ost; +std::string dump_db(const BucketDatabase& db) { + ListAllProcessor proc; + db.forEach(proc, document::BucketId()); + return proc.ost.str(); +} - bool process(const BucketDatabase::Entry&) override { +struct DummyProcessor : public BucketDatabase::EntryProcessor { + bool process(const BucketDatabase::ConstEntryRef&) override { return true; } }; @@ -99,7 +100,7 @@ struct DummyProcessor : public BucketDatabase::EntryProcessor { struct StoppingProcessor : public BucketDatabase::EntryProcessor { std::ostringstream ost; - bool process(const BucketDatabase::Entry& e) override { + bool process(const BucketDatabase::ConstEntryRef& e) override { ost << e << "\n"; if (e.getBucketId() == document::BucketId(16, 0x2a)) { @@ -156,25 +157,6 @@ TEST_P(BucketDatabaseTest, testIterating) { "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"), proc.ost.str()); } - - { - ModifyProcessor alterProc; - db().forEach(alterProc, document::BucketId()); - // Verify content after altering - ListAllProcessor proc; - db().forEach(proc); - - EXPECT_EQ( - std::string( - "BucketId(0x4000000000000010) : " - "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" - "BucketId(0x400000000000002a) : " - "node(idx=4,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false), " - "node(idx=5,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" - "BucketId(0x400000000000000b) : " - "node(idx=7,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"), - proc.ost.str()); - } } std::string @@ -552,4 +534,170 @@ TEST_P(BucketDatabaseTest, testChildCount) { EXPECT_EQ(0u, db().childCount(BucketId(3, 5))); } +using Merger = BucketDatabase::Merger; +using TrailingInserter = BucketDatabase::TrailingInserter; +using Result = BucketDatabase::MergingProcessor::Result; + +namespace { + +struct KeepUnchangedMergingProcessor : BucketDatabase::MergingProcessor { + Result merge(Merger&) override { + return Result::KeepUnchanged; + } +}; + +struct SkipBucketMergingProcessor : BucketDatabase::MergingProcessor { + BucketId _skip_bucket; + explicit SkipBucketMergingProcessor(BucketId skip_bucket) : _skip_bucket(skip_bucket) {} + + Result merge(Merger& m) override { + return (m.bucket_id() == _skip_bucket) ? Result::Skip : Result::KeepUnchanged; + } +}; + +struct UpdateBucketMergingProcessor : BucketDatabase::MergingProcessor { + BucketId _update_bucket; + explicit UpdateBucketMergingProcessor(BucketId update_bucket) : _update_bucket(update_bucket) {} + + Result merge(Merger& m) override { + if (m.bucket_id() == _update_bucket) { + auto& e = m.current_entry(); + // Add a replica and alter the current one. + e->addNode(BucketCopy(123456, 0, api::BucketInfo(2, 3, 4)), toVector<uint16_t>(0)); + e->addNode(BucketCopy(234567, 1, api::BucketInfo(3, 4, 5)), toVector<uint16_t>(1)); + return Result::Update; + } + return Result::KeepUnchanged; + } +}; + +struct InsertBeforeBucketMergingProcessor : BucketDatabase::MergingProcessor { + BucketId _before_bucket; + explicit InsertBeforeBucketMergingProcessor(BucketId before_bucket) : _before_bucket(before_bucket) {} + + Result merge(Merger& m) override { + if (m.bucket_id() == _before_bucket) { + // Assumes _before_bucket is > the inserted bucket + m.insert_before_current(BucketDatabase::Entry(document::BucketId(16, 2), BI(2))); + } + return Result::KeepUnchanged; + } +}; + +struct InsertAtEndMergingProcessor : BucketDatabase::MergingProcessor { + Result merge(Merger&) override { + return Result::KeepUnchanged; + } + + void insert_remaining_at_end(TrailingInserter& inserter) override { + inserter.insert_at_end(BucketDatabase::Entry(document::BucketId(16, 3), BI(3))); + } +}; + +} + +TEST_P(BucketDatabaseTest, merge_keep_unchanged_result_does_not_alter_db_contents) { + db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1))); + db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2))); + + KeepUnchangedMergingProcessor proc; + db().merge(proc); + + EXPECT_EQ(dump_db(db()), + "BucketId(0x4000000000000002) : " + "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"); +} + +TEST_P(BucketDatabaseTest, merge_entry_skipping_removes_entry_from_db) { + db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1))); + db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2))); + db().update(BucketDatabase::Entry(BucketId(16, 3), BI(3))); + + SkipBucketMergingProcessor proc(BucketId(16, 2)); + db().merge(proc); + + EXPECT_EQ(dump_db(db()), + "BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000003) : " + "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"); +} + +TEST_P(BucketDatabaseTest, merge_update_result_updates_entry_in_db) { + db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1))); + db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2))); + + UpdateBucketMergingProcessor proc(BucketId(16, 1)); + db().merge(proc); + + EXPECT_EQ(dump_db(db()), + "BucketId(0x4000000000000002) : " + "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x3,docs=4/4,bytes=5/5,trusted=false,active=false,ready=false), " + "node(idx=0,crc=0x2,docs=3/3,bytes=4/4,trusted=false,active=false,ready=false)\n"); +} + +TEST_P(BucketDatabaseTest, merge_can_insert_entry_before_current_bucket) { + db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1))); + db().update(BucketDatabase::Entry(BucketId(16, 3), BI(3))); + + InsertBeforeBucketMergingProcessor proc(BucketId(16, 1)); + db().merge(proc); + + // Bucket (...)00002 is inserted by the merge processor + EXPECT_EQ(dump_db(db()), + "BucketId(0x4000000000000002) : " + "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000003) : " + "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"); +} + +TEST_P(BucketDatabaseTest, merge_can_insert_entry_at_end) { + db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1))); + db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2))); + + InsertAtEndMergingProcessor proc; + db().merge(proc); + + EXPECT_EQ(dump_db(db()), + "BucketId(0x4000000000000002) : " + "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n" + "BucketId(0x4000000000000003) : " + "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"); +} + +TEST_P(BucketDatabaseTest, DISABLED_benchmark_const_iteration) { + constexpr uint32_t superbuckets = 1u << 16u; + constexpr uint32_t sub_buckets = 14; + constexpr uint32_t n_buckets = superbuckets * sub_buckets; + + std::vector<uint64_t> bucket_keys; + bucket_keys.reserve(n_buckets); + + for (uint32_t sb = 0; sb < superbuckets; ++sb) { + for (uint64_t i = 0; i < sub_buckets; ++i) { + document::BucketId bucket(48, (i << 32ULL) | sb); + bucket_keys.emplace_back(bucket.toKey()); + } + } + std::sort(bucket_keys.begin(), bucket_keys.end()); + for (uint64_t k : bucket_keys) { + db().update(BucketDatabase::Entry(BucketId(BucketId::keyToBucketId(k)), BI3(0, 1, 2))); + } + + auto elapsed = vespalib::BenchmarkTimer::benchmark([&] { + DummyProcessor proc; + db().forEach(proc, document::BucketId()); + }, 5); + fprintf(stderr, "Full DB iteration of %s takes %g seconds\n", + db().toString(false).c_str(), elapsed); +} + } diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 1cfc1692edb..cdaa6e9aaa3 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -514,7 +514,7 @@ public: OutdatedNodesMap outdatedNodesMap; state = PendingClusterState::createForClusterStateChange( clock, clusterInfo, sender, - owner.getBucketSpaceRepo(), owner.getReadOnlyBucketSpaceRepo(), + owner.getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1)); } @@ -526,8 +526,7 @@ public: owner.createClusterInfo(oldClusterState)); state = PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, owner.getBucketSpaceRepo(), - owner.getReadOnlyBucketSpaceRepo(), api::Timestamp(1)); + clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1)); } }; @@ -543,6 +542,8 @@ public: { return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState); } + + uint32_t populate_bucket_db_via_request_bucket_info_for_benchmarking(); }; BucketDBUpdaterTest::BucketDBUpdaterTest() @@ -863,14 +864,14 @@ TEST_F(BucketDBUpdaterTest, testBitChange) { const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]); auto sreply = std::make_shared<RequestBucketInfoReply>(req); sreply->setAddress(storageAddress(0)); - api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + auto& vec = sreply->getBucketInfo(); if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { int cnt=0; for (int i=0; cnt < 2; i++) { lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution(); std::vector<uint16_t> distributors; if (distribution.getIdealDistributorNode( - lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"), + lib::ClusterState("bits:14 storage:1 distributor:2"), document::BucketId(16, i)) == 0) { @@ -1373,8 +1374,7 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged( ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState)); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), - getReadOnlyBucketSpaceRepo(), api::Timestamp(1))); + clock, clusterInfo, sender, getBucketSpaceRepo(), api::Timestamp(1))); sortSentMessagesByIndex(sender); @@ -1508,7 +1508,7 @@ TEST_F(BucketDBUpdaterTest, testPendingClusterStateReceive) { OutdatedNodesMap outdatedNodesMap; std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(), + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1))); ASSERT_EQ(messageCount(3), sender.commands.size()); @@ -1617,7 +1617,7 @@ struct BucketDumper : public BucketDatabase::EntryProcessor { } - bool process(const BucketDatabase::Entry& e) override { + bool process(const BucketDatabase::ConstEntryRef& e) override { document::BucketId bucketId(e.getBucketId()); ost << (uint32_t)bucketId.getRawId() << ":"; @@ -1661,7 +1661,7 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(), + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, beforeTime)); parseInputData(existingData, beforeTime, *state, includeBucketInfo); @@ -1680,7 +1680,7 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString())); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(), + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, afterTime)); parseInputData(newData, afterTime, *state, includeBucketInfo); @@ -1931,7 +1931,7 @@ struct FunctorProcessor : BucketDatabase::EntryProcessor { template <typename F> explicit FunctorProcessor(F&& f) : _f(std::forward<F>(f)) {} - bool process(const BucketDatabase::Entry& e) override { + bool process(const BucketDatabase::ConstEntryRef& e) override { _f(e); return true; } @@ -2580,19 +2580,17 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_trans TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) { // Need to trigger an initial edge to complete first bucket scan - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("storage:1 distributor:2"), + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"), messageCount(1), 0)); _sender.clear(); - lib::ClusterState state("storage:1 distributor:1"); + lib::ClusterState state("distributor:1 storage:1"); setSystemState(state); constexpr uint32_t superbuckets = 1u << 16u; constexpr uint32_t sub_buckets = 14; constexpr uint32_t n_buckets = superbuckets * sub_buckets; - vespalib::BenchmarkTimer timer(1.0); - ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { ASSERT_EQ(_sender.commands[bsi]->getType(), MessageType::REQUESTBUCKETINFO); @@ -2610,6 +2608,7 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) { } } + vespalib::BenchmarkTimer timer(1.0); // Global space has no buckets but will serve as a trigger for merging // buckets into the DB. This lets us measure the overhead of just this part. if (req.getBucketSpace() == FixedBucketSpaces::global_space()) { @@ -2626,6 +2625,77 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) { EXPECT_EQ(size_t(0), mutable_global_db().size()); } +uint32_t BucketDBUpdaterTest::populate_bucket_db_via_request_bucket_info_for_benchmarking() { + // Need to trigger an initial edge to complete first bucket scan + setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"), messageCount(1), 0); + _sender.clear(); + + lib::ClusterState state("distributor:1 storage:1"); + setSystemState(state); + + constexpr uint32_t superbuckets = 1u << 16u; + constexpr uint32_t sub_buckets = 14; + constexpr uint32_t n_buckets = superbuckets * sub_buckets; + + assert(_bucketSpaces.size() == _sender.commands.size()); + for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { + assert(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO); + const auto& req = dynamic_cast<const RequestBucketInfoCommand&>(*_sender.commands[bsi]); + + auto sreply = std::make_shared<RequestBucketInfoReply>(req); + sreply->setAddress(storageAddress(0)); + auto& vec = sreply->getBucketInfo(); + if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { + for (uint32_t sb = 0; sb < superbuckets; ++sb) { + for (uint64_t i = 0; i < sub_buckets; ++i) { + document::BucketId bucket(48, (i << 32ULL) | sb); + vec.push_back(api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(10, 1, 1))); + } + } + } + getBucketDBUpdater().onRequestBucketInfoReply(sreply); + } + + assert(mutable_default_db().size() == n_buckets); + assert(mutable_global_db().size() == 0); + return n_buckets; +} + +TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_removing_buckets_for_unavailable_storage_nodes) { + const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking(); + + lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via ownership + vespalib::BenchmarkTimer timer(1.0); + timer.before(); + setSystemState(no_op_state); + timer.after(); + fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets); +} + +TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_no_buckets_removed_during_node_remover_db_pass) { + const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking(); + + // TODO this benchmark is void if we further restrict the pruning elision logic to allow + // elision when storage nodes come online. + lib::ClusterState no_op_state("distributor:1 storage:2"); // Not removing any buckets + vespalib::BenchmarkTimer timer(1.0); + timer.before(); + setSystemState(no_op_state); + timer.after(); + fprintf(stderr, "Took %g seconds to scan %u buckets with no-op action\n", timer.min_time(), n_buckets); +} + +TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_all_buckets_removed_during_node_remover_db_pass) { + const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking(); + + lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via all replicas gone + vespalib::BenchmarkTimer timer(1.0); + timer.before(); + setSystemState(no_op_state); + timer.after(); + fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets); +} + TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) { auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d"); auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m"); diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp index a3715bb3d27..ef198cb1d3f 100644 --- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp +++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp @@ -35,6 +35,7 @@ namespace storage { using Entry = BucketDatabase::Entry; +using ConstEntryRef = BucketDatabase::ConstEntryRef; using search::datastore::EntryRef; using vespalib::ConstArrayRef; using document::BucketId; @@ -55,6 +56,12 @@ Entry entry_from_replica_array_ref(const BucketId& id, uint32_t gc_timestamp, Co return Entry(id, BucketInfo(gc_timestamp, std::vector<BucketCopy>(replicas.begin(), replicas.end()))); } +ConstEntryRef const_entry_ref_from_replica_array_ref(const BucketId& id, uint32_t gc_timestamp, + ConstArrayRef<BucketCopy> replicas) +{ + return ConstEntryRef(id, ConstBucketInfoRef(gc_timestamp, replicas)); +} + EntryRef entry_ref_from_value(uint64_t value) { return EntryRef(value & 0xffffffffULL); } @@ -127,14 +134,27 @@ void BTreeBucketDatabase::commit_tree_changes() { _tree.getAllocator().trimHoldLists(used_gen); } +Entry BTreeBucketDatabase::entry_from_value(uint64_t bucket_key, uint64_t value) const { + const auto replicas_ref = _store.get(entry_ref_from_value(value)); + const auto bucket = BucketId(BucketId::keyToBucketId(bucket_key)); + return entry_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref); +} + Entry BTreeBucketDatabase::entry_from_iterator(const BTree::ConstIterator& iter) const { if (!iter.valid()) { return Entry::createInvalid(); } + return entry_from_value(iter.getKey(), iter.getData()); +} + +ConstEntryRef BTreeBucketDatabase::const_entry_ref_from_iterator(const BTree::ConstIterator& iter) const { + if (!iter.valid()) { + return ConstEntryRef::createInvalid(); + } const auto value = iter.getData(); const auto replicas_ref = _store.get(entry_ref_from_value(value)); const auto bucket = BucketId(BucketId::keyToBucketId(iter.getKey())); - return entry_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref); + return const_entry_ref_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref); } BucketId BTreeBucketDatabase::bucket_from_valid_iterator(const BTree::ConstIterator& iter) const { @@ -216,8 +236,8 @@ BTreeBucketDatabase::find_parents_internal(const document::BucketId& bucket, std::vector<Entry>& entries) const { const uint64_t bucket_key = bucket.toKey(); - uint64_t parent_key = 0; - auto iter = _tree.begin(); + const auto frozen_view = _tree.getFrozenView(); + auto iter = frozen_view.begin(); // Start at the root level, descending towards the bucket itself. // Try skipping as many levels of the tree as possible as we go. uint32_t bits = 1; @@ -228,11 +248,11 @@ BTreeBucketDatabase::find_parents_internal(const document::BucketId& bucket, entries.emplace_back(entry_from_iterator(iter)); } bits = next_parent_bit_seek_level(bits, candidate, bucket); - parent_key = BucketId(bits, bucket.getRawId()).toKey(); + const auto parent_key = BucketId(bits, bucket.getRawId()).toKey(); assert(parent_key > iter.getKey()); iter.seek(parent_key); } - return iter; // FIXME clang warns here due to copying... + return iter; } /* @@ -286,30 +306,118 @@ void BTreeBucketDatabase::update(const Entry& newEntry) { // FIXME but bit-tree code says "lowerBound" in impl and "after" in declaration??? void BTreeBucketDatabase::forEach(EntryProcessor& proc, const BucketId& after) const { for (auto iter = _tree.upperBound(after.toKey()); iter.valid(); ++iter) { - if (!proc.process(entry_from_iterator(iter))) { + if (!proc.process(const_entry_ref_from_iterator(iter))) { break; } } } -void BTreeBucketDatabase::forEach(MutableEntryProcessor& proc, const BucketId& after) { - for (auto iter = _tree.upperBound(after.toKey()); iter.valid(); ++iter) { - // FIXME this is a horrible API which currently has to update every time since we don't - // know from the function call itself whether something was in fact updated behind the scenes..! - auto entry = entry_from_iterator(iter); - bool should_continue = proc.process(entry); - // TODO optimize this joyful mess :D - update(entry); - if (!should_continue) { - break; +struct BTreeBuilderMerger final : BucketDatabase::Merger { + BTreeBucketDatabase& _db; + BTreeBucketDatabase::BTree::Builder& _builder; + uint64_t _current_key; + uint64_t _current_value; + Entry _cached_entry; + bool _valid_cached_entry; + + BTreeBuilderMerger(BTreeBucketDatabase& db, + BTreeBucketDatabase::BTree::Builder& builder) + : _db(db), + _builder(builder), + _current_key(0), + _current_value(0), + _cached_entry(), + _valid_cached_entry(false) + {} + ~BTreeBuilderMerger() override = default; + + uint64_t bucket_key() const noexcept override { + return _current_key; + } + BucketId bucket_id() const noexcept override { + return BucketId(BucketId::keyToBucketId(_current_key)); + } + Entry& current_entry() override { + if (!_valid_cached_entry) { + _cached_entry = _db.entry_from_value(_current_key, _current_value); + _valid_cached_entry = true; } + return _cached_entry; } - //commit_tree_changes(); TODO should be done as bulk op! + void insert_before_current(const Entry& e) override { + const uint64_t bucket_key = e.getBucketId().toKey(); + assert(bucket_key < _current_key); + + auto replicas_ref = _db._store.add(e.getBucketInfo().getRawNodes()); + const auto new_value = value_from(e.getBucketInfo().getLastGarbageCollectionTime(), replicas_ref); + + _builder.insert(bucket_key, new_value); + } + + void update_iteration_state(uint64_t key, uint64_t value) { + _current_key = key; + _current_value = value; + _valid_cached_entry = false; + } +}; + +struct BTreeTrailingInserter final : BucketDatabase::TrailingInserter { + BTreeBucketDatabase& _db; + BTreeBucketDatabase::BTree::Builder& _builder; + + BTreeTrailingInserter(BTreeBucketDatabase& db, + BTreeBucketDatabase::BTree::Builder& builder) + : _db(db), + _builder(builder) + {} + + ~BTreeTrailingInserter() override = default; + + void insert_at_end(const Entry& e) override { + const uint64_t bucket_key = e.getBucketId().toKey(); + const auto replicas_ref = _db._store.add(e.getBucketInfo().getRawNodes()); + const auto new_value = value_from(e.getBucketInfo().getLastGarbageCollectionTime(), replicas_ref); + _builder.insert(bucket_key, new_value); + } +}; + +// TODO lbound arg? +void BTreeBucketDatabase::merge(MergingProcessor& proc) { + BTreeBucketDatabase::BTree::Builder builder(_tree.getAllocator()); + BTreeBuilderMerger merger(*this, builder); + + // TODO for_each instead? + for (auto iter = _tree.begin(); iter.valid(); ++iter) { + const uint64_t key = iter.getKey(); + const uint64_t value = iter.getData(); + merger.update_iteration_state(key, value); + + auto result = proc.merge(merger); + + if (result == MergingProcessor::Result::KeepUnchanged) { + builder.insert(key, value); // Reuse array store ref with no changes + } else if (result == MergingProcessor::Result::Update) { + assert(merger._valid_cached_entry); // Must actually have been touched + assert(merger._cached_entry.valid()); + _store.remove(entry_ref_from_value(value)); + auto new_replicas_ref = _store.add(merger._cached_entry.getBucketInfo().getRawNodes()); + const auto new_value = value_from(merger._cached_entry.getBucketInfo().getLastGarbageCollectionTime(), new_replicas_ref); + builder.insert(key, new_value); + } else if (result == MergingProcessor::Result::Skip) { + _store.remove(entry_ref_from_value(value)); + } else { + abort(); + } + } + BTreeTrailingInserter inserter(*this, builder); + proc.insert_remaining_at_end(inserter); + + _tree.assign(builder); + commit_tree_changes(); } Entry BTreeBucketDatabase::upperBound(const BucketId& bucket) const { return entry_from_iterator(_tree.upperBound(bucket.toKey())); - } uint64_t BTreeBucketDatabase::size() const { diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h index d896f76db54..eb527071ad7 100644 --- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h +++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h @@ -39,6 +39,8 @@ public: BTreeBucketDatabase(); ~BTreeBucketDatabase() override; + void merge(MergingProcessor&) override; + // Ye olde bucket DB API: Entry get(const document::BucketId& bucket) const override; void remove(const document::BucketId& bucket) override; @@ -48,7 +50,6 @@ public: std::vector<Entry>& entries) const override; void update(const Entry& newEntry) override; void forEach(EntryProcessor&, const document::BucketId& after) const override; - void forEach(MutableEntryProcessor&, const document::BucketId& after) override; Entry upperBound(const document::BucketId& value) const override; uint64_t size() const override; void clear() override; @@ -60,11 +61,17 @@ public: const std::string& indent) const override; private: + Entry entry_from_value(uint64_t bucket_key, uint64_t value) const; Entry entry_from_iterator(const BTree::ConstIterator& iter) const; + ConstEntryRef const_entry_ref_from_iterator(const BTree::ConstIterator& iter) const; document::BucketId bucket_from_valid_iterator(const BTree::ConstIterator& iter) const; void commit_tree_changes(); BTree::ConstIterator find_parents_internal(const document::BucketId& bucket, std::vector<Entry>& entries) const; + + friend struct BTreeBuilderMerger; + friend struct BTreeMergingBuilder; + friend struct BTreeTrailingInserter; }; } diff --git a/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp b/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp index 8a4aadfa5be..f2a561b83eb 100644 --- a/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp @@ -4,17 +4,6 @@ namespace storage { -namespace { - struct GetNextEntryProcessor : public BucketDatabase::EntryProcessor { - BucketDatabase::Entry _entry; - - bool process(const BucketDatabase::Entry& e) override { - _entry = e; - return false; - } - }; -} - BucketDatabase::Entry BucketDatabase::getNext(const document::BucketId& last) const { @@ -32,7 +21,8 @@ BucketDatabase::createAppropriateBucket( return e; } -std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e) +template <typename BucketInfoType> +std::ostream& operator<<(std::ostream& o, const BucketDatabase::EntryBase<BucketInfoType>& e) { if (!e.valid()) { o << "NONEXISTING"; @@ -42,12 +32,19 @@ std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e) return o; } +template <typename BucketInfoType> std::string -BucketDatabase::Entry::toString() const +BucketDatabase::EntryBase<BucketInfoType>::toString() const { std::ostringstream ost; ost << *this; return ost.str(); } +template std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e); +template std::ostream& operator<<(std::ostream& o, const BucketDatabase::ConstEntryRef& e); + +template class BucketDatabase::EntryBase<BucketInfo>; +template class BucketDatabase::EntryBase<ConstBucketInfoRef>; + } diff --git a/storage/src/vespa/storage/bucketdb/bucketdatabase.h b/storage/src/vespa/storage/bucketdb/bucketdatabase.h index a5a70c488ca..d12c2d82972 100644 --- a/storage/src/vespa/storage/bucketdb/bucketdatabase.h +++ b/storage/src/vespa/storage/bucketdb/bucketdatabase.h @@ -13,42 +13,48 @@ namespace storage { class BucketDatabase : public vespalib::Printable { public: - class Entry { + template <typename BucketInfoType> + class EntryBase { document::BucketId _bucketId; - BucketInfo _info; + BucketInfoType _info; public: - Entry() : _bucketId(0) {} // Invalid entry - Entry(const document::BucketId& bId, BucketInfo bucketInfo) + EntryBase() : _bucketId(0) {} // Invalid entry + EntryBase(const document::BucketId& bId, const BucketInfoType& bucketInfo) + : _bucketId(bId), _info(bucketInfo) {} + EntryBase(const document::BucketId& bId, BucketInfoType&& bucketInfo) : _bucketId(bId), _info(std::move(bucketInfo)) {} - explicit Entry(const document::BucketId& bId) : _bucketId(bId) {} + explicit EntryBase(const document::BucketId& bId) : _bucketId(bId) {} - bool operator==(const Entry& other) const { + bool operator==(const EntryBase& other) const { return (_bucketId == other._bucketId && _info == other._info); } bool valid() const { return (_bucketId.getRawId() != 0); } std::string toString() const; const document::BucketId& getBucketId() const { return _bucketId; } - const BucketInfo& getBucketInfo() const { return _info; } - BucketInfo& getBucketInfo() { return _info; } - BucketInfo* operator->() { return &_info; } - const BucketInfo* operator->() const { return &_info; } + const BucketInfoType& getBucketInfo() const { return _info; } + BucketInfoType& getBucketInfo() { return _info; } + BucketInfoType* operator->() { return &_info; } + const BucketInfoType* operator->() const { return &_info; } - static Entry createInvalid() { - return Entry(); + static EntryBase createInvalid() { + return EntryBase(); } }; - template<typename T> struct Processor { - virtual ~Processor() {} + using Entry = EntryBase<BucketInfo>; + // TODO avoid needing to touch memory just to get to the const entry ref + // TODO -> lazy ID to ConstArrayRef resolve + using ConstEntryRef = EntryBase<ConstBucketInfoRef>; + + struct EntryProcessor { + virtual ~EntryProcessor() = default; /** Return false to stop iterating. */ - virtual bool process(T& e) = 0; + virtual bool process(const ConstEntryRef& e) = 0; }; - typedef Processor<const Entry> EntryProcessor; - typedef Processor<Entry> MutableEntryProcessor; - virtual ~BucketDatabase() {} + ~BucketDatabase() override = default; virtual Entry get(const document::BucketId& bucket) const = 0; virtual void remove(const document::BucketId& bucket) = 0; @@ -76,9 +82,123 @@ public: virtual void forEach( EntryProcessor&, const document::BucketId& after = document::BucketId()) const = 0; - virtual void forEach( - MutableEntryProcessor&, - const document::BucketId& after = document::BucketId()) = 0; + + /** + * Database implementation-specific interface for appending entries + * during a merge() operation. + */ + struct TrailingInserter { + virtual ~TrailingInserter() = default; + /** + * Insert a new database entry at the end of the current bucket space. + * + * Precondition: the entry's bucket ID must sort after all entries that + * have already been iterated over or inserted via insert_at_end(). + */ + virtual void insert_at_end(const Entry&) = 0; + }; + + /** + * Database implementation-specific interface for accessing bucket + * entries and prepending entries during a merge() operation. + */ + struct Merger { + virtual ~Merger() = default; + + // TODO this should ideally be separated into read/write functions, but this + // will suffice for now to avoid too many changes. + + /** + * Bucket key/ID of the currently iterated entry. Unless the information stored + * in the DB Entry is needed, using one of these methods should be preferred to + * getting the bucket ID via current_entry(). The underlying DB is expected to + * have cheap access to the ID but _may_ have expensive access to the entry itself. + */ + virtual uint64_t bucket_key() const noexcept = 0; + virtual document::BucketId bucket_id() const noexcept = 0; + /** + * Returns a mutable representation of the currently iterated database + * entry. If changes are made to this object, Result::Update must be + * returned from merge(). Otherwise, mutation visibility is undefined. + */ + virtual Entry& current_entry() = 0; + /** + * Insert a new entry into the bucket database that is ordered before the + * currently iterated entry. + * + * Preconditions: + * - The entry's bucket ID must sort _before_ the currently iterated + * entry's bucket ID, in "reversed bits" bucket key order. + * - The entry's bucket ID must sort _after_ any entries previously + * inserted with insert_before_current(). + * - The entry's bucket ID must not be the same as a bucket that was + * already iterated over as part of the DB merge() call or inserted + * via a previous call to insert_before_current(). + * Such buckets must be handled by explicitly updating the provided + * entry for the iterated bucket and returning Result::Update. + */ + virtual void insert_before_current(const Entry&) = 0; + }; + + /** + * Interface to be implemented by callers that wish to receive callbacks + * during a bucket merge() operation. + */ + struct MergingProcessor { + // See merge() for semantics on enum values. + enum class Result { + Update, + KeepUnchanged, + Skip + }; + + virtual ~MergingProcessor() = default; + /** + * Invoked for each existing bucket in the database, in bucket key order. + * The provided Merge instance may be used to access the current entry + * and prepend entries to the DB. + * + * Return value semantics: + * - Result::Update: + * when merge() returns, the changes made to the current entry will + * become visible in the bucket database. + * - Result::KeepUnchanged: + * when merge() returns, the entry will remain in the same state as + * it was when merge() was originally called. + * - Result::Skip: + * when merge() returns, the entry will no longer be part of the DB. + * Any entries added via insert_before_current() _will_ be present. + * + */ + virtual Result merge(Merger&) = 0; + /** + * Invoked once after all existing buckets have been iterated over. + * The provided TrailingInserter instance may be used to append + * an arbitrary number of entries to the database. + * + * This is used to handle elements remaining at the end of a linear + * merge operation. + */ + virtual void insert_remaining_at_end(TrailingInserter&) {} + }; + + /** + * Iterate over the bucket database in bucket key order, allowing an arbitrary + * number of buckets to be inserted, updated and skipped in a way that is + * optimized for the backing DB implementation. + * + * Merging happens in two stages: + * 1) The MergeProcessor argument's merge() function is invoked for each existing + * bucket in the database. At this point new buckets ordered before the iterated + * bucket may be inserted and the iterated bucket may be skipped or updated. + * 2) The MergeProcessor argument's insert_remaining_at_end() function is invoked + * once when all buckets have been iterated over. This enables the caller to + * insert new buckets that sort after the last iterated bucket. + * + * Changes made to the database are not guaranteed to be visible until + * merge() returns. + */ + virtual void merge(MergingProcessor&) = 0; /** * Get the first bucket that does _not_ compare less than or equal to @@ -114,6 +234,7 @@ public: virtual uint32_t childCount(const document::BucketId&) const = 0; }; -std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e); +template <typename BucketInfoType> +std::ostream& operator<<(std::ostream& o, const BucketDatabase::EntryBase<BucketInfoType>& e); } // storage diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp index 82fc6686c5f..17efa1658ce 100644 --- a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp @@ -1,18 +1,18 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bucketinfo.h" +#include "bucketinfo.hpp" #include <vespa/storage/storageutil/utils.h> #include <algorithm> namespace storage { -BucketInfo::BucketInfo() - : _lastGarbageCollection(0), - _nodes() -{} +template class BucketInfoBase<std::vector<BucketCopy>>; +template class BucketInfoBase<vespalib::ConstArrayRef<BucketCopy>>; + +BucketInfo::BucketInfo() : BucketInfoBase() {} BucketInfo::BucketInfo(uint32_t lastGarbageCollection, std::vector<BucketCopy> nodes) - : _lastGarbageCollection(lastGarbageCollection), - _nodes(std::move(nodes)) + : BucketInfoBase(lastGarbageCollection, std::move(nodes)) {} BucketInfo::~BucketInfo() = default; @@ -22,38 +22,6 @@ BucketInfo& BucketInfo::operator=(const BucketInfo&) = default; BucketInfo::BucketInfo(BucketInfo&&) noexcept = default; BucketInfo& BucketInfo::operator=(BucketInfo&&) noexcept = default; -std::string -BucketInfo::toString() const { - std::ostringstream ost; - print(ost, true, ""); - return ost.str(); -} - -bool -BucketInfo::emptyAndConsistent() const { - for (uint32_t i = 0; i < _nodes.size(); i++) { - if (!_nodes[i].empty()) return false; - } - return consistentNodes(); -} - -bool -BucketInfo::validAndConsistent() const { - for (uint32_t i = 0; i < _nodes.size(); i++) { - if (!_nodes[i].valid()) return false; - } - return consistentNodes(); -} - -bool -BucketInfo::hasInvalidCopy() const -{ - for (uint32_t i = 0; i < _nodes.size(); i++) { - if (!_nodes[i].valid()) return true; - } - return false; -} - void BucketInfo::updateTrusted() { if (validAndConsistent()) { @@ -90,40 +58,6 @@ BucketInfo::resetTrusted() { updateTrusted(); } -uint16_t -BucketInfo::getTrustedCount() const { - uint32_t trustedCount = 0; - for (uint32_t i = 0; i < _nodes.size(); i++) { - if (_nodes[i].trusted()) { - trustedCount++; - } - } - return trustedCount; -} - -bool -BucketInfo::consistentNodes(bool countInvalidAsConsistent) const -{ - int compareIndex = 0; - for (uint32_t i=1; i<_nodes.size(); i++) { - if (!_nodes[i].consistentWith(_nodes[compareIndex], - countInvalidAsConsistent)) return false; - } - return true; -} - -void -BucketInfo::print(std::ostream& out, bool verbose, const std::string& indent) const -{ - if (_nodes.empty()) { - out << "no nodes"; - } - for (uint32_t i=0; i<_nodes.size(); ++i) { - if (i != 0) out << ", "; - _nodes[i].print(out, verbose, indent); - } -} - namespace { struct Sorter { @@ -225,19 +159,6 @@ BucketInfo::removeNode(unsigned short node, TrustedUpdate update) return false; } -const BucketCopy* -BucketInfo::getNode(uint16_t node) const -{ - for (std::vector<BucketCopy>::const_iterator iter = _nodes.begin(); - iter != _nodes.end(); - iter++) { - if (iter->getNode() == node) { - return &*iter; - } - } - return 0; -} - BucketCopy* BucketInfo::getNodeInternal(uint16_t node) { @@ -251,92 +172,4 @@ BucketInfo::getNodeInternal(uint16_t node) return 0; } -std::vector<uint16_t> -BucketInfo::getNodes() const { - std::vector<uint16_t> result; - - for (uint32_t i = 0; i < _nodes.size(); i++) { - result.push_back(_nodes[i].getNode()); - } - - return result; -} - -uint32_t -BucketInfo::getHighestDocumentCount() const -{ - uint32_t highest = 0; - for (uint32_t i = 0; i < _nodes.size(); ++i) { - highest = std::max(highest, _nodes[i].getDocumentCount()); - } - return highest; -} - -uint32_t -BucketInfo::getHighestTotalDocumentSize() const -{ - uint32_t highest = 0; - for (uint32_t i = 0; i < _nodes.size(); ++i) { - highest = std::max(highest, _nodes[i].getTotalDocumentSize()); - } - return highest; -} - -uint32_t -BucketInfo::getHighestMetaCount() const -{ - uint32_t highest = 0; - for (uint32_t i = 0; i < _nodes.size(); ++i) { - highest = std::max(highest, _nodes[i].getMetaCount()); - } - return highest; -} - -uint32_t -BucketInfo::getHighestUsedFileSize() const -{ - uint32_t highest = 0; - for (uint32_t i = 0; i < _nodes.size(); ++i) { - highest = std::max(highest, _nodes[i].getUsedFileSize()); - } - return highest; -} - -bool -BucketInfo::hasRecentlyCreatedEmptyCopy() const -{ - for (uint32_t i = 0; i < _nodes.size(); ++i) { - if (_nodes[i].wasRecentlyCreated()) { - return true; - } - } - return false; -} - -bool -BucketInfo::operator==(const BucketInfo& other) const -{ - if (_nodes.size() != other._nodes.size()) { - return false; - } - - for (uint32_t i = 0; i < _nodes.size(); ++i) { - if (_nodes[i].getNode() != other._nodes[i].getNode()) { - return false; - } - - if (!(_nodes[i] == other._nodes[i])) { - return false; - } - } - - return true; -}; - -std::ostream& -operator<<(std::ostream& out, const BucketInfo& info) { - info.print(out, false, ""); - return out; -} - } diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.h b/storage/src/vespa/storage/bucketdb/bucketinfo.h index 8949c0f10da..f0be700d204 100644 --- a/storage/src/vespa/storage/bucketdb/bucketinfo.h +++ b/storage/src/vespa/storage/bucketdb/bucketinfo.h @@ -2,6 +2,7 @@ #pragma once #include "bucketcopy.h" +#include <vespa/vespalib/util/arrayref.h> namespace storage { @@ -14,45 +15,37 @@ enum class TrustedUpdate { DEFER }; -class BucketInfo +template <typename NodeSeq> +class BucketInfoBase { -private: +protected: uint32_t _lastGarbageCollection; - std::vector<BucketCopy> _nodes; - + NodeSeq _nodes; public: - BucketInfo(); - BucketInfo(uint32_t lastGarbageCollection, std::vector<BucketCopy> nodes); - ~BucketInfo(); - - BucketInfo(const BucketInfo&); - BucketInfo& operator=(const BucketInfo&); - BucketInfo(BucketInfo&&) noexcept; - BucketInfo& operator=(BucketInfo&&) noexcept; + BucketInfoBase() + : _lastGarbageCollection(0), + _nodes() + {} + BucketInfoBase(uint32_t lastGarbageCollection, const NodeSeq& nodes) + : _lastGarbageCollection(lastGarbageCollection), + _nodes(nodes) + {} + BucketInfoBase(uint32_t lastGarbageCollection, NodeSeq&& nodes) + : _lastGarbageCollection(lastGarbageCollection), + _nodes(std::move(nodes)) + {} + ~BucketInfoBase() = default; + + BucketInfoBase(const BucketInfoBase&) = default; + BucketInfoBase& operator=(const BucketInfoBase&) = default; + BucketInfoBase(BucketInfoBase&&) noexcept = default; + BucketInfoBase& operator=(BucketInfoBase&&) noexcept = default; /** * @return Returns the last time when this bucket was "garbage collected". */ uint32_t getLastGarbageCollectionTime() const { return _lastGarbageCollection; } - /** - * Sets the last time the bucket was "garbage collected". - */ - void setLastGarbageCollectionTime(uint32_t timestamp) { - _lastGarbageCollection = timestamp; - } - - /** - Update trusted flags if bucket is now complete and consistent. - */ - void updateTrusted(); - - /** - Removes any historical information on trustedness, and sets the bucket copies to - trusted if they are now complete and consistent. - */ - void resetTrusted(); - /** True if the bucket contains no documents and is consistent. */ bool emptyAndConsistent() const; @@ -84,10 +77,89 @@ public: */ bool consistentNodes(bool countInvalidAsConsistent = false) const; - static bool mayContain(const BucketInfo&) { return true; } void print(std::ostream&, bool verbose, const std::string& indent) const; /** + * Returns the bucket copy struct for the given node, null if nonexisting + */ + const BucketCopy* getNode(uint16_t node) const; + + /** + * Returns the number of nodes this entry has. + */ + uint32_t getNodeCount() const noexcept { return static_cast<uint32_t>(_nodes.size()); } + + /** + * Returns a list of the nodes this entry has. + */ + std::vector<uint16_t> getNodes() const; + + /** + Returns a reference to the node with the given index in the node + array. This operation has undefined behaviour if the index given + is not within the node count. + */ + const BucketCopy& getNodeRef(uint16_t idx) const { + return _nodes[idx]; + } + + const NodeSeq& getRawNodes() const noexcept { + return _nodes; + } + + std::string toString() const; + + uint32_t getHighestDocumentCount() const; + uint32_t getHighestTotalDocumentSize() const; + uint32_t getHighestMetaCount() const; + uint32_t getHighestUsedFileSize() const; + + bool hasRecentlyCreatedEmptyCopy() const; + + bool operator==(const BucketInfoBase& other) const; +}; + +template <typename NodeSeq> +std::ostream& operator<<(std::ostream& out, const BucketInfoBase<NodeSeq>& info) { + info.print(out, false, ""); + return out; +} + +class ConstBucketInfoRef : public BucketInfoBase<vespalib::ConstArrayRef<BucketCopy>> { +public: + using BucketInfoBase::BucketInfoBase; +}; + +class BucketInfo : public BucketInfoBase<std::vector<BucketCopy>> { +public: + BucketInfo(); + BucketInfo(uint32_t lastGarbageCollection, std::vector<BucketCopy> nodes); + ~BucketInfo(); + + BucketInfo(const BucketInfo&); + BucketInfo& operator=(const BucketInfo&); + BucketInfo(BucketInfo&&) noexcept; + BucketInfo& operator=(BucketInfo&&) noexcept; + + /** + * Sets the last time the bucket was "garbage collected". + */ + void setLastGarbageCollectionTime(uint32_t timestamp) { + _lastGarbageCollection = timestamp; + } + + /** + Update trusted flags if bucket is now complete and consistent. + */ + void updateTrusted(); + + /** + Removes any historical information on trustedness, and sets the bucket copies to + trusted if they are now complete and consistent. + */ + void resetTrusted(); + + /** Adds the given node. @param recommendedOrder A recommended ordering of nodes. @@ -118,34 +190,6 @@ public: */ bool removeNode(uint16_t node, TrustedUpdate update = TrustedUpdate::UPDATE); - /** - * Returns the bucket copy struct for the given node, null if nonexisting - */ - const BucketCopy* getNode(uint16_t node) const; - - /** - * Returns the number of nodes this entry has. - */ - uint32_t getNodeCount() const noexcept { return static_cast<uint32_t>(_nodes.size()); } - - /** - * Returns a list of the nodes this entry has. - */ - std::vector<uint16_t> getNodes() const; - - /** - Returns a reference to the node with the given index in the node - array. This operation has undefined behaviour if the index given - is not within the node count. - */ - const BucketCopy& getNodeRef(uint16_t idx) const { - return _nodes[idx]; - } - - const std::vector<BucketCopy>& getRawNodes() const noexcept { - return _nodes; - } - void clearTrusted(uint16_t nodeIdx) { getNodeInternal(nodeIdx)->clearTrusted(); } @@ -155,19 +199,6 @@ public: */ void clear() { _nodes.clear(); } - std::string toString() const; - - bool verifyLegal() const { return true; } - - uint32_t getHighestDocumentCount() const; - uint32_t getHighestTotalDocumentSize() const; - uint32_t getHighestMetaCount() const; - uint32_t getHighestUsedFileSize() const; - - bool hasRecentlyCreatedEmptyCopy() const; - - bool operator==(const BucketInfo& other) const; - private: friend class distributor::DistributorTestUtil; @@ -183,7 +214,8 @@ private: void addNodeManual(const BucketCopy& newCopy) { _nodes.push_back(newCopy); } }; -std::ostream& operator<<(std::ostream& out, const BucketInfo& info); +extern template class BucketInfoBase<std::vector<BucketCopy>>; +extern template class BucketInfoBase<vespalib::ConstArrayRef<BucketCopy>>; } diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.hpp b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp new file mode 100644 index 00000000000..562e8d562fb --- /dev/null +++ b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp @@ -0,0 +1,165 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bucketcopy.h" +#include <vespa/vespalib/util/arrayref.h> +#include <iostream> +#include <sstream> + +namespace storage { + +template <typename NodeSeq> +std::string BucketInfoBase<NodeSeq>::toString() const { + std::ostringstream ost; + print(ost, true, ""); + return ost.str(); +} + +template <typename NodeSeq> +bool BucketInfoBase<NodeSeq>::emptyAndConsistent() const { + for (uint32_t i = 0; i < _nodes.size(); i++) { + if (!_nodes[i].empty()) { + return false; + } + } + return consistentNodes(); +} + +template <typename NodeSeq> +bool BucketInfoBase<NodeSeq>::validAndConsistent() const { + for (uint32_t i = 0; i < _nodes.size(); i++) { + if (!_nodes[i].valid()) { + return false; + } + } + return consistentNodes(); +} + +template <typename NodeSeq> +bool BucketInfoBase<NodeSeq>::hasInvalidCopy() const { + for (uint32_t i = 0; i < _nodes.size(); i++) { + if (!_nodes[i].valid()) { + return true; + } + } + return false; +} + +template <typename NodeSeq> +uint16_t BucketInfoBase<NodeSeq>::getTrustedCount() const { + uint32_t trustedCount = 0; + for (uint32_t i = 0; i < _nodes.size(); i++) { + if (_nodes[i].trusted()) { + trustedCount++; + } + } + return trustedCount; +} + +template <typename NodeSeq> +bool BucketInfoBase<NodeSeq>::consistentNodes(bool countInvalidAsConsistent) const { + int compareIndex = 0; + for (uint32_t i = 1; i < _nodes.size(); i++) { + if (!_nodes[i].consistentWith(_nodes[compareIndex], + countInvalidAsConsistent)) return false; + } + return true; +} + +template <typename NodeSeq> +void BucketInfoBase<NodeSeq>::print(std::ostream& out, bool verbose, const std::string& indent) const { + if (_nodes.size() == 0) { + out << "no nodes"; + } + for (uint32_t i = 0; i < _nodes.size(); ++i) { + if (i != 0) { + out << ", "; + } + _nodes[i].print(out, verbose, indent); + } +} + +template <typename NodeSeq> +const BucketCopy* BucketInfoBase<NodeSeq>::getNode(uint16_t node) const { + for (const auto& n : _nodes) { + if (n.getNode() == node) { + return &n; + } + } + return 0; +} + +template <typename NodeSeq> +std::vector<uint16_t> BucketInfoBase<NodeSeq>::getNodes() const { + std::vector<uint16_t> result; + for (uint32_t i = 0; i < _nodes.size(); i++) { + result.emplace_back(_nodes[i].getNode()); + } + return result; +} + +template <typename NodeSeq> +uint32_t BucketInfoBase<NodeSeq>::getHighestDocumentCount() const { + uint32_t highest = 0; + for (uint32_t i = 0; i < _nodes.size(); ++i) { + highest = std::max(highest, _nodes[i].getDocumentCount()); + } + return highest; +} + +template <typename NodeSeq> +uint32_t BucketInfoBase<NodeSeq>::getHighestTotalDocumentSize() const { + uint32_t highest = 0; + for (uint32_t i = 0; i < _nodes.size(); ++i) { + highest = std::max(highest, _nodes[i].getTotalDocumentSize()); + } + return highest; +} + +template <typename NodeSeq> +uint32_t BucketInfoBase<NodeSeq>::getHighestMetaCount() const { + uint32_t highest = 0; + for (uint32_t i = 0; i < _nodes.size(); ++i) { + highest = std::max(highest, _nodes[i].getMetaCount()); + } + return highest; +} + +template <typename NodeSeq> +uint32_t BucketInfoBase<NodeSeq>::getHighestUsedFileSize() const { + uint32_t highest = 0; + for (uint32_t i = 0; i < _nodes.size(); ++i) { + highest = std::max(highest, _nodes[i].getUsedFileSize()); + } + return highest; +} + +template <typename NodeSeq> +bool BucketInfoBase<NodeSeq>::hasRecentlyCreatedEmptyCopy() const { + for (uint32_t i = 0; i < _nodes.size(); ++i) { + if (_nodes[i].wasRecentlyCreated()) { + return true; + } + } + return false; +} + +template <typename NodeSeq> +bool BucketInfoBase<NodeSeq>::operator==(const BucketInfoBase<NodeSeq>& other) const { + if (_nodes.size() != other._nodes.size()) { + return false; + } + + for (uint32_t i = 0; i < _nodes.size(); ++i) { + if (_nodes[i].getNode() != other._nodes[i].getNode()) { + return false; + } + + if (!(_nodes[i] == other._nodes[i])) { + return false; + } + } + + return true; +}; + +} diff --git a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp index ad56a4083f8..9dad421324e 100644 --- a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp +++ b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp @@ -152,21 +152,21 @@ void __attribute__((noinline)) log_empty_bucket_insertion(const document::Bucket } +template <typename EntryType> +void MapBucketDatabase::update_internal(EntryType&& new_entry) { + assert(new_entry.valid()); + if (new_entry->getNodeCount() == 0) { + log_empty_bucket_insertion(new_entry.getBucketId()); + } + Entry* found = find(0, 0, new_entry.getBucketId(), true); + assert(found); + *found = std::forward<EntryType>(new_entry); +} + void MapBucketDatabase::update(const Entry& newEntry) { - assert(newEntry.valid()); - if (newEntry->getNodeCount() == 0) { - log_empty_bucket_insertion(newEntry.getBucketId()); - } - LOG_BUCKET_OPERATION_NO_LOCK( - newEntry.getBucketId(), - vespalib::make_string( - "bucketdb insert of %s", newEntry.toString().c_str())); - - Entry* found = find(0, 0, newEntry.getBucketId(), true); - assert(found); - *found = newEntry; + update_internal(newEntry); } void @@ -334,20 +334,32 @@ MapBucketDatabase::upperBound(const document::BucketId& value) const return Entry::createInvalid(); } -template <typename EntryProcessorType> +namespace { + +inline BucketDatabase::ConstEntryRef +to_entry_ref(const BucketDatabase::Entry& e) { + return BucketDatabase::ConstEntryRef( + e.getBucketId(), + ConstBucketInfoRef(e->getLastGarbageCollectionTime(), e->getRawNodes())); +} + +} + bool MapBucketDatabase::forEach(int index, - EntryProcessorType& processor, + EntryProcessor& processor, uint8_t bitCount, const document::BucketId& lowerBound, - bool& process) + bool& process) const { if (index == -1) { return true; } - E& e = _db[index]; - if (e.value != -1 && process && !processor.process(_values[e.value])) { + const E& e = _db[index]; + if (e.value != -1 && process + && !processor.process(to_entry_ref(_values[e.value]))) + { return false; } @@ -377,16 +389,82 @@ MapBucketDatabase::forEach(EntryProcessor& processor, const document::BucketId& after) const { bool process = false; - MapBucketDatabase& mutableSelf(const_cast<MapBucketDatabase&>(*this)); - mutableSelf.forEach(0, processor, 0, after, process); + forEach(0, processor, 0, after, process); } -void -MapBucketDatabase::forEach(MutableEntryProcessor& processor, - const document::BucketId& after) +struct MapDbMerger final : BucketDatabase::Merger { + MapBucketDatabase& _db; + BucketDatabase::Entry& _current_entry; + std::vector<BucketDatabase::Entry>& _to_insert; + + MapDbMerger(MapBucketDatabase& db, + BucketDatabase::Entry& current_entry, + std::vector<BucketDatabase::Entry>& to_insert) + : _db(db), + _current_entry(current_entry), + _to_insert(to_insert) + {} + + uint64_t bucket_key() const noexcept override { + return _current_entry.getBucketId().toKey(); + } + document::BucketId bucket_id() const noexcept override { + return _current_entry.getBucketId(); + } + BucketDatabase::Entry& current_entry() override { + return _current_entry; + } + void insert_before_current(const BucketDatabase::Entry& e) override { + _to_insert.emplace_back(e); // TODO movable + } +}; + +struct MapDbTrailingInserter final : BucketDatabase::TrailingInserter { + MapBucketDatabase& _db; + explicit MapDbTrailingInserter(MapBucketDatabase& db) : _db(db) {} + + void insert_at_end(const BucketDatabase::Entry& e) override { + _db.update(e); + } +}; + +void MapBucketDatabase::merge_internal(int index, + MergingProcessor& processor, + std::vector<Entry>& to_insert, + std::vector<document::BucketId>& to_remove) { - bool process = false; - forEach(0, processor, 0, after, process); + if (index == -1) { + return; + } + E& e = _db[index]; + if (e.value != -1) { + Entry& entry = _values[e.value]; + MapDbMerger merger(*this, entry, to_insert); + auto result = processor.merge(merger); + if (result == MergingProcessor::Result::KeepUnchanged) { + // No-op + } else if (result == MergingProcessor::Result::Update) { + // Also no-op since it's all in-place + } else if (result == MergingProcessor::Result::Skip) { + to_remove.emplace_back(entry.getBucketId()); + } + } + merge_internal(e.e_0, processor, to_insert, to_remove); + merge_internal(e.e_1, processor, to_insert, to_remove); +} + +void MapBucketDatabase::merge(MergingProcessor& processor) { + std::vector<document::BucketId> to_remove; + std::vector<Entry> to_insert; + merge_internal(0, processor, to_insert, to_remove); + for (const auto& bucket : to_remove) { + remove(bucket); + } + for (auto& e : to_insert) { + update_internal(std::move(e)); + } + MapDbTrailingInserter inserter(*this); + processor.insert_remaining_at_end(inserter); } void @@ -482,8 +560,8 @@ MapBucketDatabase::childCount(const document::BucketId& b) const namespace { struct Writer : public BucketDatabase::EntryProcessor { std::ostream& _ost; - Writer(std::ostream& ost) : _ost(ost) {} - bool process(const BucketDatabase::Entry& e) override { + explicit Writer(std::ostream& ost) : _ost(ost) {} + bool process(const BucketDatabase::ConstEntryRef& e) override { _ost << e.toString() << "\n"; return true; } @@ -494,37 +572,16 @@ void MapBucketDatabase::print(std::ostream& out, bool verbose, const std::string& indent) const { + out << "MapBucketDatabase("; (void) indent; if (verbose) { Writer writer(out); forEach(writer); - /* Write out all the gory details to debug - out << "Entries {"; - for (uint32_t i=0, n=_db.size(); i<n; ++i) { - out << "\n" << indent << " " << _db[i].e_0 << "," << _db[i].e_1 - << "," << _db[i].value; - } - out << "\n" << indent << "}"; - out << "Free {"; - for (uint32_t i=0, n=_free.size(); i<n; ++i) { - out << "\n" << indent << " " << _free[i]; - } - out << "\n" << indent << "}"; - out << "Entries {"; - for (uint32_t i=0, n=_values.size(); i<n; ++i) { - out << "\n" << indent << " " << _values[i]; - } - out << "\n" << indent << "}"; - out << "Free {"; - for (uint32_t i=0, n=_freeValues.size(); i<n; ++i) { - out << "\n" << indent << " " << _freeValues[i]; - } - out << "\n" << indent << "}"; - */ } else { out << "Size(" << size() << ") Nodes(" << (_db.size() - _free.size() - 1) << ")"; } + out << ')'; } } // storage diff --git a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h index a7a38f1d978..df3e055bd7a 100644 --- a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h +++ b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h @@ -18,16 +18,16 @@ public: void getAll(const document::BucketId& bucket, std::vector<Entry>& entries) const override; void update(const Entry& newEntry) override; void forEach(EntryProcessor&, const document::BucketId& after = document::BucketId()) const override; - void forEach(MutableEntryProcessor&, const document::BucketId& after = document::BucketId()) override; uint64_t size() const override { return _values.size() - _freeValues.size(); }; void clear() override; uint32_t childCount(const document::BucketId&) const override; Entry upperBound(const document::BucketId& value) const override; + void merge(MergingProcessor&) override; + document::BucketId getAppropriateBucket(uint16_t minBits, const document::BucketId& bid) override; void print(std::ostream& out, bool verbose, const std::string& indent) const override; - private: struct E { E() : value(-1), e_0(-1), e_1(-1) {}; @@ -42,14 +42,20 @@ private: int e_1; }; + template <typename EntryType> + void update_internal(EntryType&& new_entry); + BucketDatabase::Entry* find(int idx, uint8_t bitCount, const document::BucketId& bid, bool create); bool remove(int index, uint8_t bitCount, const document::BucketId& bId); int findFirstInOrderNodeInclusive(int index) const; int upperBoundImpl(int index, uint8_t depth, const document::BucketId& value) const; - template <typename EntryProcessorType> - bool forEach(int index, EntryProcessorType& processor, uint8_t bitCount, - const document::BucketId& lowerBound, bool& process); + bool forEach(int index, EntryProcessor& processor, uint8_t bitCount, + const document::BucketId& lowerBound, bool& process) const; + + void merge_internal(int index, MergingProcessor& processor, + std::vector<Entry>& to_insert, + std::vector<document::BucketId>& to_remove); void findParents(int index, uint8_t bitCount, const document::BucketId& bid, std::vector<Entry>& entries) const; void findAll(int index, uint8_t bitCount, const document::BucketId& bid, std::vector<Entry>& entries) const; diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 33fa80ab484..d349164e8ed 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -4,8 +4,8 @@ #include "bucket_db_prune_elision.h" #include "distributor.h" #include "distributor_bucket_space.h" -#include "simpleclusterinformation.h" #include "distributormetricsset.h" +#include "simpleclusterinformation.h" #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> @@ -152,26 +152,20 @@ BucketDBUpdater::removeSuperfluousBuckets( // Remove all buckets not belonging to this distributor, or // being on storage nodes that are no longer up. - NodeRemover proc( + MergingNodeRemover proc( oldClusterState, *new_cluster_state, _distributorComponent.getIndex(), newDistribution, - up_states); - bucketDb.forEach(proc); - - for (const auto & bucket : proc.getBucketsToRemove()) { - bucketDb.remove(bucket); + up_states, + move_to_read_only_db); + + bucketDb.merge(proc); + // Non-owned entries vector only has contents if move_to_read_only_db is true. + // TODO either rewrite in terms of merge() or remove entirely + for (const auto& db_entry : proc.getNonOwnedEntries()) { + readOnlyDb.update(db_entry); // TODO Entry move support } - // TODO vec of Entry instead to avoid lookup and remove? Uses more transient memory... - for (const auto& bucket : proc.getNonOwnedBuckets()) { - if (move_to_read_only_db) { - auto db_entry = bucketDb.get(bucket); - readOnlyDb.update(db_entry); // TODO Entry move support - } - bucketDb.remove(bucket); - } - } } @@ -217,7 +211,6 @@ BucketDBUpdater::storageDistributionChanged() std::move(clusterInfo), _sender, _distributorComponent.getBucketSpaceRepo(), - _distributorComponent.getReadOnlyBucketSpaceRepo(), _distributorComponent.getUniqueTimestamp()); _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); } @@ -273,7 +266,6 @@ BucketDBUpdater::onSetSystemState( std::move(clusterInfo), _sender, _distributorComponent.getBucketSpaceRepo(), - _distributorComponent.getReadOnlyBucketSpaceRepo(), cmd, _outdatedNodesMap, _distributorComponent.getUniqueTimestamp()); @@ -740,40 +732,39 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos, return ""; } -bool -BucketDBUpdater::BucketListGenerator::process(BucketDatabase::Entry& e) -{ - document::BucketId bucketId(e.getBucketId()); - - const BucketCopy* copy(e->getNode(_node)); - if (copy) { - _entries.emplace_back(bucketId, copy->getBucketInfo()); - } - return true; -} - -BucketDBUpdater::NodeRemover::NodeRemover( +BucketDBUpdater::MergingNodeRemover::MergingNodeRemover( const lib::ClusterState& oldState, const lib::ClusterState& s, uint16_t localIndex, const lib::Distribution& distribution, - const char* upStates) + const char* upStates, + bool track_non_owned_entries) : _oldState(oldState), _state(s), + _available_nodes(), _nonOwnedBuckets(), - _removedBuckets(), + _removed_buckets(0), _localIndex(localIndex), _distribution(distribution), _upStates(upStates), + _track_non_owned_entries(track_non_owned_entries), _cachedDecisionSuperbucket(UINT64_MAX), _cachedOwned(false) -{} +{ + // TODO intersection of cluster state and distribution config + const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE); + _available_nodes.resize(storage_count); + for (uint16_t i = 0; i < storage_count; ++i) { + if (s.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState().oneOf(_upStates)) { + _available_nodes[i] = true; + } + } +} void -BucketDBUpdater::NodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const +BucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const { LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg); - LOG_BUCKET_OPERATION_NO_LOCK(bucketId, msg); } namespace { @@ -786,7 +777,7 @@ uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution } bool -BucketDBUpdater::NodeRemover::distributorOwnsBucket( +BucketDBUpdater::MergingNodeRemover::distributorOwnsBucket( const document::BucketId& bucketId) const { // TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor. @@ -818,7 +809,7 @@ BucketDBUpdater::NodeRemover::distributorOwnsBucket( } void -BucketDBUpdater::NodeRemover::setCopiesInEntry( +BucketDBUpdater::MergingNodeRemover::setCopiesInEntry( BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const { @@ -830,77 +821,73 @@ BucketDBUpdater::NodeRemover::setCopiesInEntry( e->addNodes(copies, order); LOG(spam, "Changed %s", e->toString().c_str()); - LOG_BUCKET_OPERATION_NO_LOCK( - e.getBucketId(), - vespalib::make_string("updated bucketdb entry to %s", - e->toString().c_str())); } -void -BucketDBUpdater::NodeRemover::removeEmptyBucket(const document::BucketId& bucketId) +bool +BucketDBUpdater::MergingNodeRemover::has_unavailable_nodes(const storage::BucketDatabase::Entry& e) const { - _removedBuckets.push_back(bucketId); - - LOG(spam, - "After system state change %s, bucket %s now has no copies.", - _oldState.getTextualDifference(_state).c_str(), - bucketId.toString().c_str()); - LOG_BUCKET_OPERATION_NO_LOCK(bucketId, "bucket now has no copies"); + const uint16_t n_nodes = e->getNodeCount(); + for (uint16_t i = 0; i < n_nodes; i++) { + const uint16_t node_idx = e->getNodeRef(i).getNode(); + if (!storage_node_is_available(node_idx)) { + return true; + } + } + return false; } -bool -BucketDBUpdater::NodeRemover::process(BucketDatabase::Entry& e) +BucketDatabase::MergingProcessor::Result +BucketDBUpdater::MergingNodeRemover::merge(storage::BucketDatabase::Merger& merger) { - const document::BucketId& bucketId(e.getBucketId()); + document::BucketId bucketId(merger.bucket_id()); + LOG(spam, "Check for remove: bucket %s", bucketId.toString().c_str()); + if (!distributorOwnsBucket(bucketId)) { + // TODO remove in favor of DB snapshotting + if (_track_non_owned_entries) { + _nonOwnedBuckets.emplace_back(merger.current_entry()); + } + return Result::Skip; + } + auto& e = merger.current_entry(); - LOG(spam, "Check for remove: bucket %s", e.toString().c_str()); - if (e->getNodeCount() == 0) { - removeEmptyBucket(e.getBucketId()); - return true; + if (e->getNodeCount() == 0) { // TODO when should this edge ever trigger? + return Result::Skip; } - if (!distributorOwnsBucket(bucketId)) { - _nonOwnedBuckets.push_back(bucketId); - return true; + + if (!has_unavailable_nodes(e)) { + return Result::KeepUnchanged; } std::vector<BucketCopy> remainingCopies; for (uint16_t i = 0; i < e->getNodeCount(); i++) { - Node n(NodeType::STORAGE, e->getNodeRef(i).getNode()); - - // TODO replace with intersection hash set of config and cluster state - if (_state.getNodeState(n).getState().oneOf(_upStates)) { + const uint16_t node_idx = e->getNodeRef(i).getNode(); + if (storage_node_is_available(node_idx)) { remainingCopies.push_back(e->getNodeRef(i)); } } - if (remainingCopies.size() == e->getNodeCount()) { - return true; - } - if (remainingCopies.empty()) { - removeEmptyBucket(bucketId); + ++_removed_buckets; + return Result::Skip; } else { setCopiesInEntry(e, remainingCopies); + return Result::Update; } +} - return true; +bool +BucketDBUpdater::MergingNodeRemover::storage_node_is_available(uint16_t index) const noexcept +{ + return ((index < _available_nodes.size()) && _available_nodes[index]); } -BucketDBUpdater::NodeRemover::~NodeRemover() +BucketDBUpdater::MergingNodeRemover::~MergingNodeRemover() { - if ( !_removedBuckets.empty()) { - std::ostringstream ost; - ost << "After system state change " - << _oldState.getTextualDifference(_state) << ", we removed " - << "buckets. Data is unavailable until node comes back up. " - << _removedBuckets.size() << " buckets removed:"; - for (uint32_t i=0; i < 10 && i < _removedBuckets.size(); ++i) { - ost << " " << _removedBuckets[i]; - } - if (_removedBuckets.size() >= 10) { - ost << " ..."; - } - LOGBM(info, "%s", ost.str().c_str()); + if (_removed_buckets != 0) { + LOGBM(info, "After cluster state change %s, %" PRIu64 " buckets no longer " + "have available replicas. Documents in these buckets will " + "be unavailable until nodes come back up", + _oldState.getTextualDifference(_state).c_str(), _removed_buckets); } } diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index dbcf6699bdc..663ac488ef4 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -182,54 +182,43 @@ private: friend class BucketDBUpdater_Test; friend class MergeOperation_Test; - class BucketListGenerator - { - public: - BucketListGenerator(uint16_t node, BucketListMerger::BucketList& entries) - : _node(node), _entries(entries) {}; - - bool process(BucketDatabase::Entry&); - - private: - uint16_t _node; - BucketListMerger::BucketList& _entries; - }; - /** Removes all copies of buckets that are on nodes that are down. */ - class NodeRemover : public BucketDatabase::MutableEntryProcessor - { + class MergingNodeRemover : public BucketDatabase::MergingProcessor { public: - NodeRemover(const lib::ClusterState& oldState, - const lib::ClusterState& s, - uint16_t localIndex, - const lib::Distribution& distribution, - const char* upStates); - ~NodeRemover() override; - - bool process(BucketDatabase::Entry& e) override; + MergingNodeRemover(const lib::ClusterState& oldState, + const lib::ClusterState& s, + uint16_t localIndex, + const lib::Distribution& distribution, + const char* upStates, + bool track_non_owned_entries); + ~MergingNodeRemover() override; + + Result merge(BucketDatabase::Merger&) override; void logRemove(const document::BucketId& bucketId, const char* msg) const; bool distributorOwnsBucket(const document::BucketId&) const; - const std::vector<document::BucketId>& getBucketsToRemove() const noexcept { - return _removedBuckets; - } - const std::vector<document::BucketId>& getNonOwnedBuckets() const noexcept { + // TODO this is temporary until explicit DB snapshotting replaces read-only DB usage + const std::vector<BucketDatabase::Entry>& getNonOwnedEntries() const noexcept { return _nonOwnedBuckets; } private: void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const; - void removeEmptyBucket(const document::BucketId& bucketId); + + bool has_unavailable_nodes(const BucketDatabase::Entry&) const; + bool storage_node_is_available(uint16_t index) const noexcept; const lib::ClusterState _oldState; const lib::ClusterState _state; - std::vector<document::BucketId> _nonOwnedBuckets; - std::vector<document::BucketId> _removedBuckets; + std::vector<bool> _available_nodes; + std::vector<BucketDatabase::Entry> _nonOwnedBuckets; + size_t _removed_buckets; uint16_t _localIndex; const lib::Distribution& _distribution; const char* _upStates; + bool _track_non_owned_entries; mutable uint64_t _cachedDecisionSuperbucket; mutable bool _cachedOwned; diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 3237b4b4d71..96359bcec60 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -114,7 +114,9 @@ public: * Checks whether a bucket needs to be split, and sends a split * if so. */ - void checkBucketForSplit(document::BucketSpace bucketSpace, const BucketDatabase::Entry& e, uint8_t priority) override; + void checkBucketForSplit(document::BucketSpace bucketSpace, + const BucketDatabase::Entry& e, + uint8_t priority) override; const lib::ClusterStateBundle& getClusterStateBundle() const override; diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp index 5a1ff31e2e7..5ae5d8dc3f8 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp +++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp @@ -244,7 +244,7 @@ IdealStateManager::generateAll(const document::Bucket &bucket, void IdealStateManager::getBucketStatus( BucketSpace bucketSpace, - const BucketDatabase::Entry& entry, + const BucketDatabase::ConstEntryRef& entry, NodeMaintenanceStatsTracker& statsTracker, std::ostream& out) const { diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h index 3bb6d0dd757..8566c67a51b 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.h +++ b/storage/src/vespa/storage/distributor/idealstatemanager.h @@ -128,14 +128,14 @@ private: StatusBucketVisitor(const IdealStateManager& ism, document::BucketSpace bucketSpace, std::ostream& out) : _statsTracker(), _ism(ism), _bucketSpace(bucketSpace), _out(out) {} - bool process(const BucketDatabase::Entry& e) override { + bool process(const BucketDatabase::ConstEntryRef& e) override { _ism.getBucketStatus(_bucketSpace, e, _statsTracker, _out); return true; } }; friend class StatusBucketVisitor; - void getBucketStatus(document::BucketSpace bucketSpace, const BucketDatabase::Entry& entry, + void getBucketStatus(document::BucketSpace bucketSpace, const BucketDatabase::ConstEntryRef& entry, NodeMaintenanceStatsTracker& statsTracker, std::ostream& out) const; void dump_bucket_space_db_status(document::BucketSpace bucket_space, std::ostream& out) const; }; diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp index cd18148a662..a89dd52775b 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp @@ -483,7 +483,7 @@ struct NextEntryFinder : public BucketDatabase::EntryProcessor { NextEntryFinder(const document::BucketId& id) : _first(true), _last(id), _next() {} - bool process(const BucketDatabase::Entry& e) override { + bool process(const BucketDatabase::ConstEntryRef& e) override { document::BucketId bucket(e.getBucketId()); if (_first && bucket == _last) { diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp index 216646f0345..13f9c21eed0 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp @@ -131,24 +131,30 @@ PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDat return updated; } -// TODO take in bucket key instead bool -PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const +PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const { - return (_iter < _entries.size() - && _entries[_iter].bucket_key < bucketId.toKey()); + return ((_iter < _entries.size()) + && (_entries[_iter].bucket_key < bucket_key)); } -// TODO take in bucket key instead bool -PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const +PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const { - return _iter < _entries.size() && _entries[_iter].bucket_id() == bucketId; + return _iter < _entries.size() && _entries[_iter].bucket_key == bucket_key; } -bool -PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e) -{ +using MergeResult = BucketDatabase::MergingProcessor::Result; + +MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger) { + const uint64_t bucket_key = merger.bucket_key(); + + while (databaseIteratorHasPassedBucketInfoIterator(bucket_key)) { + LOG(spam, "Found new bucket %s, adding", _entries[_iter].bucket_id().toString().c_str()); + addToMerger(merger, skipAllForSameBucket()); + } + + auto& e = merger.current_entry(); document::BucketId bucketId(e.getBucketId()); LOG(spam, @@ -157,19 +163,10 @@ PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e) bucketId.toString().c_str(), e.getBucketInfo().toString().c_str()); - while (databaseIteratorHasPassedBucketInfoIterator(bucketId)) { - LOG(spam, "Found new bucket %s, adding", - _entries[_iter].bucket_id().toString().c_str()); - - _missingEntries.push_back(skipAllForSameBucket()); - } - bool updated(removeCopiesFromNodesThatWereRequested(e, bucketId)); - if (bucketInfoIteratorPointsToBucket(bucketId)) { - LOG(spam, "Updating bucket %s", - _entries[_iter].bucket_id().toString().c_str()); - + if (bucketInfoIteratorPointsToBucket(bucket_key)) { + LOG(spam, "Updating bucket %s", _entries[_iter].bucket_id().toString().c_str()); insertInfo(e, skipAllForSameBucket()); updated = true; } @@ -177,23 +174,24 @@ PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e) if (updated) { // Remove bucket if we've previously removed all nodes from it if (e->getNodeCount() == 0) { - _removedBuckets.push_back(bucketId); + return MergeResult::Skip; } else { e.getBucketInfo().updateTrusted(); + return MergeResult::Update; } } - LOG(spam, - "After merging info from nodes [%s], bucket %s had info %s", - requestNodesToString().c_str(), - bucketId.toString().c_str(), - e.getBucketInfo().toString().c_str()); + return MergeResult::KeepUnchanged; +} - return true; +void PendingBucketSpaceDbTransition::insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) { + while (_iter < _entries.size()) { + addToInserter(inserter, skipAllForSameBucket()); + } } void -PendingBucketSpaceDbTransition::addToBucketDB(BucketDatabase& db, const Range& range) +PendingBucketSpaceDbTransition::addToMerger(BucketDatabase::Merger& merger, const Range& range) { LOG(spam, "Adding new bucket %s with %d copies", _entries[range.first].bucket_id().toString().c_str(), @@ -204,33 +202,37 @@ PendingBucketSpaceDbTransition::addToBucketDB(BucketDatabase& db, const Range& r if (e->getLastGarbageCollectionTime() == 0) { e->setLastGarbageCollectionTime( framework::MicroSecTime(_creationTimestamp) - .getSeconds().getTime()); + .getSeconds().getTime()); } e.getBucketInfo().updateTrusted(); - db.update(e); + merger.insert_before_current(e); } void -PendingBucketSpaceDbTransition::mergeIntoBucketDatabase() +PendingBucketSpaceDbTransition::addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range) { - BucketDatabase &db(_distributorBucketSpace.getBucketDatabase()); - std::sort(_entries.begin(), _entries.end()); - - db.forEach(*this); - - for (uint32_t i = 0; i < _removedBuckets.size(); ++i) { - db.remove(_removedBuckets[i]); - } - _removedBuckets.clear(); + // TODO dedupe + LOG(spam, "Adding new bucket %s with %d copies", + _entries[range.first].bucket_id().toString().c_str(), + range.second - range.first); - // All of the remaining were not already in the bucket database. - while (_iter < _entries.size()) { - _missingEntries.push_back(skipAllForSameBucket()); + BucketDatabase::Entry e(_entries[range.first].bucket_id(), BucketInfo()); + insertInfo(e, range); + if (e->getLastGarbageCollectionTime() == 0) { + e->setLastGarbageCollectionTime( + framework::MicroSecTime(_creationTimestamp) + .getSeconds().getTime()); } + e.getBucketInfo().updateTrusted(); + inserter.insert_at_end(e); +} - for (uint32_t i = 0; i < _missingEntries.size(); ++i) { - addToBucketDB(db, _missingEntries[i]); - } +void +PendingBucketSpaceDbTransition::mergeIntoBucketDatabase() +{ + BucketDatabase &db(_distributorBucketSpace.getBucketDatabase()); + std::sort(_entries.begin(), _entries.end()); + db.merge(*this); } void diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h index 7eb2974eb52..695f80750aa 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h @@ -20,7 +20,7 @@ class DistributorBucketSpace; * reply result within a bucket space and apply it to the distributor * bucket database when switching to the pending cluster state. */ -class PendingBucketSpaceDbTransition : public BucketDatabase::MutableEntryProcessor +class PendingBucketSpaceDbTransition : public BucketDatabase::MergingProcessor { public: using Entry = dbtransition::Entry; @@ -51,8 +51,8 @@ private: bool _bucketOwnershipTransfer; std::unordered_map<uint16_t, size_t> _rejectedRequests; - // BucketDataBase::MutableEntryProcessor API - bool process(BucketDatabase::Entry& e) override; + BucketDatabase::MergingProcessor::Result merge(BucketDatabase::Merger&) override; + void insert_remaining_at_end(BucketDatabase::TrailingInserter&) override; /** * Skips through all entries for the same bucket and returns @@ -63,7 +63,8 @@ private: std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range); void insertInfo(BucketDatabase::Entry& info, const Range& range); - void addToBucketDB(BucketDatabase& db, const Range& range); + void addToMerger(BucketDatabase::Merger& merger, const Range& range); + void addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range); bool nodeIsOutdated(uint16_t node) const { return (_outdatedNodes.find(node) != _outdatedNodes.end()); @@ -75,8 +76,8 @@ private: bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId); // Helper methods for iterating over _entries - bool databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const; - bool bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const; + bool databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const; + bool bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const; std::string requestNodesToString(); bool distributorChanged(); @@ -99,7 +100,7 @@ public: std::shared_ptr<const ClusterInformation> clusterInfo, const lib::ClusterState &newClusterState, api::Timestamp creationTimestamp); - ~PendingBucketSpaceDbTransition(); + ~PendingBucketSpaceDbTransition() override; // Merges all the results with the corresponding bucket database. void mergeIntoBucketDatabase(); diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 6cba7084037..8298b126690 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -28,7 +28,6 @@ PendingClusterState::PendingClusterState( const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, DistributorBucketSpaceRepo& bucketSpaceRepo, - DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) @@ -41,7 +40,6 @@ PendingClusterState::PendingClusterState( _creationTimestamp(creationTimestamp), _sender(sender), _bucketSpaceRepo(bucketSpaceRepo), - _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo), _clusterStateVersion(_cmd->getClusterStateBundle().getVersion()), _isVersionedTransition(true), _bucketOwnershipTransfer(false), @@ -56,7 +54,6 @@ PendingClusterState::PendingClusterState( const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, DistributorBucketSpaceRepo& bucketSpaceRepo, - DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, api::Timestamp creationTimestamp) : _requestedNodes(clusterInfo->getStorageNodeCount()), _prevClusterStateBundle(clusterInfo->getClusterStateBundle()), @@ -66,7 +63,6 @@ PendingClusterState::PendingClusterState( _creationTimestamp(creationTimestamp), _sender(sender), _bucketSpaceRepo(bucketSpaceRepo), - _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo), _clusterStateVersion(0), _isVersionedTransition(false), _bucketOwnershipTransfer(true), @@ -76,7 +72,7 @@ PendingClusterState::PendingClusterState( initializeBucketSpaceTransitions(true, OutdatedNodesMap()); } -PendingClusterState::~PendingClusterState() {} +PendingClusterState::~PendingClusterState() = default; void PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap) diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index cedc0573381..7aa35b32b8e 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -46,14 +46,13 @@ public: const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, DistributorBucketSpaceRepo& bucketSpaceRepo, - DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) { // Naked new due to private constructor return std::unique_ptr<PendingClusterState>(new PendingClusterState( - clock, clusterInfo, sender, bucketSpaceRepo, readOnlyBucketSpaceRepo, + clock, clusterInfo, sender, bucketSpaceRepo, newStateCmd, outdatedNodesMap, creationTimestamp)); } @@ -66,13 +65,11 @@ public: const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, DistributorBucketSpaceRepo& bucketSpaceRepo, - DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, api::Timestamp creationTimestamp) { // Naked new due to private constructor return std::unique_ptr<PendingClusterState>(new PendingClusterState( - clock, clusterInfo, sender, bucketSpaceRepo, - readOnlyBucketSpaceRepo, creationTimestamp)); + clock, clusterInfo, sender, bucketSpaceRepo, creationTimestamp)); } PendingClusterState(const PendingClusterState &) = delete; @@ -167,7 +164,6 @@ private: const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, DistributorBucketSpaceRepo& bucketSpaceRepo, - DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp); @@ -181,7 +177,6 @@ private: const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, DistributorBucketSpaceRepo& bucketSpaceRepo, - DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, api::Timestamp creationTimestamp); struct BucketSpaceAndNode { @@ -232,7 +227,6 @@ private: DistributorMessageSender& _sender; DistributorBucketSpaceRepo& _bucketSpaceRepo; - DistributorBucketSpaceRepo& _readOnlyBucketSpaceRepo; uint32_t _clusterStateVersion; bool _isVersionedTransition; bool _bucketOwnershipTransfer; |