summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-05-09 12:03:06 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-06-03 13:08:25 +0000
commit7e9a122d5865db5a24b135c41b6dbf7dedf6a31c (patch)
treee6901c00e5797b7b688e921d9c83961550463c5a /storage
parent34920d57c38c11f8ef8979071992e206fcd0ab03 (diff)
Add new DB merging API to distributor BucketDatabase
Abstracts away how an ordered merge may be performed with the database and an arbitrary sorted bucket sequence, with any number of buckets skipped, updated or inserted as part of the merge. Such an API is required to allow efficient bulk updates of a B-tree backed database, as it is suboptimal to require constant tree mutations. Other changes: - Removed legacy mutable iteration API. Not needed with new merge API. - Const-iteration of bucket database now uses an explicit const reference entry type to avoid needing to construct a temporary entry when we can instead just point directly into the backing ArrayStore. - Micro-optimizations of node remover pass to avoid going via cluster state's node state std::map for each bucket replica entry. Now uses a precomputed bit vector. Also avoid BucketId bit reversing operations as much as possible by using raw bucket keys in more places. - Changed wording and contents of log message that triggers when buckets are removed from the DB due to no remaining nodes containing replicas for the bucket. Now more obvious what the message actually means. - Added several benchmark tests (disabled by default)
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/bucketdatabasetest.cpp242
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp102
-rw-r--r--storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp144
-rw-r--r--storage/src/vespa/storage/bucketdb/btree_bucket_database.h9
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketdatabase.cpp23
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketdatabase.h79
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketinfo.cpp179
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketinfo.h178
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketinfo.hpp165
-rw-r--r--storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp155
-rw-r--r--storage/src/vespa/storage/bucketdb/mapbucketdatabase.h16
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp159
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h49
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h4
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemanager.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp96
-rw-r--r--storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h15
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.h10
21 files changed, 1035 insertions, 604 deletions
diff --git a/storage/src/tests/distributor/bucketdatabasetest.cpp b/storage/src/tests/distributor/bucketdatabasetest.cpp
index 92e0c534e31..cfb54edbe78 100644
--- a/storage/src/tests/distributor/bucketdatabasetest.cpp
+++ b/storage/src/tests/distributor/bucketdatabasetest.cpp
@@ -1,5 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucketdatabasetest.h"
+#include <vespa/vespalib/util/benchmark_timer.h>
+#include <chrono>
#include <iomanip>
#include <algorithm>
@@ -12,15 +14,25 @@ void BucketDatabaseTest::SetUp() {
}
namespace {
- BucketCopy BC(uint32_t nodeIdx) {
- return BucketCopy(0, nodeIdx, api::BucketInfo());
- }
- BucketInfo BI(uint32_t nodeIdx) {
- BucketInfo bi;
- bi.addNode(BC(nodeIdx), toVector<uint16_t>(0));
- return bi;
- }
+BucketCopy BC(uint32_t nodeIdx) {
+ return BucketCopy(0, nodeIdx, api::BucketInfo());
+}
+
+BucketInfo BI(uint32_t nodeIdx) {
+ BucketInfo bi;
+ bi.addNode(BC(nodeIdx), toVector<uint16_t>(0));
+ return bi;
+}
+
+BucketInfo BI3(uint32_t node0, uint32_t node1, uint32_t node2) {
+ BucketInfo bi;
+ bi.addNode(BC(node0), toVector<uint16_t>(node0, node1, node2));
+ bi.addNode(BC(node1), toVector<uint16_t>(node0, node1, node2));
+ bi.addNode(BC(node2), toVector<uint16_t>(node0, node1, node2));
+ return bi;
+}
+
}
TEST_P(BucketDatabaseTest, testClear) {
@@ -63,34 +75,23 @@ TEST_P(BucketDatabaseTest, testUpdateGetAndRemove) {
namespace {
-struct ModifyProcessor : public BucketDatabase::MutableEntryProcessor
-{
- bool process(BucketDatabase::Entry& e) override {
- if (e.getBucketId() == document::BucketId(16, 0x0b)) {
- e.getBucketInfo() = BI(7);
- } else if (e.getBucketId() == document::BucketId(16, 0x2a)) {
- e->clear();
- e->addNode(BC(4), toVector<uint16_t>(0));
- e->addNode(BC(5), toVector<uint16_t>(0));
- }
-
- return true;
- }
-};
-
struct ListAllProcessor : public BucketDatabase::EntryProcessor {
std::ostringstream ost;
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
ost << e << "\n";
return true;
}
};
-struct DummyProcessor : public BucketDatabase::EntryProcessor {
- std::ostringstream ost;
+std::string dump_db(const BucketDatabase& db) {
+ ListAllProcessor proc;
+ db.forEach(proc, document::BucketId());
+ return proc.ost.str();
+}
- bool process(const BucketDatabase::Entry&) override {
+struct DummyProcessor : public BucketDatabase::EntryProcessor {
+ bool process(const BucketDatabase::ConstEntryRef&) override {
return true;
}
};
@@ -99,7 +100,7 @@ struct DummyProcessor : public BucketDatabase::EntryProcessor {
struct StoppingProcessor : public BucketDatabase::EntryProcessor {
std::ostringstream ost;
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
ost << e << "\n";
if (e.getBucketId() == document::BucketId(16, 0x2a)) {
@@ -156,25 +157,6 @@ TEST_P(BucketDatabaseTest, testIterating) {
"node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"),
proc.ost.str());
}
-
- {
- ModifyProcessor alterProc;
- db().forEach(alterProc, document::BucketId());
- // Verify content after altering
- ListAllProcessor proc;
- db().forEach(proc);
-
- EXPECT_EQ(
- std::string(
- "BucketId(0x4000000000000010) : "
- "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
- "BucketId(0x400000000000002a) : "
- "node(idx=4,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false), "
- "node(idx=5,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
- "BucketId(0x400000000000000b) : "
- "node(idx=7,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"),
- proc.ost.str());
- }
}
std::string
@@ -552,4 +534,170 @@ TEST_P(BucketDatabaseTest, testChildCount) {
EXPECT_EQ(0u, db().childCount(BucketId(3, 5)));
}
+using Merger = BucketDatabase::Merger;
+using TrailingInserter = BucketDatabase::TrailingInserter;
+using Result = BucketDatabase::MergingProcessor::Result;
+
+namespace {
+
+struct KeepUnchangedMergingProcessor : BucketDatabase::MergingProcessor {
+ Result merge(Merger&) override {
+ return Result::KeepUnchanged;
+ }
+};
+
+struct SkipBucketMergingProcessor : BucketDatabase::MergingProcessor {
+ BucketId _skip_bucket;
+ explicit SkipBucketMergingProcessor(BucketId skip_bucket) : _skip_bucket(skip_bucket) {}
+
+ Result merge(Merger& m) override {
+ return (m.bucket_id() == _skip_bucket) ? Result::Skip : Result::KeepUnchanged;
+ }
+};
+
+struct UpdateBucketMergingProcessor : BucketDatabase::MergingProcessor {
+ BucketId _update_bucket;
+ explicit UpdateBucketMergingProcessor(BucketId update_bucket) : _update_bucket(update_bucket) {}
+
+ Result merge(Merger& m) override {
+ if (m.bucket_id() == _update_bucket) {
+ auto& e = m.current_entry();
+ // Add a replica and alter the current one.
+ e->addNode(BucketCopy(123456, 0, api::BucketInfo(2, 3, 4)), toVector<uint16_t>(0));
+ e->addNode(BucketCopy(234567, 1, api::BucketInfo(3, 4, 5)), toVector<uint16_t>(1));
+ return Result::Update;
+ }
+ return Result::KeepUnchanged;
+ }
+};
+
+struct InsertBeforeBucketMergingProcessor : BucketDatabase::MergingProcessor {
+ BucketId _before_bucket;
+ explicit InsertBeforeBucketMergingProcessor(BucketId before_bucket) : _before_bucket(before_bucket) {}
+
+ Result merge(Merger& m) override {
+ if (m.bucket_id() == _before_bucket) {
+ // Assumes _before_bucket is > the inserted bucket
+ m.insert_before_current(BucketDatabase::Entry(document::BucketId(16, 2), BI(2)));
+ }
+ return Result::KeepUnchanged;
+ }
+};
+
+struct InsertAtEndMergingProcessor : BucketDatabase::MergingProcessor {
+ Result merge(Merger&) override {
+ return Result::KeepUnchanged;
+ }
+
+ void insert_remaining_at_end(TrailingInserter& inserter) override {
+ inserter.insert_at_end(BucketDatabase::Entry(document::BucketId(16, 3), BI(3)));
+ }
+};
+
+}
+
+TEST_P(BucketDatabaseTest, merge_keep_unchanged_result_does_not_alter_db_contents) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+
+ KeepUnchangedMergingProcessor proc;
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_entry_skipping_removes_entry_from_db) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+ db().update(BucketDatabase::Entry(BucketId(16, 3), BI(3)));
+
+ SkipBucketMergingProcessor proc(BucketId(16, 2));
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000003) : "
+ "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_update_result_updates_entry_in_db) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+
+ UpdateBucketMergingProcessor proc(BucketId(16, 1));
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x3,docs=4/4,bytes=5/5,trusted=false,active=false,ready=false), "
+ "node(idx=0,crc=0x2,docs=3/3,bytes=4/4,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_can_insert_entry_before_current_bucket) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 3), BI(3)));
+
+ InsertBeforeBucketMergingProcessor proc(BucketId(16, 1));
+ db().merge(proc);
+
+ // Bucket (...)00002 is inserted by the merge processor
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000003) : "
+ "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_can_insert_entry_at_end) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+
+ InsertAtEndMergingProcessor proc;
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000003) : "
+ "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, DISABLED_benchmark_const_iteration) {
+ constexpr uint32_t superbuckets = 1u << 16u;
+ constexpr uint32_t sub_buckets = 14;
+ constexpr uint32_t n_buckets = superbuckets * sub_buckets;
+
+ std::vector<uint64_t> bucket_keys;
+ bucket_keys.reserve(n_buckets);
+
+ for (uint32_t sb = 0; sb < superbuckets; ++sb) {
+ for (uint64_t i = 0; i < sub_buckets; ++i) {
+ document::BucketId bucket(48, (i << 32ULL) | sb);
+ bucket_keys.emplace_back(bucket.toKey());
+ }
+ }
+ std::sort(bucket_keys.begin(), bucket_keys.end());
+ for (uint64_t k : bucket_keys) {
+ db().update(BucketDatabase::Entry(BucketId(BucketId::keyToBucketId(k)), BI3(0, 1, 2)));
+ }
+
+ auto elapsed = vespalib::BenchmarkTimer::benchmark([&] {
+ DummyProcessor proc;
+ db().forEach(proc, document::BucketId());
+ }, 5);
+ fprintf(stderr, "Full DB iteration of %s takes %g seconds\n",
+ db().toString(false).c_str(), elapsed);
+}
+
}
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 1cfc1692edb..cdaa6e9aaa3 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -514,7 +514,7 @@ public:
OutdatedNodesMap outdatedNodesMap;
state = PendingClusterState::createForClusterStateChange(
clock, clusterInfo, sender,
- owner.getBucketSpaceRepo(), owner.getReadOnlyBucketSpaceRepo(),
+ owner.getBucketSpaceRepo(),
cmd, outdatedNodesMap, api::Timestamp(1));
}
@@ -526,8 +526,7 @@ public:
owner.createClusterInfo(oldClusterState));
state = PendingClusterState::createForDistributionChange(
- clock, clusterInfo, sender, owner.getBucketSpaceRepo(),
- owner.getReadOnlyBucketSpaceRepo(), api::Timestamp(1));
+ clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1));
}
};
@@ -543,6 +542,8 @@ public:
{
return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState);
}
+
+ uint32_t populate_bucket_db_via_request_bucket_info_for_benchmarking();
};
BucketDBUpdaterTest::BucketDBUpdaterTest()
@@ -863,14 +864,14 @@ TEST_F(BucketDBUpdaterTest, testBitChange) {
const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]);
auto sreply = std::make_shared<RequestBucketInfoReply>(req);
sreply->setAddress(storageAddress(0));
- api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo();
+ auto& vec = sreply->getBucketInfo();
if (req.getBucketSpace() == FixedBucketSpaces::default_space()) {
int cnt=0;
for (int i=0; cnt < 2; i++) {
lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution();
std::vector<uint16_t> distributors;
if (distribution.getIdealDistributorNode(
- lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"),
+ lib::ClusterState("bits:14 storage:1 distributor:2"),
document::BucketId(16, i))
== 0)
{
@@ -1373,8 +1374,7 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged(
ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForDistributionChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(),
- getReadOnlyBucketSpaceRepo(), api::Timestamp(1)));
+ clock, clusterInfo, sender, getBucketSpaceRepo(), api::Timestamp(1)));
sortSentMessagesByIndex(sender);
@@ -1508,7 +1508,7 @@ TEST_F(BucketDBUpdaterTest, testPendingClusterStateReceive) {
OutdatedNodesMap outdatedNodesMap;
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ clock, clusterInfo, sender, getBucketSpaceRepo(),
cmd, outdatedNodesMap, api::Timestamp(1)));
ASSERT_EQ(messageCount(3), sender.commands.size());
@@ -1617,7 +1617,7 @@ struct BucketDumper : public BucketDatabase::EntryProcessor
{
}
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
document::BucketId bucketId(e.getBucketId());
ost << (uint32_t)bucketId.getRawId() << ":";
@@ -1661,7 +1661,7 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d"));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ clock, clusterInfo, sender, getBucketSpaceRepo(),
cmd, outdatedNodesMap, beforeTime));
parseInputData(existingData, beforeTime, *state, includeBucketInfo);
@@ -1680,7 +1680,7 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString()));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ clock, clusterInfo, sender, getBucketSpaceRepo(),
cmd, outdatedNodesMap, afterTime));
parseInputData(newData, afterTime, *state, includeBucketInfo);
@@ -1931,7 +1931,7 @@ struct FunctorProcessor : BucketDatabase::EntryProcessor {
template <typename F>
explicit FunctorProcessor(F&& f) : _f(std::forward<F>(f)) {}
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
_f(e);
return true;
}
@@ -2580,19 +2580,17 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_trans
TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
// Need to trigger an initial edge to complete first bucket scan
- ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("storage:1 distributor:2"),
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"),
messageCount(1), 0));
_sender.clear();
- lib::ClusterState state("storage:1 distributor:1");
+ lib::ClusterState state("distributor:1 storage:1");
setSystemState(state);
constexpr uint32_t superbuckets = 1u << 16u;
constexpr uint32_t sub_buckets = 14;
constexpr uint32_t n_buckets = superbuckets * sub_buckets;
- vespalib::BenchmarkTimer timer(1.0);
-
ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) {
ASSERT_EQ(_sender.commands[bsi]->getType(), MessageType::REQUESTBUCKETINFO);
@@ -2610,6 +2608,7 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
}
}
+ vespalib::BenchmarkTimer timer(1.0);
// Global space has no buckets but will serve as a trigger for merging
// buckets into the DB. This lets us measure the overhead of just this part.
if (req.getBucketSpace() == FixedBucketSpaces::global_space()) {
@@ -2626,6 +2625,77 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
EXPECT_EQ(size_t(0), mutable_global_db().size());
}
+uint32_t BucketDBUpdaterTest::populate_bucket_db_via_request_bucket_info_for_benchmarking() {
+ // Need to trigger an initial edge to complete first bucket scan
+ setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"), messageCount(1), 0);
+ _sender.clear();
+
+ lib::ClusterState state("distributor:1 storage:1");
+ setSystemState(state);
+
+ constexpr uint32_t superbuckets = 1u << 16u;
+ constexpr uint32_t sub_buckets = 14;
+ constexpr uint32_t n_buckets = superbuckets * sub_buckets;
+
+ assert(_bucketSpaces.size() == _sender.commands.size());
+ for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) {
+ assert(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO);
+ const auto& req = dynamic_cast<const RequestBucketInfoCommand&>(*_sender.commands[bsi]);
+
+ auto sreply = std::make_shared<RequestBucketInfoReply>(req);
+ sreply->setAddress(storageAddress(0));
+ auto& vec = sreply->getBucketInfo();
+ if (req.getBucketSpace() == FixedBucketSpaces::default_space()) {
+ for (uint32_t sb = 0; sb < superbuckets; ++sb) {
+ for (uint64_t i = 0; i < sub_buckets; ++i) {
+ document::BucketId bucket(48, (i << 32ULL) | sb);
+ vec.push_back(api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(10, 1, 1)));
+ }
+ }
+ }
+ getBucketDBUpdater().onRequestBucketInfoReply(sreply);
+ }
+
+ assert(mutable_default_db().size() == n_buckets);
+ assert(mutable_global_db().size() == 0);
+ return n_buckets;
+}
+
+TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_removing_buckets_for_unavailable_storage_nodes) {
+ const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking();
+
+ lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via ownership
+ vespalib::BenchmarkTimer timer(1.0);
+ timer.before();
+ setSystemState(no_op_state);
+ timer.after();
+ fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets);
+}
+
+TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_no_buckets_removed_during_node_remover_db_pass) {
+ const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking();
+
+ // TODO this benchmark is void if we further restrict the pruning elision logic to allow
+ // elision when storage nodes come online.
+ lib::ClusterState no_op_state("distributor:1 storage:2"); // Not removing any buckets
+ vespalib::BenchmarkTimer timer(1.0);
+ timer.before();
+ setSystemState(no_op_state);
+ timer.after();
+ fprintf(stderr, "Took %g seconds to scan %u buckets with no-op action\n", timer.min_time(), n_buckets);
+}
+
+TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_all_buckets_removed_during_node_remover_db_pass) {
+ const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking();
+
+ lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via all replicas gone
+ vespalib::BenchmarkTimer timer(1.0);
+ timer.before();
+ setSystemState(no_op_state);
+ timer.after();
+ fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets);
+}
+
TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) {
auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d");
auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m");
diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
index a3715bb3d27..ef198cb1d3f 100644
--- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
+++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
@@ -35,6 +35,7 @@
namespace storage {
using Entry = BucketDatabase::Entry;
+using ConstEntryRef = BucketDatabase::ConstEntryRef;
using search::datastore::EntryRef;
using vespalib::ConstArrayRef;
using document::BucketId;
@@ -55,6 +56,12 @@ Entry entry_from_replica_array_ref(const BucketId& id, uint32_t gc_timestamp, Co
return Entry(id, BucketInfo(gc_timestamp, std::vector<BucketCopy>(replicas.begin(), replicas.end())));
}
+ConstEntryRef const_entry_ref_from_replica_array_ref(const BucketId& id, uint32_t gc_timestamp,
+ ConstArrayRef<BucketCopy> replicas)
+{
+ return ConstEntryRef(id, ConstBucketInfoRef(gc_timestamp, replicas));
+}
+
EntryRef entry_ref_from_value(uint64_t value) {
return EntryRef(value & 0xffffffffULL);
}
@@ -127,14 +134,27 @@ void BTreeBucketDatabase::commit_tree_changes() {
_tree.getAllocator().trimHoldLists(used_gen);
}
+Entry BTreeBucketDatabase::entry_from_value(uint64_t bucket_key, uint64_t value) const {
+ const auto replicas_ref = _store.get(entry_ref_from_value(value));
+ const auto bucket = BucketId(BucketId::keyToBucketId(bucket_key));
+ return entry_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref);
+}
+
Entry BTreeBucketDatabase::entry_from_iterator(const BTree::ConstIterator& iter) const {
if (!iter.valid()) {
return Entry::createInvalid();
}
+ return entry_from_value(iter.getKey(), iter.getData());
+}
+
+ConstEntryRef BTreeBucketDatabase::const_entry_ref_from_iterator(const BTree::ConstIterator& iter) const {
+ if (!iter.valid()) {
+ return ConstEntryRef::createInvalid();
+ }
const auto value = iter.getData();
const auto replicas_ref = _store.get(entry_ref_from_value(value));
const auto bucket = BucketId(BucketId::keyToBucketId(iter.getKey()));
- return entry_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref);
+ return const_entry_ref_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref);
}
BucketId BTreeBucketDatabase::bucket_from_valid_iterator(const BTree::ConstIterator& iter) const {
@@ -216,8 +236,8 @@ BTreeBucketDatabase::find_parents_internal(const document::BucketId& bucket,
std::vector<Entry>& entries) const
{
const uint64_t bucket_key = bucket.toKey();
- uint64_t parent_key = 0;
- auto iter = _tree.begin();
+ const auto frozen_view = _tree.getFrozenView();
+ auto iter = frozen_view.begin();
// Start at the root level, descending towards the bucket itself.
// Try skipping as many levels of the tree as possible as we go.
uint32_t bits = 1;
@@ -228,11 +248,11 @@ BTreeBucketDatabase::find_parents_internal(const document::BucketId& bucket,
entries.emplace_back(entry_from_iterator(iter));
}
bits = next_parent_bit_seek_level(bits, candidate, bucket);
- parent_key = BucketId(bits, bucket.getRawId()).toKey();
+ const auto parent_key = BucketId(bits, bucket.getRawId()).toKey();
assert(parent_key > iter.getKey());
iter.seek(parent_key);
}
- return iter; // FIXME clang warns here due to copying...
+ return iter;
}
/*
@@ -286,30 +306,118 @@ void BTreeBucketDatabase::update(const Entry& newEntry) {
// FIXME but bit-tree code says "lowerBound" in impl and "after" in declaration???
void BTreeBucketDatabase::forEach(EntryProcessor& proc, const BucketId& after) const {
for (auto iter = _tree.upperBound(after.toKey()); iter.valid(); ++iter) {
- if (!proc.process(entry_from_iterator(iter))) {
+ if (!proc.process(const_entry_ref_from_iterator(iter))) {
break;
}
}
}
-void BTreeBucketDatabase::forEach(MutableEntryProcessor& proc, const BucketId& after) {
- for (auto iter = _tree.upperBound(after.toKey()); iter.valid(); ++iter) {
- // FIXME this is a horrible API which currently has to update every time since we don't
- // know from the function call itself whether something was in fact updated behind the scenes..!
- auto entry = entry_from_iterator(iter);
- bool should_continue = proc.process(entry);
- // TODO optimize this joyful mess :D
- update(entry);
- if (!should_continue) {
- break;
+struct BTreeBuilderMerger final : BucketDatabase::Merger {
+ BTreeBucketDatabase& _db;
+ BTreeBucketDatabase::BTree::Builder& _builder;
+ uint64_t _current_key;
+ uint64_t _current_value;
+ Entry _cached_entry;
+ bool _valid_cached_entry;
+
+ BTreeBuilderMerger(BTreeBucketDatabase& db,
+ BTreeBucketDatabase::BTree::Builder& builder)
+ : _db(db),
+ _builder(builder),
+ _current_key(0),
+ _current_value(0),
+ _cached_entry(),
+ _valid_cached_entry(false)
+ {}
+ ~BTreeBuilderMerger() override = default;
+
+ uint64_t bucket_key() const noexcept override {
+ return _current_key;
+ }
+ BucketId bucket_id() const noexcept override {
+ return BucketId(BucketId::keyToBucketId(_current_key));
+ }
+ Entry& current_entry() override {
+ if (!_valid_cached_entry) {
+ _cached_entry = _db.entry_from_value(_current_key, _current_value);
+ _valid_cached_entry = true;
}
+ return _cached_entry;
}
- //commit_tree_changes(); TODO should be done as bulk op!
+ void insert_before_current(const Entry& e) override {
+ const uint64_t bucket_key = e.getBucketId().toKey();
+ assert(bucket_key < _current_key);
+
+ auto replicas_ref = _db._store.add(e.getBucketInfo().getRawNodes());
+ const auto new_value = value_from(e.getBucketInfo().getLastGarbageCollectionTime(), replicas_ref);
+
+ _builder.insert(bucket_key, new_value);
+ }
+
+ void update_iteration_state(uint64_t key, uint64_t value) {
+ _current_key = key;
+ _current_value = value;
+ _valid_cached_entry = false;
+ }
+};
+
+struct BTreeTrailingInserter final : BucketDatabase::TrailingInserter {
+ BTreeBucketDatabase& _db;
+ BTreeBucketDatabase::BTree::Builder& _builder;
+
+ BTreeTrailingInserter(BTreeBucketDatabase& db,
+ BTreeBucketDatabase::BTree::Builder& builder)
+ : _db(db),
+ _builder(builder)
+ {}
+
+ ~BTreeTrailingInserter() override = default;
+
+ void insert_at_end(const Entry& e) override {
+ const uint64_t bucket_key = e.getBucketId().toKey();
+ const auto replicas_ref = _db._store.add(e.getBucketInfo().getRawNodes());
+ const auto new_value = value_from(e.getBucketInfo().getLastGarbageCollectionTime(), replicas_ref);
+ _builder.insert(bucket_key, new_value);
+ }
+};
+
+// TODO lbound arg?
+void BTreeBucketDatabase::merge(MergingProcessor& proc) {
+ BTreeBucketDatabase::BTree::Builder builder(_tree.getAllocator());
+ BTreeBuilderMerger merger(*this, builder);
+
+ // TODO for_each instead?
+ for (auto iter = _tree.begin(); iter.valid(); ++iter) {
+ const uint64_t key = iter.getKey();
+ const uint64_t value = iter.getData();
+ merger.update_iteration_state(key, value);
+
+ auto result = proc.merge(merger);
+
+ if (result == MergingProcessor::Result::KeepUnchanged) {
+ builder.insert(key, value); // Reuse array store ref with no changes
+ } else if (result == MergingProcessor::Result::Update) {
+ assert(merger._valid_cached_entry); // Must actually have been touched
+ assert(merger._cached_entry.valid());
+ _store.remove(entry_ref_from_value(value));
+ auto new_replicas_ref = _store.add(merger._cached_entry.getBucketInfo().getRawNodes());
+ const auto new_value = value_from(merger._cached_entry.getBucketInfo().getLastGarbageCollectionTime(), new_replicas_ref);
+ builder.insert(key, new_value);
+ } else if (result == MergingProcessor::Result::Skip) {
+ _store.remove(entry_ref_from_value(value));
+ } else {
+ abort();
+ }
+ }
+ BTreeTrailingInserter inserter(*this, builder);
+ proc.insert_remaining_at_end(inserter);
+
+ _tree.assign(builder);
+ commit_tree_changes();
}
Entry BTreeBucketDatabase::upperBound(const BucketId& bucket) const {
return entry_from_iterator(_tree.upperBound(bucket.toKey()));
-
}
uint64_t BTreeBucketDatabase::size() const {
diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h
index d896f76db54..eb527071ad7 100644
--- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h
+++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h
@@ -39,6 +39,8 @@ public:
BTreeBucketDatabase();
~BTreeBucketDatabase() override;
+ void merge(MergingProcessor&) override;
+
// Ye olde bucket DB API:
Entry get(const document::BucketId& bucket) const override;
void remove(const document::BucketId& bucket) override;
@@ -48,7 +50,6 @@ public:
std::vector<Entry>& entries) const override;
void update(const Entry& newEntry) override;
void forEach(EntryProcessor&, const document::BucketId& after) const override;
- void forEach(MutableEntryProcessor&, const document::BucketId& after) override;
Entry upperBound(const document::BucketId& value) const override;
uint64_t size() const override;
void clear() override;
@@ -60,11 +61,17 @@ public:
const std::string& indent) const override;
private:
+ Entry entry_from_value(uint64_t bucket_key, uint64_t value) const;
Entry entry_from_iterator(const BTree::ConstIterator& iter) const;
+ ConstEntryRef const_entry_ref_from_iterator(const BTree::ConstIterator& iter) const;
document::BucketId bucket_from_valid_iterator(const BTree::ConstIterator& iter) const;
void commit_tree_changes();
BTree::ConstIterator find_parents_internal(const document::BucketId& bucket,
std::vector<Entry>& entries) const;
+
+ friend struct BTreeBuilderMerger;
+ friend struct BTreeMergingBuilder;
+ friend struct BTreeTrailingInserter;
};
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp b/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp
index 8a4aadfa5be..f2a561b83eb 100644
--- a/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp
@@ -4,17 +4,6 @@
namespace storage {
-namespace {
- struct GetNextEntryProcessor : public BucketDatabase::EntryProcessor {
- BucketDatabase::Entry _entry;
-
- bool process(const BucketDatabase::Entry& e) override {
- _entry = e;
- return false;
- }
- };
-}
-
BucketDatabase::Entry
BucketDatabase::getNext(const document::BucketId& last) const
{
@@ -32,7 +21,8 @@ BucketDatabase::createAppropriateBucket(
return e;
}
-std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e)
+template <typename BucketInfoType>
+std::ostream& operator<<(std::ostream& o, const BucketDatabase::EntryBase<BucketInfoType>& e)
{
if (!e.valid()) {
o << "NONEXISTING";
@@ -42,12 +32,19 @@ std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e)
return o;
}
+template <typename BucketInfoType>
std::string
-BucketDatabase::Entry::toString() const
+BucketDatabase::EntryBase<BucketInfoType>::toString() const
{
std::ostringstream ost;
ost << *this;
return ost.str();
}
+template std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e);
+template std::ostream& operator<<(std::ostream& o, const BucketDatabase::ConstEntryRef& e);
+
+template class BucketDatabase::EntryBase<BucketInfo>;
+template class BucketDatabase::EntryBase<ConstBucketInfoRef>;
+
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketdatabase.h b/storage/src/vespa/storage/bucketdb/bucketdatabase.h
index a5a70c488ca..c9968fd0ad8 100644
--- a/storage/src/vespa/storage/bucketdb/bucketdatabase.h
+++ b/storage/src/vespa/storage/bucketdb/bucketdatabase.h
@@ -13,42 +13,48 @@ namespace storage {
class BucketDatabase : public vespalib::Printable
{
public:
- class Entry {
+ template <typename BucketInfoType>
+ class EntryBase {
document::BucketId _bucketId;
- BucketInfo _info;
+ BucketInfoType _info;
public:
- Entry() : _bucketId(0) {} // Invalid entry
- Entry(const document::BucketId& bId, BucketInfo bucketInfo)
+ EntryBase() : _bucketId(0) {} // Invalid entry
+ EntryBase(const document::BucketId& bId, const BucketInfoType& bucketInfo)
+ : _bucketId(bId), _info(bucketInfo) {}
+ EntryBase(const document::BucketId& bId, BucketInfoType&& bucketInfo)
: _bucketId(bId), _info(std::move(bucketInfo)) {}
- explicit Entry(const document::BucketId& bId) : _bucketId(bId) {}
+ explicit EntryBase(const document::BucketId& bId) : _bucketId(bId) {}
- bool operator==(const Entry& other) const {
+ bool operator==(const EntryBase& other) const {
return (_bucketId == other._bucketId && _info == other._info);
}
bool valid() const { return (_bucketId.getRawId() != 0); }
std::string toString() const;
const document::BucketId& getBucketId() const { return _bucketId; }
- const BucketInfo& getBucketInfo() const { return _info; }
- BucketInfo& getBucketInfo() { return _info; }
- BucketInfo* operator->() { return &_info; }
- const BucketInfo* operator->() const { return &_info; }
+ const BucketInfoType& getBucketInfo() const { return _info; }
+ BucketInfoType& getBucketInfo() { return _info; }
+ BucketInfoType* operator->() { return &_info; }
+ const BucketInfoType* operator->() const { return &_info; }
- static Entry createInvalid() {
- return Entry();
+ static EntryBase createInvalid() {
+ return EntryBase();
}
};
- template<typename T> struct Processor {
- virtual ~Processor() {}
+ using Entry = EntryBase<BucketInfo>;
+ // TODO avoid needing to touch memory just to get to the const entry ref
+ // TODO -> lazy ID to ConstArrayRef resolve
+ using ConstEntryRef = EntryBase<ConstBucketInfoRef>;
+
+ struct EntryProcessor {
+ virtual ~EntryProcessor() = default;
/** Return false to stop iterating. */
- virtual bool process(T& e) = 0;
+ virtual bool process(const ConstEntryRef& e) = 0;
};
- typedef Processor<const Entry> EntryProcessor;
- typedef Processor<Entry> MutableEntryProcessor;
- virtual ~BucketDatabase() {}
+ ~BucketDatabase() override = default;
virtual Entry get(const document::BucketId& bucket) const = 0;
virtual void remove(const document::BucketId& bucket) = 0;
@@ -76,9 +82,37 @@ public:
virtual void forEach(
EntryProcessor&,
const document::BucketId& after = document::BucketId()) const = 0;
- virtual void forEach(
- MutableEntryProcessor&,
- const document::BucketId& after = document::BucketId()) = 0;
+
+ struct TrailingInserter {
+ virtual ~TrailingInserter() = default;
+ virtual void insert_at_end(const Entry&) = 0;
+ };
+
+ struct Merger {
+ virtual ~Merger() = default;
+ // Visibility of changes to this object when MergingProcessor::Result::Update
+ // is _not_ returned is undefined.
+ // TODO this should ideally be separated into read/write functions, but this
+ // will suffice for now to avoid too many changes.
+ virtual uint64_t bucket_key() const noexcept = 0;
+ virtual document::BucketId bucket_id() const noexcept = 0;
+ virtual Entry& current_entry() = 0;
+ virtual void insert_before_current(const Entry&) = 0;
+ };
+
+ struct MergingProcessor {
+ enum class Result {
+ Update,
+ KeepUnchanged,
+ Skip
+ };
+
+ virtual ~MergingProcessor() = default;
+ virtual Result merge(Merger&) = 0;
+ virtual void insert_remaining_at_end(TrailingInserter&) {}
+ };
+
+ virtual void merge(MergingProcessor&) = 0;
/**
* Get the first bucket that does _not_ compare less than or equal to
@@ -114,6 +148,7 @@ public:
virtual uint32_t childCount(const document::BucketId&) const = 0;
};
-std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e);
+template <typename BucketInfoType>
+std::ostream& operator<<(std::ostream& o, const BucketDatabase::EntryBase<BucketInfoType>& e);
} // storage
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
index 82fc6686c5f..17efa1658ce 100644
--- a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
@@ -1,18 +1,18 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucketinfo.h"
+#include "bucketinfo.hpp"
#include <vespa/storage/storageutil/utils.h>
#include <algorithm>
namespace storage {
-BucketInfo::BucketInfo()
- : _lastGarbageCollection(0),
- _nodes()
-{}
+template class BucketInfoBase<std::vector<BucketCopy>>;
+template class BucketInfoBase<vespalib::ConstArrayRef<BucketCopy>>;
+
+BucketInfo::BucketInfo() : BucketInfoBase() {}
BucketInfo::BucketInfo(uint32_t lastGarbageCollection, std::vector<BucketCopy> nodes)
- : _lastGarbageCollection(lastGarbageCollection),
- _nodes(std::move(nodes))
+ : BucketInfoBase(lastGarbageCollection, std::move(nodes))
{}
BucketInfo::~BucketInfo() = default;
@@ -22,38 +22,6 @@ BucketInfo& BucketInfo::operator=(const BucketInfo&) = default;
BucketInfo::BucketInfo(BucketInfo&&) noexcept = default;
BucketInfo& BucketInfo::operator=(BucketInfo&&) noexcept = default;
-std::string
-BucketInfo::toString() const {
- std::ostringstream ost;
- print(ost, true, "");
- return ost.str();
-}
-
-bool
-BucketInfo::emptyAndConsistent() const {
- for (uint32_t i = 0; i < _nodes.size(); i++) {
- if (!_nodes[i].empty()) return false;
- }
- return consistentNodes();
-}
-
-bool
-BucketInfo::validAndConsistent() const {
- for (uint32_t i = 0; i < _nodes.size(); i++) {
- if (!_nodes[i].valid()) return false;
- }
- return consistentNodes();
-}
-
-bool
-BucketInfo::hasInvalidCopy() const
-{
- for (uint32_t i = 0; i < _nodes.size(); i++) {
- if (!_nodes[i].valid()) return true;
- }
- return false;
-}
-
void
BucketInfo::updateTrusted() {
if (validAndConsistent()) {
@@ -90,40 +58,6 @@ BucketInfo::resetTrusted() {
updateTrusted();
}
-uint16_t
-BucketInfo::getTrustedCount() const {
- uint32_t trustedCount = 0;
- for (uint32_t i = 0; i < _nodes.size(); i++) {
- if (_nodes[i].trusted()) {
- trustedCount++;
- }
- }
- return trustedCount;
-}
-
-bool
-BucketInfo::consistentNodes(bool countInvalidAsConsistent) const
-{
- int compareIndex = 0;
- for (uint32_t i=1; i<_nodes.size(); i++) {
- if (!_nodes[i].consistentWith(_nodes[compareIndex],
- countInvalidAsConsistent)) return false;
- }
- return true;
-}
-
-void
-BucketInfo::print(std::ostream& out, bool verbose, const std::string& indent) const
-{
- if (_nodes.empty()) {
- out << "no nodes";
- }
- for (uint32_t i=0; i<_nodes.size(); ++i) {
- if (i != 0) out << ", ";
- _nodes[i].print(out, verbose, indent);
- }
-}
-
namespace {
struct Sorter {
@@ -225,19 +159,6 @@ BucketInfo::removeNode(unsigned short node, TrustedUpdate update)
return false;
}
-const BucketCopy*
-BucketInfo::getNode(uint16_t node) const
-{
- for (std::vector<BucketCopy>::const_iterator iter = _nodes.begin();
- iter != _nodes.end();
- iter++) {
- if (iter->getNode() == node) {
- return &*iter;
- }
- }
- return 0;
-}
-
BucketCopy*
BucketInfo::getNodeInternal(uint16_t node)
{
@@ -251,92 +172,4 @@ BucketInfo::getNodeInternal(uint16_t node)
return 0;
}
-std::vector<uint16_t>
-BucketInfo::getNodes() const {
- std::vector<uint16_t> result;
-
- for (uint32_t i = 0; i < _nodes.size(); i++) {
- result.push_back(_nodes[i].getNode());
- }
-
- return result;
-}
-
-uint32_t
-BucketInfo::getHighestDocumentCount() const
-{
- uint32_t highest = 0;
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- highest = std::max(highest, _nodes[i].getDocumentCount());
- }
- return highest;
-}
-
-uint32_t
-BucketInfo::getHighestTotalDocumentSize() const
-{
- uint32_t highest = 0;
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- highest = std::max(highest, _nodes[i].getTotalDocumentSize());
- }
- return highest;
-}
-
-uint32_t
-BucketInfo::getHighestMetaCount() const
-{
- uint32_t highest = 0;
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- highest = std::max(highest, _nodes[i].getMetaCount());
- }
- return highest;
-}
-
-uint32_t
-BucketInfo::getHighestUsedFileSize() const
-{
- uint32_t highest = 0;
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- highest = std::max(highest, _nodes[i].getUsedFileSize());
- }
- return highest;
-}
-
-bool
-BucketInfo::hasRecentlyCreatedEmptyCopy() const
-{
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- if (_nodes[i].wasRecentlyCreated()) {
- return true;
- }
- }
- return false;
-}
-
-bool
-BucketInfo::operator==(const BucketInfo& other) const
-{
- if (_nodes.size() != other._nodes.size()) {
- return false;
- }
-
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- if (_nodes[i].getNode() != other._nodes[i].getNode()) {
- return false;
- }
-
- if (!(_nodes[i] == other._nodes[i])) {
- return false;
- }
- }
-
- return true;
-};
-
-std::ostream&
-operator<<(std::ostream& out, const BucketInfo& info) {
- info.print(out, false, "");
- return out;
-}
-
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.h b/storage/src/vespa/storage/bucketdb/bucketinfo.h
index 8949c0f10da..f0be700d204 100644
--- a/storage/src/vespa/storage/bucketdb/bucketinfo.h
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.h
@@ -2,6 +2,7 @@
#pragma once
#include "bucketcopy.h"
+#include <vespa/vespalib/util/arrayref.h>
namespace storage {
@@ -14,45 +15,37 @@ enum class TrustedUpdate {
DEFER
};
-class BucketInfo
+template <typename NodeSeq>
+class BucketInfoBase
{
-private:
+protected:
uint32_t _lastGarbageCollection;
- std::vector<BucketCopy> _nodes;
-
+ NodeSeq _nodes;
public:
- BucketInfo();
- BucketInfo(uint32_t lastGarbageCollection, std::vector<BucketCopy> nodes);
- ~BucketInfo();
-
- BucketInfo(const BucketInfo&);
- BucketInfo& operator=(const BucketInfo&);
- BucketInfo(BucketInfo&&) noexcept;
- BucketInfo& operator=(BucketInfo&&) noexcept;
+ BucketInfoBase()
+ : _lastGarbageCollection(0),
+ _nodes()
+ {}
+ BucketInfoBase(uint32_t lastGarbageCollection, const NodeSeq& nodes)
+ : _lastGarbageCollection(lastGarbageCollection),
+ _nodes(nodes)
+ {}
+ BucketInfoBase(uint32_t lastGarbageCollection, NodeSeq&& nodes)
+ : _lastGarbageCollection(lastGarbageCollection),
+ _nodes(std::move(nodes))
+ {}
+ ~BucketInfoBase() = default;
+
+ BucketInfoBase(const BucketInfoBase&) = default;
+ BucketInfoBase& operator=(const BucketInfoBase&) = default;
+ BucketInfoBase(BucketInfoBase&&) noexcept = default;
+ BucketInfoBase& operator=(BucketInfoBase&&) noexcept = default;
/**
* @return Returns the last time when this bucket was "garbage collected".
*/
uint32_t getLastGarbageCollectionTime() const { return _lastGarbageCollection; }
- /**
- * Sets the last time the bucket was "garbage collected".
- */
- void setLastGarbageCollectionTime(uint32_t timestamp) {
- _lastGarbageCollection = timestamp;
- }
-
- /**
- Update trusted flags if bucket is now complete and consistent.
- */
- void updateTrusted();
-
- /**
- Removes any historical information on trustedness, and sets the bucket copies to
- trusted if they are now complete and consistent.
- */
- void resetTrusted();
-
/** True if the bucket contains no documents and is consistent. */
bool emptyAndConsistent() const;
@@ -84,10 +77,89 @@ public:
*/
bool consistentNodes(bool countInvalidAsConsistent = false) const;
- static bool mayContain(const BucketInfo&) { return true; }
void print(std::ostream&, bool verbose, const std::string& indent) const;
/**
+ * Returns the bucket copy struct for the given node, null if nonexisting
+ */
+ const BucketCopy* getNode(uint16_t node) const;
+
+ /**
+ * Returns the number of nodes this entry has.
+ */
+ uint32_t getNodeCount() const noexcept { return static_cast<uint32_t>(_nodes.size()); }
+
+ /**
+ * Returns a list of the nodes this entry has.
+ */
+ std::vector<uint16_t> getNodes() const;
+
+ /**
+ Returns a reference to the node with the given index in the node
+ array. This operation has undefined behaviour if the index given
+ is not within the node count.
+ */
+ const BucketCopy& getNodeRef(uint16_t idx) const {
+ return _nodes[idx];
+ }
+
+ const NodeSeq& getRawNodes() const noexcept {
+ return _nodes;
+ }
+
+ std::string toString() const;
+
+ uint32_t getHighestDocumentCount() const;
+ uint32_t getHighestTotalDocumentSize() const;
+ uint32_t getHighestMetaCount() const;
+ uint32_t getHighestUsedFileSize() const;
+
+ bool hasRecentlyCreatedEmptyCopy() const;
+
+ bool operator==(const BucketInfoBase& other) const;
+};
+
+template <typename NodeSeq>
+std::ostream& operator<<(std::ostream& out, const BucketInfoBase<NodeSeq>& info) {
+ info.print(out, false, "");
+ return out;
+}
+
+class ConstBucketInfoRef : public BucketInfoBase<vespalib::ConstArrayRef<BucketCopy>> {
+public:
+ using BucketInfoBase::BucketInfoBase;
+};
+
+class BucketInfo : public BucketInfoBase<std::vector<BucketCopy>> {
+public:
+ BucketInfo();
+ BucketInfo(uint32_t lastGarbageCollection, std::vector<BucketCopy> nodes);
+ ~BucketInfo();
+
+ BucketInfo(const BucketInfo&);
+ BucketInfo& operator=(const BucketInfo&);
+ BucketInfo(BucketInfo&&) noexcept;
+ BucketInfo& operator=(BucketInfo&&) noexcept;
+
+ /**
+ * Sets the last time the bucket was "garbage collected".
+ */
+ void setLastGarbageCollectionTime(uint32_t timestamp) {
+ _lastGarbageCollection = timestamp;
+ }
+
+ /**
+ Update trusted flags if bucket is now complete and consistent.
+ */
+ void updateTrusted();
+
+ /**
+ Removes any historical information on trustedness, and sets the bucket copies to
+ trusted if they are now complete and consistent.
+ */
+ void resetTrusted();
+
+ /**
Adds the given node.
@param recommendedOrder A recommended ordering of nodes.
@@ -118,34 +190,6 @@ public:
*/
bool removeNode(uint16_t node, TrustedUpdate update = TrustedUpdate::UPDATE);
- /**
- * Returns the bucket copy struct for the given node, null if nonexisting
- */
- const BucketCopy* getNode(uint16_t node) const;
-
- /**
- * Returns the number of nodes this entry has.
- */
- uint32_t getNodeCount() const noexcept { return static_cast<uint32_t>(_nodes.size()); }
-
- /**
- * Returns a list of the nodes this entry has.
- */
- std::vector<uint16_t> getNodes() const;
-
- /**
- Returns a reference to the node with the given index in the node
- array. This operation has undefined behaviour if the index given
- is not within the node count.
- */
- const BucketCopy& getNodeRef(uint16_t idx) const {
- return _nodes[idx];
- }
-
- const std::vector<BucketCopy>& getRawNodes() const noexcept {
- return _nodes;
- }
-
void clearTrusted(uint16_t nodeIdx) {
getNodeInternal(nodeIdx)->clearTrusted();
}
@@ -155,19 +199,6 @@ public:
*/
void clear() { _nodes.clear(); }
- std::string toString() const;
-
- bool verifyLegal() const { return true; }
-
- uint32_t getHighestDocumentCount() const;
- uint32_t getHighestTotalDocumentSize() const;
- uint32_t getHighestMetaCount() const;
- uint32_t getHighestUsedFileSize() const;
-
- bool hasRecentlyCreatedEmptyCopy() const;
-
- bool operator==(const BucketInfo& other) const;
-
private:
friend class distributor::DistributorTestUtil;
@@ -183,7 +214,8 @@ private:
void addNodeManual(const BucketCopy& newCopy) { _nodes.push_back(newCopy); }
};
-std::ostream& operator<<(std::ostream& out, const BucketInfo& info);
+extern template class BucketInfoBase<std::vector<BucketCopy>>;
+extern template class BucketInfoBase<vespalib::ConstArrayRef<BucketCopy>>;
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.hpp b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp
new file mode 100644
index 00000000000..562e8d562fb
--- /dev/null
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp
@@ -0,0 +1,165 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bucketcopy.h"
+#include <vespa/vespalib/util/arrayref.h>
+#include <iostream>
+#include <sstream>
+
+namespace storage {
+
+template <typename NodeSeq>
+std::string BucketInfoBase<NodeSeq>::toString() const {
+ std::ostringstream ost;
+ print(ost, true, "");
+ return ost.str();
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::emptyAndConsistent() const {
+ for (uint32_t i = 0; i < _nodes.size(); i++) {
+ if (!_nodes[i].empty()) {
+ return false;
+ }
+ }
+ return consistentNodes();
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::validAndConsistent() const {
+ for (uint32_t i = 0; i < _nodes.size(); i++) {
+ if (!_nodes[i].valid()) {
+ return false;
+ }
+ }
+ return consistentNodes();
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::hasInvalidCopy() const {
+ for (uint32_t i = 0; i < _nodes.size(); i++) {
+ if (!_nodes[i].valid()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+template <typename NodeSeq>
+uint16_t BucketInfoBase<NodeSeq>::getTrustedCount() const {
+ uint32_t trustedCount = 0;
+ for (uint32_t i = 0; i < _nodes.size(); i++) {
+ if (_nodes[i].trusted()) {
+ trustedCount++;
+ }
+ }
+ return trustedCount;
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::consistentNodes(bool countInvalidAsConsistent) const {
+ int compareIndex = 0;
+ for (uint32_t i = 1; i < _nodes.size(); i++) {
+ if (!_nodes[i].consistentWith(_nodes[compareIndex],
+ countInvalidAsConsistent)) return false;
+ }
+ return true;
+}
+
+template <typename NodeSeq>
+void BucketInfoBase<NodeSeq>::print(std::ostream& out, bool verbose, const std::string& indent) const {
+ if (_nodes.size() == 0) {
+ out << "no nodes";
+ }
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ if (i != 0) {
+ out << ", ";
+ }
+ _nodes[i].print(out, verbose, indent);
+ }
+}
+
+template <typename NodeSeq>
+const BucketCopy* BucketInfoBase<NodeSeq>::getNode(uint16_t node) const {
+ for (const auto& n : _nodes) {
+ if (n.getNode() == node) {
+ return &n;
+ }
+ }
+ return 0;
+}
+
+template <typename NodeSeq>
+std::vector<uint16_t> BucketInfoBase<NodeSeq>::getNodes() const {
+ std::vector<uint16_t> result;
+ for (uint32_t i = 0; i < _nodes.size(); i++) {
+ result.emplace_back(_nodes[i].getNode());
+ }
+ return result;
+}
+
+template <typename NodeSeq>
+uint32_t BucketInfoBase<NodeSeq>::getHighestDocumentCount() const {
+ uint32_t highest = 0;
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ highest = std::max(highest, _nodes[i].getDocumentCount());
+ }
+ return highest;
+}
+
+template <typename NodeSeq>
+uint32_t BucketInfoBase<NodeSeq>::getHighestTotalDocumentSize() const {
+ uint32_t highest = 0;
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ highest = std::max(highest, _nodes[i].getTotalDocumentSize());
+ }
+ return highest;
+}
+
+template <typename NodeSeq>
+uint32_t BucketInfoBase<NodeSeq>::getHighestMetaCount() const {
+ uint32_t highest = 0;
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ highest = std::max(highest, _nodes[i].getMetaCount());
+ }
+ return highest;
+}
+
+template <typename NodeSeq>
+uint32_t BucketInfoBase<NodeSeq>::getHighestUsedFileSize() const {
+ uint32_t highest = 0;
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ highest = std::max(highest, _nodes[i].getUsedFileSize());
+ }
+ return highest;
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::hasRecentlyCreatedEmptyCopy() const {
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ if (_nodes[i].wasRecentlyCreated()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::operator==(const BucketInfoBase<NodeSeq>& other) const {
+ if (_nodes.size() != other._nodes.size()) {
+ return false;
+ }
+
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ if (_nodes[i].getNode() != other._nodes[i].getNode()) {
+ return false;
+ }
+
+ if (!(_nodes[i] == other._nodes[i])) {
+ return false;
+ }
+ }
+
+ return true;
+};
+
+}
diff --git a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp
index ad56a4083f8..9dad421324e 100644
--- a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp
+++ b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp
@@ -152,21 +152,21 @@ void __attribute__((noinline)) log_empty_bucket_insertion(const document::Bucket
}
+template <typename EntryType>
+void MapBucketDatabase::update_internal(EntryType&& new_entry) {
+ assert(new_entry.valid());
+ if (new_entry->getNodeCount() == 0) {
+ log_empty_bucket_insertion(new_entry.getBucketId());
+ }
+ Entry* found = find(0, 0, new_entry.getBucketId(), true);
+ assert(found);
+ *found = std::forward<EntryType>(new_entry);
+}
+
void
MapBucketDatabase::update(const Entry& newEntry)
{
- assert(newEntry.valid());
- if (newEntry->getNodeCount() == 0) {
- log_empty_bucket_insertion(newEntry.getBucketId());
- }
- LOG_BUCKET_OPERATION_NO_LOCK(
- newEntry.getBucketId(),
- vespalib::make_string(
- "bucketdb insert of %s", newEntry.toString().c_str()));
-
- Entry* found = find(0, 0, newEntry.getBucketId(), true);
- assert(found);
- *found = newEntry;
+ update_internal(newEntry);
}
void
@@ -334,20 +334,32 @@ MapBucketDatabase::upperBound(const document::BucketId& value) const
return Entry::createInvalid();
}
-template <typename EntryProcessorType>
+namespace {
+
+inline BucketDatabase::ConstEntryRef
+to_entry_ref(const BucketDatabase::Entry& e) {
+ return BucketDatabase::ConstEntryRef(
+ e.getBucketId(),
+ ConstBucketInfoRef(e->getLastGarbageCollectionTime(), e->getRawNodes()));
+}
+
+}
+
bool
MapBucketDatabase::forEach(int index,
- EntryProcessorType& processor,
+ EntryProcessor& processor,
uint8_t bitCount,
const document::BucketId& lowerBound,
- bool& process)
+ bool& process) const
{
if (index == -1) {
return true;
}
- E& e = _db[index];
- if (e.value != -1 && process && !processor.process(_values[e.value])) {
+ const E& e = _db[index];
+ if (e.value != -1 && process
+ && !processor.process(to_entry_ref(_values[e.value])))
+ {
return false;
}
@@ -377,16 +389,82 @@ MapBucketDatabase::forEach(EntryProcessor& processor,
const document::BucketId& after) const
{
bool process = false;
- MapBucketDatabase& mutableSelf(const_cast<MapBucketDatabase&>(*this));
- mutableSelf.forEach(0, processor, 0, after, process);
+ forEach(0, processor, 0, after, process);
}
-void
-MapBucketDatabase::forEach(MutableEntryProcessor& processor,
- const document::BucketId& after)
+struct MapDbMerger final : BucketDatabase::Merger {
+ MapBucketDatabase& _db;
+ BucketDatabase::Entry& _current_entry;
+ std::vector<BucketDatabase::Entry>& _to_insert;
+
+ MapDbMerger(MapBucketDatabase& db,
+ BucketDatabase::Entry& current_entry,
+ std::vector<BucketDatabase::Entry>& to_insert)
+ : _db(db),
+ _current_entry(current_entry),
+ _to_insert(to_insert)
+ {}
+
+ uint64_t bucket_key() const noexcept override {
+ return _current_entry.getBucketId().toKey();
+ }
+ document::BucketId bucket_id() const noexcept override {
+ return _current_entry.getBucketId();
+ }
+ BucketDatabase::Entry& current_entry() override {
+ return _current_entry;
+ }
+ void insert_before_current(const BucketDatabase::Entry& e) override {
+ _to_insert.emplace_back(e); // TODO movable
+ }
+};
+
+struct MapDbTrailingInserter final : BucketDatabase::TrailingInserter {
+ MapBucketDatabase& _db;
+ explicit MapDbTrailingInserter(MapBucketDatabase& db) : _db(db) {}
+
+ void insert_at_end(const BucketDatabase::Entry& e) override {
+ _db.update(e);
+ }
+};
+
+void MapBucketDatabase::merge_internal(int index,
+ MergingProcessor& processor,
+ std::vector<Entry>& to_insert,
+ std::vector<document::BucketId>& to_remove)
{
- bool process = false;
- forEach(0, processor, 0, after, process);
+ if (index == -1) {
+ return;
+ }
+ E& e = _db[index];
+ if (e.value != -1) {
+ Entry& entry = _values[e.value];
+ MapDbMerger merger(*this, entry, to_insert);
+ auto result = processor.merge(merger);
+ if (result == MergingProcessor::Result::KeepUnchanged) {
+ // No-op
+ } else if (result == MergingProcessor::Result::Update) {
+ // Also no-op since it's all in-place
+ } else if (result == MergingProcessor::Result::Skip) {
+ to_remove.emplace_back(entry.getBucketId());
+ }
+ }
+ merge_internal(e.e_0, processor, to_insert, to_remove);
+ merge_internal(e.e_1, processor, to_insert, to_remove);
+}
+
+void MapBucketDatabase::merge(MergingProcessor& processor) {
+ std::vector<document::BucketId> to_remove;
+ std::vector<Entry> to_insert;
+ merge_internal(0, processor, to_insert, to_remove);
+ for (const auto& bucket : to_remove) {
+ remove(bucket);
+ }
+ for (auto& e : to_insert) {
+ update_internal(std::move(e));
+ }
+ MapDbTrailingInserter inserter(*this);
+ processor.insert_remaining_at_end(inserter);
}
void
@@ -482,8 +560,8 @@ MapBucketDatabase::childCount(const document::BucketId& b) const
namespace {
struct Writer : public BucketDatabase::EntryProcessor {
std::ostream& _ost;
- Writer(std::ostream& ost) : _ost(ost) {}
- bool process(const BucketDatabase::Entry& e) override {
+ explicit Writer(std::ostream& ost) : _ost(ost) {}
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
_ost << e.toString() << "\n";
return true;
}
@@ -494,37 +572,16 @@ void
MapBucketDatabase::print(std::ostream& out, bool verbose,
const std::string& indent) const
{
+ out << "MapBucketDatabase(";
(void) indent;
if (verbose) {
Writer writer(out);
forEach(writer);
- /* Write out all the gory details to debug
- out << "Entries {";
- for (uint32_t i=0, n=_db.size(); i<n; ++i) {
- out << "\n" << indent << " " << _db[i].e_0 << "," << _db[i].e_1
- << "," << _db[i].value;
- }
- out << "\n" << indent << "}";
- out << "Free {";
- for (uint32_t i=0, n=_free.size(); i<n; ++i) {
- out << "\n" << indent << " " << _free[i];
- }
- out << "\n" << indent << "}";
- out << "Entries {";
- for (uint32_t i=0, n=_values.size(); i<n; ++i) {
- out << "\n" << indent << " " << _values[i];
- }
- out << "\n" << indent << "}";
- out << "Free {";
- for (uint32_t i=0, n=_freeValues.size(); i<n; ++i) {
- out << "\n" << indent << " " << _freeValues[i];
- }
- out << "\n" << indent << "}";
- */
} else {
out << "Size(" << size() << ") Nodes("
<< (_db.size() - _free.size() - 1) << ")";
}
+ out << ')';
}
} // storage
diff --git a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h
index a7a38f1d978..df3e055bd7a 100644
--- a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h
+++ b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h
@@ -18,16 +18,16 @@ public:
void getAll(const document::BucketId& bucket, std::vector<Entry>& entries) const override;
void update(const Entry& newEntry) override;
void forEach(EntryProcessor&, const document::BucketId& after = document::BucketId()) const override;
- void forEach(MutableEntryProcessor&, const document::BucketId& after = document::BucketId()) override;
uint64_t size() const override { return _values.size() - _freeValues.size(); };
void clear() override;
uint32_t childCount(const document::BucketId&) const override;
Entry upperBound(const document::BucketId& value) const override;
+ void merge(MergingProcessor&) override;
+
document::BucketId getAppropriateBucket(uint16_t minBits, const document::BucketId& bid) override;
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
-
private:
struct E {
E() : value(-1), e_0(-1), e_1(-1) {};
@@ -42,14 +42,20 @@ private:
int e_1;
};
+ template <typename EntryType>
+ void update_internal(EntryType&& new_entry);
+
BucketDatabase::Entry* find(int idx, uint8_t bitCount, const document::BucketId& bid, bool create);
bool remove(int index, uint8_t bitCount, const document::BucketId& bId);
int findFirstInOrderNodeInclusive(int index) const;
int upperBoundImpl(int index, uint8_t depth, const document::BucketId& value) const;
- template <typename EntryProcessorType>
- bool forEach(int index, EntryProcessorType& processor, uint8_t bitCount,
- const document::BucketId& lowerBound, bool& process);
+ bool forEach(int index, EntryProcessor& processor, uint8_t bitCount,
+ const document::BucketId& lowerBound, bool& process) const;
+
+ void merge_internal(int index, MergingProcessor& processor,
+ std::vector<Entry>& to_insert,
+ std::vector<document::BucketId>& to_remove);
void findParents(int index, uint8_t bitCount, const document::BucketId& bid, std::vector<Entry>& entries) const;
void findAll(int index, uint8_t bitCount, const document::BucketId& bid, std::vector<Entry>& entries) const;
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 33fa80ab484..6236965a255 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -4,8 +4,8 @@
#include "bucket_db_prune_elision.h"
#include "distributor.h"
#include "distributor_bucket_space.h"
-#include "simpleclusterinformation.h"
#include "distributormetricsset.h"
+#include "simpleclusterinformation.h"
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
@@ -152,26 +152,20 @@ BucketDBUpdater::removeSuperfluousBuckets(
// Remove all buckets not belonging to this distributor, or
// being on storage nodes that are no longer up.
- NodeRemover proc(
+ MergingNodeRemover proc(
oldClusterState,
*new_cluster_state,
_distributorComponent.getIndex(),
newDistribution,
- up_states);
- bucketDb.forEach(proc);
-
- for (const auto & bucket : proc.getBucketsToRemove()) {
- bucketDb.remove(bucket);
+ up_states,
+ move_to_read_only_db);
+
+ bucketDb.merge(proc);
+ // Non-owned entries vector only has contents if move_to_read_only_db is true.
+ // TODO either rewrite in terms of merge() or remove entirely
+ for (const auto& db_entry : proc.getNonOwnedEntries()) {
+ readOnlyDb.update(db_entry); // TODO Entry move support
}
- // TODO vec of Entry instead to avoid lookup and remove? Uses more transient memory...
- for (const auto& bucket : proc.getNonOwnedBuckets()) {
- if (move_to_read_only_db) {
- auto db_entry = bucketDb.get(bucket);
- readOnlyDb.update(db_entry); // TODO Entry move support
- }
- bucketDb.remove(bucket);
- }
-
}
}
@@ -217,7 +211,6 @@ BucketDBUpdater::storageDistributionChanged()
std::move(clusterInfo),
_sender,
_distributorComponent.getBucketSpaceRepo(),
- _distributorComponent.getReadOnlyBucketSpaceRepo(),
_distributorComponent.getUniqueTimestamp());
_outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
}
@@ -273,7 +266,6 @@ BucketDBUpdater::onSetSystemState(
std::move(clusterInfo),
_sender,
_distributorComponent.getBucketSpaceRepo(),
- _distributorComponent.getReadOnlyBucketSpaceRepo(),
cmd,
_outdatedNodesMap,
_distributorComponent.getUniqueTimestamp());
@@ -740,40 +732,39 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
return "";
}
-bool
-BucketDBUpdater::BucketListGenerator::process(BucketDatabase::Entry& e)
-{
- document::BucketId bucketId(e.getBucketId());
-
- const BucketCopy* copy(e->getNode(_node));
- if (copy) {
- _entries.emplace_back(bucketId, copy->getBucketInfo());
- }
- return true;
-}
-
-BucketDBUpdater::NodeRemover::NodeRemover(
+BucketDBUpdater::MergingNodeRemover::MergingNodeRemover(
const lib::ClusterState& oldState,
const lib::ClusterState& s,
uint16_t localIndex,
const lib::Distribution& distribution,
- const char* upStates)
+ const char* upStates,
+ bool track_non_owned_entries)
: _oldState(oldState),
_state(s),
+ _available_nodes(),
_nonOwnedBuckets(),
- _removedBuckets(),
+ _removed_buckets(0),
_localIndex(localIndex),
_distribution(distribution),
_upStates(upStates),
+ _track_non_owned_entries(track_non_owned_entries),
_cachedDecisionSuperbucket(UINT64_MAX),
_cachedOwned(false)
-{}
+{
+ // TODO intersection of cluster state and distribution config
+ const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE);
+ _available_nodes.resize(storage_count);
+ for (uint16_t i = 0; i < storage_count; ++i) {
+ if (s.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState().oneOf(_upStates)) {
+ _available_nodes[i] = true;
+ }
+ }
+}
void
-BucketDBUpdater::NodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const
+BucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const
{
LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg);
- LOG_BUCKET_OPERATION_NO_LOCK(bucketId, msg);
}
namespace {
@@ -786,7 +777,7 @@ uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution
}
bool
-BucketDBUpdater::NodeRemover::distributorOwnsBucket(
+BucketDBUpdater::MergingNodeRemover::distributorOwnsBucket(
const document::BucketId& bucketId) const
{
// TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor.
@@ -818,7 +809,7 @@ BucketDBUpdater::NodeRemover::distributorOwnsBucket(
}
void
-BucketDBUpdater::NodeRemover::setCopiesInEntry(
+BucketDBUpdater::MergingNodeRemover::setCopiesInEntry(
BucketDatabase::Entry& e,
const std::vector<BucketCopy>& copies) const
{
@@ -830,77 +821,75 @@ BucketDBUpdater::NodeRemover::setCopiesInEntry(
e->addNodes(copies, order);
LOG(spam, "Changed %s", e->toString().c_str());
- LOG_BUCKET_OPERATION_NO_LOCK(
- e.getBucketId(),
- vespalib::make_string("updated bucketdb entry to %s",
- e->toString().c_str()));
}
-void
-BucketDBUpdater::NodeRemover::removeEmptyBucket(const document::BucketId& bucketId)
+bool
+BucketDBUpdater::MergingNodeRemover::has_unavailable_nodes(const storage::BucketDatabase::Entry& e) const
{
- _removedBuckets.push_back(bucketId);
-
- LOG(spam,
- "After system state change %s, bucket %s now has no copies.",
- _oldState.getTextualDifference(_state).c_str(),
- bucketId.toString().c_str());
- LOG_BUCKET_OPERATION_NO_LOCK(bucketId, "bucket now has no copies");
+ const uint16_t n_nodes = e->getNodeCount();
+ for (uint16_t i = 0; i < n_nodes; i++) {
+ const uint16_t node_idx = e->getNodeRef(i).getNode();
+ if (!storage_node_is_available(node_idx)) {
+ return true;
+ }
+ }
+ return false;
}
-bool
-BucketDBUpdater::NodeRemover::process(BucketDatabase::Entry& e)
+BucketDatabase::MergingProcessor::Result
+BucketDBUpdater::MergingNodeRemover::merge(storage::BucketDatabase::Merger& merger)
{
- const document::BucketId& bucketId(e.getBucketId());
+ using Result = BucketDatabase::MergingProcessor::Result;
- LOG(spam, "Check for remove: bucket %s", e.toString().c_str());
- if (e->getNodeCount() == 0) {
- removeEmptyBucket(e.getBucketId());
- return true;
- }
+ document::BucketId bucketId(merger.bucket_id());
+ LOG(spam, "Check for remove: bucket %s", bucketId.toString().c_str());
if (!distributorOwnsBucket(bucketId)) {
- _nonOwnedBuckets.push_back(bucketId);
- return true;
+ // TODO remove in favor of DB snapshotting
+ if (_track_non_owned_entries) {
+ _nonOwnedBuckets.emplace_back(merger.current_entry());
+ }
+ return Result::Skip;
+ }
+ auto& e = merger.current_entry();
+
+ if (e->getNodeCount() == 0) { // TODO when should this edge ever trigger?
+ return Result::Skip;
+ }
+
+ if (!has_unavailable_nodes(e)) {
+ return Result::KeepUnchanged;
}
std::vector<BucketCopy> remainingCopies;
for (uint16_t i = 0; i < e->getNodeCount(); i++) {
- Node n(NodeType::STORAGE, e->getNodeRef(i).getNode());
-
- // TODO replace with intersection hash set of config and cluster state
- if (_state.getNodeState(n).getState().oneOf(_upStates)) {
+ const uint16_t node_idx = e->getNodeRef(i).getNode();
+ if (storage_node_is_available(node_idx)) {
remainingCopies.push_back(e->getNodeRef(i));
}
}
- if (remainingCopies.size() == e->getNodeCount()) {
- return true;
- }
-
if (remainingCopies.empty()) {
- removeEmptyBucket(bucketId);
+ ++_removed_buckets;
+ return Result::Skip;
} else {
setCopiesInEntry(e, remainingCopies);
+ return Result::Update;
}
+}
- return true;
+bool
+BucketDBUpdater::MergingNodeRemover::storage_node_is_available(uint16_t index) const noexcept
+{
+ return ((index < _available_nodes.size()) && _available_nodes[index]);
}
-BucketDBUpdater::NodeRemover::~NodeRemover()
+BucketDBUpdater::MergingNodeRemover::~MergingNodeRemover()
{
- if ( !_removedBuckets.empty()) {
- std::ostringstream ost;
- ost << "After system state change "
- << _oldState.getTextualDifference(_state) << ", we removed "
- << "buckets. Data is unavailable until node comes back up. "
- << _removedBuckets.size() << " buckets removed:";
- for (uint32_t i=0; i < 10 && i < _removedBuckets.size(); ++i) {
- ost << " " << _removedBuckets[i];
- }
- if (_removedBuckets.size() >= 10) {
- ost << " ...";
- }
- LOGBM(info, "%s", ost.str().c_str());
+ if (_removed_buckets != 0) {
+ LOGBM(info, "After cluster state change %s, %" PRIu64 " buckets no longer "
+ "have available replicas. Documents in these buckets will "
+ "be unavailable until nodes come back up",
+ _oldState.getTextualDifference(_state).c_str(), _removed_buckets);
}
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index dbcf6699bdc..663ac488ef4 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -182,54 +182,43 @@ private:
friend class BucketDBUpdater_Test;
friend class MergeOperation_Test;
- class BucketListGenerator
- {
- public:
- BucketListGenerator(uint16_t node, BucketListMerger::BucketList& entries)
- : _node(node), _entries(entries) {};
-
- bool process(BucketDatabase::Entry&);
-
- private:
- uint16_t _node;
- BucketListMerger::BucketList& _entries;
- };
-
/**
Removes all copies of buckets that are on nodes that are down.
*/
- class NodeRemover : public BucketDatabase::MutableEntryProcessor
- {
+ class MergingNodeRemover : public BucketDatabase::MergingProcessor {
public:
- NodeRemover(const lib::ClusterState& oldState,
- const lib::ClusterState& s,
- uint16_t localIndex,
- const lib::Distribution& distribution,
- const char* upStates);
- ~NodeRemover() override;
-
- bool process(BucketDatabase::Entry& e) override;
+ MergingNodeRemover(const lib::ClusterState& oldState,
+ const lib::ClusterState& s,
+ uint16_t localIndex,
+ const lib::Distribution& distribution,
+ const char* upStates,
+ bool track_non_owned_entries);
+ ~MergingNodeRemover() override;
+
+ Result merge(BucketDatabase::Merger&) override;
void logRemove(const document::BucketId& bucketId, const char* msg) const;
bool distributorOwnsBucket(const document::BucketId&) const;
- const std::vector<document::BucketId>& getBucketsToRemove() const noexcept {
- return _removedBuckets;
- }
- const std::vector<document::BucketId>& getNonOwnedBuckets() const noexcept {
+ // TODO this is temporary until explicit DB snapshotting replaces read-only DB usage
+ const std::vector<BucketDatabase::Entry>& getNonOwnedEntries() const noexcept {
return _nonOwnedBuckets;
}
private:
void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const;
- void removeEmptyBucket(const document::BucketId& bucketId);
+
+ bool has_unavailable_nodes(const BucketDatabase::Entry&) const;
+ bool storage_node_is_available(uint16_t index) const noexcept;
const lib::ClusterState _oldState;
const lib::ClusterState _state;
- std::vector<document::BucketId> _nonOwnedBuckets;
- std::vector<document::BucketId> _removedBuckets;
+ std::vector<bool> _available_nodes;
+ std::vector<BucketDatabase::Entry> _nonOwnedBuckets;
+ size_t _removed_buckets;
uint16_t _localIndex;
const lib::Distribution& _distribution;
const char* _upStates;
+ bool _track_non_owned_entries;
mutable uint64_t _cachedDecisionSuperbucket;
mutable bool _cachedOwned;
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 3237b4b4d71..96359bcec60 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -114,7 +114,9 @@ public:
* Checks whether a bucket needs to be split, and sends a split
* if so.
*/
- void checkBucketForSplit(document::BucketSpace bucketSpace, const BucketDatabase::Entry& e, uint8_t priority) override;
+ void checkBucketForSplit(document::BucketSpace bucketSpace,
+ const BucketDatabase::Entry& e,
+ uint8_t priority) override;
const lib::ClusterStateBundle& getClusterStateBundle() const override;
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
index 5a1ff31e2e7..5ae5d8dc3f8 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
@@ -244,7 +244,7 @@ IdealStateManager::generateAll(const document::Bucket &bucket,
void
IdealStateManager::getBucketStatus(
BucketSpace bucketSpace,
- const BucketDatabase::Entry& entry,
+ const BucketDatabase::ConstEntryRef& entry,
NodeMaintenanceStatsTracker& statsTracker,
std::ostream& out) const
{
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h
index 3bb6d0dd757..8566c67a51b 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.h
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.h
@@ -128,14 +128,14 @@ private:
StatusBucketVisitor(const IdealStateManager& ism, document::BucketSpace bucketSpace, std::ostream& out)
: _statsTracker(), _ism(ism), _bucketSpace(bucketSpace), _out(out) {}
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
_ism.getBucketStatus(_bucketSpace, e, _statsTracker, _out);
return true;
}
};
friend class StatusBucketVisitor;
- void getBucketStatus(document::BucketSpace bucketSpace, const BucketDatabase::Entry& entry,
+ void getBucketStatus(document::BucketSpace bucketSpace, const BucketDatabase::ConstEntryRef& entry,
NodeMaintenanceStatsTracker& statsTracker, std::ostream& out) const;
void dump_bucket_space_db_status(document::BucketSpace bucket_space, std::ostream& out) const;
};
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
index cd18148a662..a89dd52775b 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
@@ -483,7 +483,7 @@ struct NextEntryFinder : public BucketDatabase::EntryProcessor {
NextEntryFinder(const document::BucketId& id)
: _first(true), _last(id), _next() {}
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
document::BucketId bucket(e.getBucketId());
if (_first && bucket == _last) {
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
index 216646f0345..13f9c21eed0 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
@@ -131,24 +131,30 @@ PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDat
return updated;
}
-// TODO take in bucket key instead
bool
-PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const
+PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const
{
- return (_iter < _entries.size()
- && _entries[_iter].bucket_key < bucketId.toKey());
+ return ((_iter < _entries.size())
+ && (_entries[_iter].bucket_key < bucket_key));
}
-// TODO take in bucket key instead
bool
-PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const
+PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const
{
- return _iter < _entries.size() && _entries[_iter].bucket_id() == bucketId;
+ return _iter < _entries.size() && _entries[_iter].bucket_key == bucket_key;
}
-bool
-PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e)
-{
+using MergeResult = BucketDatabase::MergingProcessor::Result;
+
+MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger) {
+ const uint64_t bucket_key = merger.bucket_key();
+
+ while (databaseIteratorHasPassedBucketInfoIterator(bucket_key)) {
+ LOG(spam, "Found new bucket %s, adding", _entries[_iter].bucket_id().toString().c_str());
+ addToMerger(merger, skipAllForSameBucket());
+ }
+
+ auto& e = merger.current_entry();
document::BucketId bucketId(e.getBucketId());
LOG(spam,
@@ -157,19 +163,10 @@ PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e)
bucketId.toString().c_str(),
e.getBucketInfo().toString().c_str());
- while (databaseIteratorHasPassedBucketInfoIterator(bucketId)) {
- LOG(spam, "Found new bucket %s, adding",
- _entries[_iter].bucket_id().toString().c_str());
-
- _missingEntries.push_back(skipAllForSameBucket());
- }
-
bool updated(removeCopiesFromNodesThatWereRequested(e, bucketId));
- if (bucketInfoIteratorPointsToBucket(bucketId)) {
- LOG(spam, "Updating bucket %s",
- _entries[_iter].bucket_id().toString().c_str());
-
+ if (bucketInfoIteratorPointsToBucket(bucket_key)) {
+ LOG(spam, "Updating bucket %s", _entries[_iter].bucket_id().toString().c_str());
insertInfo(e, skipAllForSameBucket());
updated = true;
}
@@ -177,23 +174,24 @@ PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e)
if (updated) {
// Remove bucket if we've previously removed all nodes from it
if (e->getNodeCount() == 0) {
- _removedBuckets.push_back(bucketId);
+ return MergeResult::Skip;
} else {
e.getBucketInfo().updateTrusted();
+ return MergeResult::Update;
}
}
- LOG(spam,
- "After merging info from nodes [%s], bucket %s had info %s",
- requestNodesToString().c_str(),
- bucketId.toString().c_str(),
- e.getBucketInfo().toString().c_str());
+ return MergeResult::KeepUnchanged;
+}
- return true;
+void PendingBucketSpaceDbTransition::insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) {
+ while (_iter < _entries.size()) {
+ addToInserter(inserter, skipAllForSameBucket());
+ }
}
void
-PendingBucketSpaceDbTransition::addToBucketDB(BucketDatabase& db, const Range& range)
+PendingBucketSpaceDbTransition::addToMerger(BucketDatabase::Merger& merger, const Range& range)
{
LOG(spam, "Adding new bucket %s with %d copies",
_entries[range.first].bucket_id().toString().c_str(),
@@ -204,33 +202,37 @@ PendingBucketSpaceDbTransition::addToBucketDB(BucketDatabase& db, const Range& r
if (e->getLastGarbageCollectionTime() == 0) {
e->setLastGarbageCollectionTime(
framework::MicroSecTime(_creationTimestamp)
- .getSeconds().getTime());
+ .getSeconds().getTime());
}
e.getBucketInfo().updateTrusted();
- db.update(e);
+ merger.insert_before_current(e);
}
void
-PendingBucketSpaceDbTransition::mergeIntoBucketDatabase()
+PendingBucketSpaceDbTransition::addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range)
{
- BucketDatabase &db(_distributorBucketSpace.getBucketDatabase());
- std::sort(_entries.begin(), _entries.end());
-
- db.forEach(*this);
-
- for (uint32_t i = 0; i < _removedBuckets.size(); ++i) {
- db.remove(_removedBuckets[i]);
- }
- _removedBuckets.clear();
+ // TODO dedupe
+ LOG(spam, "Adding new bucket %s with %d copies",
+ _entries[range.first].bucket_id().toString().c_str(),
+ range.second - range.first);
- // All of the remaining were not already in the bucket database.
- while (_iter < _entries.size()) {
- _missingEntries.push_back(skipAllForSameBucket());
+ BucketDatabase::Entry e(_entries[range.first].bucket_id(), BucketInfo());
+ insertInfo(e, range);
+ if (e->getLastGarbageCollectionTime() == 0) {
+ e->setLastGarbageCollectionTime(
+ framework::MicroSecTime(_creationTimestamp)
+ .getSeconds().getTime());
}
+ e.getBucketInfo().updateTrusted();
+ inserter.insert_at_end(e);
+}
- for (uint32_t i = 0; i < _missingEntries.size(); ++i) {
- addToBucketDB(db, _missingEntries[i]);
- }
+void
+PendingBucketSpaceDbTransition::mergeIntoBucketDatabase()
+{
+ BucketDatabase &db(_distributorBucketSpace.getBucketDatabase());
+ std::sort(_entries.begin(), _entries.end());
+ db.merge(*this);
}
void
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
index 7eb2974eb52..695f80750aa 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
@@ -20,7 +20,7 @@ class DistributorBucketSpace;
* reply result within a bucket space and apply it to the distributor
* bucket database when switching to the pending cluster state.
*/
-class PendingBucketSpaceDbTransition : public BucketDatabase::MutableEntryProcessor
+class PendingBucketSpaceDbTransition : public BucketDatabase::MergingProcessor
{
public:
using Entry = dbtransition::Entry;
@@ -51,8 +51,8 @@ private:
bool _bucketOwnershipTransfer;
std::unordered_map<uint16_t, size_t> _rejectedRequests;
- // BucketDataBase::MutableEntryProcessor API
- bool process(BucketDatabase::Entry& e) override;
+ BucketDatabase::MergingProcessor::Result merge(BucketDatabase::Merger&) override;
+ void insert_remaining_at_end(BucketDatabase::TrailingInserter&) override;
/**
* Skips through all entries for the same bucket and returns
@@ -63,7 +63,8 @@ private:
std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range);
void insertInfo(BucketDatabase::Entry& info, const Range& range);
- void addToBucketDB(BucketDatabase& db, const Range& range);
+ void addToMerger(BucketDatabase::Merger& merger, const Range& range);
+ void addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range);
bool nodeIsOutdated(uint16_t node) const {
return (_outdatedNodes.find(node) != _outdatedNodes.end());
@@ -75,8 +76,8 @@ private:
bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId);
// Helper methods for iterating over _entries
- bool databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const;
- bool bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const;
+ bool databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const;
+ bool bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const;
std::string requestNodesToString();
bool distributorChanged();
@@ -99,7 +100,7 @@ public:
std::shared_ptr<const ClusterInformation> clusterInfo,
const lib::ClusterState &newClusterState,
api::Timestamp creationTimestamp);
- ~PendingBucketSpaceDbTransition();
+ ~PendingBucketSpaceDbTransition() override;
// Merges all the results with the corresponding bucket database.
void mergeIntoBucketDatabase();
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 6cba7084037..8298b126690 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -28,7 +28,6 @@ PendingClusterState::PendingClusterState(
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp)
@@ -41,7 +40,6 @@ PendingClusterState::PendingClusterState(
_creationTimestamp(creationTimestamp),
_sender(sender),
_bucketSpaceRepo(bucketSpaceRepo),
- _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo),
_clusterStateVersion(_cmd->getClusterStateBundle().getVersion()),
_isVersionedTransition(true),
_bucketOwnershipTransfer(false),
@@ -56,7 +54,6 @@ PendingClusterState::PendingClusterState(
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
api::Timestamp creationTimestamp)
: _requestedNodes(clusterInfo->getStorageNodeCount()),
_prevClusterStateBundle(clusterInfo->getClusterStateBundle()),
@@ -66,7 +63,6 @@ PendingClusterState::PendingClusterState(
_creationTimestamp(creationTimestamp),
_sender(sender),
_bucketSpaceRepo(bucketSpaceRepo),
- _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo),
_clusterStateVersion(0),
_isVersionedTransition(false),
_bucketOwnershipTransfer(true),
@@ -76,7 +72,7 @@ PendingClusterState::PendingClusterState(
initializeBucketSpaceTransitions(true, OutdatedNodesMap());
}
-PendingClusterState::~PendingClusterState() {}
+PendingClusterState::~PendingClusterState() = default;
void
PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap)
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index cedc0573381..7aa35b32b8e 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -46,14 +46,13 @@ public:
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp)
{
// Naked new due to private constructor
return std::unique_ptr<PendingClusterState>(new PendingClusterState(
- clock, clusterInfo, sender, bucketSpaceRepo, readOnlyBucketSpaceRepo,
+ clock, clusterInfo, sender, bucketSpaceRepo,
newStateCmd, outdatedNodesMap, creationTimestamp));
}
@@ -66,13 +65,11 @@ public:
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
api::Timestamp creationTimestamp)
{
// Naked new due to private constructor
return std::unique_ptr<PendingClusterState>(new PendingClusterState(
- clock, clusterInfo, sender, bucketSpaceRepo,
- readOnlyBucketSpaceRepo, creationTimestamp));
+ clock, clusterInfo, sender, bucketSpaceRepo, creationTimestamp));
}
PendingClusterState(const PendingClusterState &) = delete;
@@ -167,7 +164,6 @@ private:
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp);
@@ -181,7 +177,6 @@ private:
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
api::Timestamp creationTimestamp);
struct BucketSpaceAndNode {
@@ -232,7 +227,6 @@ private:
DistributorMessageSender& _sender;
DistributorBucketSpaceRepo& _bucketSpaceRepo;
- DistributorBucketSpaceRepo& _readOnlyBucketSpaceRepo;
uint32_t _clusterStateVersion;
bool _isVersionedTransition;
bool _bucketOwnershipTransfer;