author:    Tor Brede Vekterli <vekterli@verizonmedia.com>  2019-06-05 11:04:50 +0200
committer: GitHub <noreply@github.com>                     2019-06-05 11:04:50 +0200
commit:    2d10eff40215c7b11ae3b01a2fba45ad8135cc61 (patch)
tree:      1a9aa6d2318b226c4787802bd06fe3f6291fa1c5 /storage
parent:    80a3f22ab612d9cd1e6f0b249605b4342bebaead (diff)
parent:    3ec818fe80674e16b282534570a7c28ce80f5b84 (diff)
Merge pull request #9657 from vespa-engine/vekterli/more-efficient-bucket-db-bulk-apis
Add new DB merging API to distributor BucketDatabase
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/distributor/bucketdatabasetest.cpp                             242
-rw-r--r--  storage/src/tests/distributor/bucketdbupdatertest.cpp                            102
-rw-r--r--  storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp                     144
-rw-r--r--  storage/src/vespa/storage/bucketdb/btree_bucket_database.h                         9
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketdatabase.cpp                             23
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketdatabase.h                              165
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketinfo.cpp                                179
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketinfo.h                                  178
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketinfo.hpp                                165
-rw-r--r--  storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp                         155
-rw-r--r--  storage/src/vespa/storage/bucketdb/mapbucketdatabase.h                            16
-rw-r--r--  storage/src/vespa/storage/distributor/bucketdbupdater.cpp                        157
-rw-r--r--  storage/src/vespa/storage/distributor/bucketdbupdater.h                           49
-rw-r--r--  storage/src/vespa/storage/distributor/distributor.h                                4
-rw-r--r--  storage/src/vespa/storage/distributor/idealstatemanager.cpp                        2
-rw-r--r--  storage/src/vespa/storage/distributor/idealstatemanager.h                          4
-rw-r--r--  storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp    2
-rw-r--r--  storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp      96
-rw-r--r--  storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h        15
-rw-r--r--  storage/src/vespa/storage/distributor/pendingclusterstate.cpp                      6
-rw-r--r--  storage/src/vespa/storage/distributor/pendingclusterstate.h                       10
21 files changed, 1119 insertions, 604 deletions
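
At a glance, this PR replaces the mutable forEach()/MutableEntryProcessor pass with a bulk merge() API: callers implement BucketDatabase::MergingProcessor and decide per entry whether to keep, update, or skip (remove) it, and may splice new entries into the ordered stream. The sketch below illustrates the intended call pattern; DropNodeProcessor and its policy are hypothetical, but the interfaces are the ones added in this diff.

    using Merger = storage::BucketDatabase::Merger;
    using Result = storage::BucketDatabase::MergingProcessor::Result;

    // Hypothetical example: drop all replicas on one storage node,
    // removing any entry that ends up with no replicas at all.
    struct DropNodeProcessor : storage::BucketDatabase::MergingProcessor {
        uint16_t _node;
        explicit DropNodeProcessor(uint16_t node) : _node(node) {}

        Result merge(Merger& m) override {
            auto& entry = m.current_entry();
            if (entry->getNode(_node) == nullptr) {
                return Result::KeepUnchanged; // DB backend may reuse the stored value as-is
            }
            entry->removeNode(_node);
            return (entry->getNodeCount() > 0) ? Result::Update  // persist the mutation
                                               : Result::Skip;   // drop the entry entirely
        }
    };

    // One ordered pass over the whole DB:
    //   DropNodeProcessor proc(2);
    //   db.merge(proc);
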
diff --git a/storage/src/tests/distributor/bucketdatabasetest.cpp b/storage/src/tests/distributor/bucketdatabasetest.cpp
index 92e0c534e31..cfb54edbe78 100644
--- a/storage/src/tests/distributor/bucketdatabasetest.cpp
+++ b/storage/src/tests/distributor/bucketdatabasetest.cpp
@@ -1,5 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucketdatabasetest.h"
+#include <vespa/vespalib/util/benchmark_timer.h>
+#include <chrono>
#include <iomanip>
#include <algorithm>
@@ -12,15 +14,25 @@ void BucketDatabaseTest::SetUp() {
}
namespace {
- BucketCopy BC(uint32_t nodeIdx) {
- return BucketCopy(0, nodeIdx, api::BucketInfo());
- }
- BucketInfo BI(uint32_t nodeIdx) {
- BucketInfo bi;
- bi.addNode(BC(nodeIdx), toVector<uint16_t>(0));
- return bi;
- }
+BucketCopy BC(uint32_t nodeIdx) {
+ return BucketCopy(0, nodeIdx, api::BucketInfo());
+}
+
+BucketInfo BI(uint32_t nodeIdx) {
+ BucketInfo bi;
+ bi.addNode(BC(nodeIdx), toVector<uint16_t>(0));
+ return bi;
+}
+
+BucketInfo BI3(uint32_t node0, uint32_t node1, uint32_t node2) {
+ BucketInfo bi;
+ bi.addNode(BC(node0), toVector<uint16_t>(node0, node1, node2));
+ bi.addNode(BC(node1), toVector<uint16_t>(node0, node1, node2));
+ bi.addNode(BC(node2), toVector<uint16_t>(node0, node1, node2));
+ return bi;
+}
+
}
TEST_P(BucketDatabaseTest, testClear) {
@@ -63,34 +75,23 @@ TEST_P(BucketDatabaseTest, testUpdateGetAndRemove) {
namespace {
-struct ModifyProcessor : public BucketDatabase::MutableEntryProcessor
-{
- bool process(BucketDatabase::Entry& e) override {
- if (e.getBucketId() == document::BucketId(16, 0x0b)) {
- e.getBucketInfo() = BI(7);
- } else if (e.getBucketId() == document::BucketId(16, 0x2a)) {
- e->clear();
- e->addNode(BC(4), toVector<uint16_t>(0));
- e->addNode(BC(5), toVector<uint16_t>(0));
- }
-
- return true;
- }
-};
-
struct ListAllProcessor : public BucketDatabase::EntryProcessor {
std::ostringstream ost;
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
ost << e << "\n";
return true;
}
};
-struct DummyProcessor : public BucketDatabase::EntryProcessor {
- std::ostringstream ost;
+std::string dump_db(const BucketDatabase& db) {
+ ListAllProcessor proc;
+ db.forEach(proc, document::BucketId());
+ return proc.ost.str();
+}
- bool process(const BucketDatabase::Entry&) override {
+struct DummyProcessor : public BucketDatabase::EntryProcessor {
+ bool process(const BucketDatabase::ConstEntryRef&) override {
return true;
}
};
@@ -99,7 +100,7 @@ struct DummyProcessor : public BucketDatabase::EntryProcessor {
struct StoppingProcessor : public BucketDatabase::EntryProcessor {
std::ostringstream ost;
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
ost << e << "\n";
if (e.getBucketId() == document::BucketId(16, 0x2a)) {
@@ -156,25 +157,6 @@ TEST_P(BucketDatabaseTest, testIterating) {
"node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"),
proc.ost.str());
}
-
- {
- ModifyProcessor alterProc;
- db().forEach(alterProc, document::BucketId());
- // Verify content after altering
- ListAllProcessor proc;
- db().forEach(proc);
-
- EXPECT_EQ(
- std::string(
- "BucketId(0x4000000000000010) : "
- "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
- "BucketId(0x400000000000002a) : "
- "node(idx=4,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false), "
- "node(idx=5,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
- "BucketId(0x400000000000000b) : "
- "node(idx=7,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"),
- proc.ost.str());
- }
}
std::string
@@ -552,4 +534,170 @@ TEST_P(BucketDatabaseTest, testChildCount) {
EXPECT_EQ(0u, db().childCount(BucketId(3, 5)));
}
+using Merger = BucketDatabase::Merger;
+using TrailingInserter = BucketDatabase::TrailingInserter;
+using Result = BucketDatabase::MergingProcessor::Result;
+
+namespace {
+
+struct KeepUnchangedMergingProcessor : BucketDatabase::MergingProcessor {
+ Result merge(Merger&) override {
+ return Result::KeepUnchanged;
+ }
+};
+
+struct SkipBucketMergingProcessor : BucketDatabase::MergingProcessor {
+ BucketId _skip_bucket;
+ explicit SkipBucketMergingProcessor(BucketId skip_bucket) : _skip_bucket(skip_bucket) {}
+
+ Result merge(Merger& m) override {
+ return (m.bucket_id() == _skip_bucket) ? Result::Skip : Result::KeepUnchanged;
+ }
+};
+
+struct UpdateBucketMergingProcessor : BucketDatabase::MergingProcessor {
+ BucketId _update_bucket;
+ explicit UpdateBucketMergingProcessor(BucketId update_bucket) : _update_bucket(update_bucket) {}
+
+ Result merge(Merger& m) override {
+ if (m.bucket_id() == _update_bucket) {
+ auto& e = m.current_entry();
+ // Add a replica and alter the current one.
+ e->addNode(BucketCopy(123456, 0, api::BucketInfo(2, 3, 4)), toVector<uint16_t>(0));
+ e->addNode(BucketCopy(234567, 1, api::BucketInfo(3, 4, 5)), toVector<uint16_t>(1));
+ return Result::Update;
+ }
+ return Result::KeepUnchanged;
+ }
+};
+
+struct InsertBeforeBucketMergingProcessor : BucketDatabase::MergingProcessor {
+ BucketId _before_bucket;
+ explicit InsertBeforeBucketMergingProcessor(BucketId before_bucket) : _before_bucket(before_bucket) {}
+
+ Result merge(Merger& m) override {
+ if (m.bucket_id() == _before_bucket) {
+ // Assumes _before_bucket is > the inserted bucket
+ m.insert_before_current(BucketDatabase::Entry(document::BucketId(16, 2), BI(2)));
+ }
+ return Result::KeepUnchanged;
+ }
+};
+
+struct InsertAtEndMergingProcessor : BucketDatabase::MergingProcessor {
+ Result merge(Merger&) override {
+ return Result::KeepUnchanged;
+ }
+
+ void insert_remaining_at_end(TrailingInserter& inserter) override {
+ inserter.insert_at_end(BucketDatabase::Entry(document::BucketId(16, 3), BI(3)));
+ }
+};
+
+}
+
+TEST_P(BucketDatabaseTest, merge_keep_unchanged_result_does_not_alter_db_contents) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+
+ KeepUnchangedMergingProcessor proc;
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_entry_skipping_removes_entry_from_db) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+ db().update(BucketDatabase::Entry(BucketId(16, 3), BI(3)));
+
+ SkipBucketMergingProcessor proc(BucketId(16, 2));
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000003) : "
+ "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_update_result_updates_entry_in_db) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+
+ UpdateBucketMergingProcessor proc(BucketId(16, 1));
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x3,docs=4/4,bytes=5/5,trusted=false,active=false,ready=false), "
+ "node(idx=0,crc=0x2,docs=3/3,bytes=4/4,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_can_insert_entry_before_current_bucket) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 3), BI(3)));
+
+ InsertBeforeBucketMergingProcessor proc(BucketId(16, 1));
+ db().merge(proc);
+
+ // Bucket (...)00002 is inserted by the merge processor
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000003) : "
+ "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, merge_can_insert_entry_at_end) {
+ db().update(BucketDatabase::Entry(BucketId(16, 1), BI(1)));
+ db().update(BucketDatabase::Entry(BucketId(16, 2), BI(2)));
+
+ InsertAtEndMergingProcessor proc;
+ db().merge(proc);
+
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=2,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"
+ "BucketId(0x4000000000000003) : "
+ "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+}
+
+TEST_P(BucketDatabaseTest, DISABLED_benchmark_const_iteration) {
+ constexpr uint32_t superbuckets = 1u << 16u;
+ constexpr uint32_t sub_buckets = 14;
+ constexpr uint32_t n_buckets = superbuckets * sub_buckets;
+
+ std::vector<uint64_t> bucket_keys;
+ bucket_keys.reserve(n_buckets);
+
+ for (uint32_t sb = 0; sb < superbuckets; ++sb) {
+ for (uint64_t i = 0; i < sub_buckets; ++i) {
+ document::BucketId bucket(48, (i << 32ULL) | sb);
+ bucket_keys.emplace_back(bucket.toKey());
+ }
+ }
+ std::sort(bucket_keys.begin(), bucket_keys.end());
+ for (uint64_t k : bucket_keys) {
+ db().update(BucketDatabase::Entry(BucketId(BucketId::keyToBucketId(k)), BI3(0, 1, 2)));
+ }
+
+ auto elapsed = vespalib::BenchmarkTimer::benchmark([&] {
+ DummyProcessor proc;
+ db().forEach(proc, document::BucketId());
+ }, 5);
+ fprintf(stderr, "Full DB iteration of %s takes %g seconds\n",
+ db().toString(false).c_str(), elapsed);
+}
+
}
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 1cfc1692edb..cdaa6e9aaa3 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -514,7 +514,7 @@ public:
OutdatedNodesMap outdatedNodesMap;
state = PendingClusterState::createForClusterStateChange(
clock, clusterInfo, sender,
- owner.getBucketSpaceRepo(), owner.getReadOnlyBucketSpaceRepo(),
+ owner.getBucketSpaceRepo(),
cmd, outdatedNodesMap, api::Timestamp(1));
}
@@ -526,8 +526,7 @@ public:
owner.createClusterInfo(oldClusterState));
state = PendingClusterState::createForDistributionChange(
- clock, clusterInfo, sender, owner.getBucketSpaceRepo(),
- owner.getReadOnlyBucketSpaceRepo(), api::Timestamp(1));
+ clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1));
}
};
@@ -543,6 +542,8 @@ public:
{
return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState);
}
+
+ uint32_t populate_bucket_db_via_request_bucket_info_for_benchmarking();
};
BucketDBUpdaterTest::BucketDBUpdaterTest()
@@ -863,14 +864,14 @@ TEST_F(BucketDBUpdaterTest, testBitChange) {
const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]);
auto sreply = std::make_shared<RequestBucketInfoReply>(req);
sreply->setAddress(storageAddress(0));
- api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo();
+ auto& vec = sreply->getBucketInfo();
if (req.getBucketSpace() == FixedBucketSpaces::default_space()) {
int cnt=0;
for (int i=0; cnt < 2; i++) {
lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution();
std::vector<uint16_t> distributors;
if (distribution.getIdealDistributorNode(
- lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"),
+ lib::ClusterState("bits:14 storage:1 distributor:2"),
document::BucketId(16, i))
== 0)
{
@@ -1373,8 +1374,7 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged(
ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForDistributionChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(),
- getReadOnlyBucketSpaceRepo(), api::Timestamp(1)));
+ clock, clusterInfo, sender, getBucketSpaceRepo(), api::Timestamp(1)));
sortSentMessagesByIndex(sender);
@@ -1508,7 +1508,7 @@ TEST_F(BucketDBUpdaterTest, testPendingClusterStateReceive) {
OutdatedNodesMap outdatedNodesMap;
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ clock, clusterInfo, sender, getBucketSpaceRepo(),
cmd, outdatedNodesMap, api::Timestamp(1)));
ASSERT_EQ(messageCount(3), sender.commands.size());
@@ -1617,7 +1617,7 @@ struct BucketDumper : public BucketDatabase::EntryProcessor
{
}
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
document::BucketId bucketId(e.getBucketId());
ost << (uint32_t)bucketId.getRawId() << ":";
@@ -1661,7 +1661,7 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d"));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ clock, clusterInfo, sender, getBucketSpaceRepo(),
cmd, outdatedNodesMap, beforeTime));
parseInputData(existingData, beforeTime, *state, includeBucketInfo);
@@ -1680,7 +1680,7 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString()));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ clock, clusterInfo, sender, getBucketSpaceRepo(),
cmd, outdatedNodesMap, afterTime));
parseInputData(newData, afterTime, *state, includeBucketInfo);
@@ -1931,7 +1931,7 @@ struct FunctorProcessor : BucketDatabase::EntryProcessor {
template <typename F>
explicit FunctorProcessor(F&& f) : _f(std::forward<F>(f)) {}
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
_f(e);
return true;
}
@@ -2580,19 +2580,17 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_trans
TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
// Need to trigger an initial edge to complete first bucket scan
- ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("storage:1 distributor:2"),
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"),
messageCount(1), 0));
_sender.clear();
- lib::ClusterState state("storage:1 distributor:1");
+ lib::ClusterState state("distributor:1 storage:1");
setSystemState(state);
constexpr uint32_t superbuckets = 1u << 16u;
constexpr uint32_t sub_buckets = 14;
constexpr uint32_t n_buckets = superbuckets * sub_buckets;
- vespalib::BenchmarkTimer timer(1.0);
-
ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) {
ASSERT_EQ(_sender.commands[bsi]->getType(), MessageType::REQUESTBUCKETINFO);
@@ -2610,6 +2608,7 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
}
}
+ vespalib::BenchmarkTimer timer(1.0);
// Global space has no buckets but will serve as a trigger for merging
// buckets into the DB. This lets us measure the overhead of just this part.
if (req.getBucketSpace() == FixedBucketSpaces::global_space()) {
@@ -2626,6 +2625,77 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
EXPECT_EQ(size_t(0), mutable_global_db().size());
}
+uint32_t BucketDBUpdaterTest::populate_bucket_db_via_request_bucket_info_for_benchmarking() {
+ // Need to trigger an initial edge to complete first bucket scan
+ setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"), messageCount(1), 0);
+ _sender.clear();
+
+ lib::ClusterState state("distributor:1 storage:1");
+ setSystemState(state);
+
+ constexpr uint32_t superbuckets = 1u << 16u;
+ constexpr uint32_t sub_buckets = 14;
+ constexpr uint32_t n_buckets = superbuckets * sub_buckets;
+
+ assert(_bucketSpaces.size() == _sender.commands.size());
+ for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) {
+ assert(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO);
+ const auto& req = dynamic_cast<const RequestBucketInfoCommand&>(*_sender.commands[bsi]);
+
+ auto sreply = std::make_shared<RequestBucketInfoReply>(req);
+ sreply->setAddress(storageAddress(0));
+ auto& vec = sreply->getBucketInfo();
+ if (req.getBucketSpace() == FixedBucketSpaces::default_space()) {
+ for (uint32_t sb = 0; sb < superbuckets; ++sb) {
+ for (uint64_t i = 0; i < sub_buckets; ++i) {
+ document::BucketId bucket(48, (i << 32ULL) | sb);
+ vec.push_back(api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(10, 1, 1)));
+ }
+ }
+ }
+ getBucketDBUpdater().onRequestBucketInfoReply(sreply);
+ }
+
+ assert(mutable_default_db().size() == n_buckets);
+ assert(mutable_global_db().size() == 0);
+ return n_buckets;
+}
+
+TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_removing_buckets_for_unavailable_storage_nodes) {
+ const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking();
+
+ lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via ownership
+ vespalib::BenchmarkTimer timer(1.0);
+ timer.before();
+ setSystemState(no_op_state);
+ timer.after();
+ fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets);
+}
+
+TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_no_buckets_removed_during_node_remover_db_pass) {
+ const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking();
+
+ // TODO this benchmark is void if we further restrict the pruning elision logic to allow
+ // elision when storage nodes come online.
+ lib::ClusterState no_op_state("distributor:1 storage:2"); // Not removing any buckets
+ vespalib::BenchmarkTimer timer(1.0);
+ timer.before();
+ setSystemState(no_op_state);
+ timer.after();
+ fprintf(stderr, "Took %g seconds to scan %u buckets with no-op action\n", timer.min_time(), n_buckets);
+}
+
+TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_all_buckets_removed_during_node_remover_db_pass) {
+ const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking();
+
+ lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via all replicas gone
+ vespalib::BenchmarkTimer timer(1.0);
+ timer.before();
+ setSystemState(no_op_state);
+ timer.after();
+ fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets);
+}
+
TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) {
auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d");
auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m");
diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
index a3715bb3d27..ef198cb1d3f 100644
--- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
+++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
@@ -35,6 +35,7 @@
namespace storage {
using Entry = BucketDatabase::Entry;
+using ConstEntryRef = BucketDatabase::ConstEntryRef;
using search::datastore::EntryRef;
using vespalib::ConstArrayRef;
using document::BucketId;
@@ -55,6 +56,12 @@ Entry entry_from_replica_array_ref(const BucketId& id, uint32_t gc_timestamp, Co
return Entry(id, BucketInfo(gc_timestamp, std::vector<BucketCopy>(replicas.begin(), replicas.end())));
}
+ConstEntryRef const_entry_ref_from_replica_array_ref(const BucketId& id, uint32_t gc_timestamp,
+ ConstArrayRef<BucketCopy> replicas)
+{
+ return ConstEntryRef(id, ConstBucketInfoRef(gc_timestamp, replicas));
+}
+
EntryRef entry_ref_from_value(uint64_t value) {
return EntryRef(value & 0xffffffffULL);
}
@@ -127,14 +134,27 @@ void BTreeBucketDatabase::commit_tree_changes() {
_tree.getAllocator().trimHoldLists(used_gen);
}
+Entry BTreeBucketDatabase::entry_from_value(uint64_t bucket_key, uint64_t value) const {
+ const auto replicas_ref = _store.get(entry_ref_from_value(value));
+ const auto bucket = BucketId(BucketId::keyToBucketId(bucket_key));
+ return entry_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref);
+}
+
Entry BTreeBucketDatabase::entry_from_iterator(const BTree::ConstIterator& iter) const {
if (!iter.valid()) {
return Entry::createInvalid();
}
+ return entry_from_value(iter.getKey(), iter.getData());
+}
+
+ConstEntryRef BTreeBucketDatabase::const_entry_ref_from_iterator(const BTree::ConstIterator& iter) const {
+ if (!iter.valid()) {
+ return ConstEntryRef::createInvalid();
+ }
const auto value = iter.getData();
const auto replicas_ref = _store.get(entry_ref_from_value(value));
const auto bucket = BucketId(BucketId::keyToBucketId(iter.getKey()));
- return entry_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref);
+ return const_entry_ref_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref);
}
BucketId BTreeBucketDatabase::bucket_from_valid_iterator(const BTree::ConstIterator& iter) const {
@@ -216,8 +236,8 @@ BTreeBucketDatabase::find_parents_internal(const document::BucketId& bucket,
std::vector<Entry>& entries) const
{
const uint64_t bucket_key = bucket.toKey();
- uint64_t parent_key = 0;
- auto iter = _tree.begin();
+ const auto frozen_view = _tree.getFrozenView();
+ auto iter = frozen_view.begin();
// Start at the root level, descending towards the bucket itself.
// Try skipping as many levels of the tree as possible as we go.
uint32_t bits = 1;
@@ -228,11 +248,11 @@ BTreeBucketDatabase::find_parents_internal(const document::BucketId& bucket,
entries.emplace_back(entry_from_iterator(iter));
}
bits = next_parent_bit_seek_level(bits, candidate, bucket);
- parent_key = BucketId(bits, bucket.getRawId()).toKey();
+ const auto parent_key = BucketId(bits, bucket.getRawId()).toKey();
assert(parent_key > iter.getKey());
iter.seek(parent_key);
}
- return iter; // FIXME clang warns here due to copying...
+ return iter;
}
/*
@@ -286,30 +306,118 @@ void BTreeBucketDatabase::update(const Entry& newEntry) {
// FIXME but bit-tree code says "lowerBound" in impl and "after" in declaration???
void BTreeBucketDatabase::forEach(EntryProcessor& proc, const BucketId& after) const {
for (auto iter = _tree.upperBound(after.toKey()); iter.valid(); ++iter) {
- if (!proc.process(entry_from_iterator(iter))) {
+ if (!proc.process(const_entry_ref_from_iterator(iter))) {
break;
}
}
}
-void BTreeBucketDatabase::forEach(MutableEntryProcessor& proc, const BucketId& after) {
- for (auto iter = _tree.upperBound(after.toKey()); iter.valid(); ++iter) {
- // FIXME this is a horrible API which currently has to update every time since we don't
- // know from the function call itself whether something was in fact updated behind the scenes..!
- auto entry = entry_from_iterator(iter);
- bool should_continue = proc.process(entry);
- // TODO optimize this joyful mess :D
- update(entry);
- if (!should_continue) {
- break;
+struct BTreeBuilderMerger final : BucketDatabase::Merger {
+ BTreeBucketDatabase& _db;
+ BTreeBucketDatabase::BTree::Builder& _builder;
+ uint64_t _current_key;
+ uint64_t _current_value;
+ Entry _cached_entry;
+ bool _valid_cached_entry;
+
+ BTreeBuilderMerger(BTreeBucketDatabase& db,
+ BTreeBucketDatabase::BTree::Builder& builder)
+ : _db(db),
+ _builder(builder),
+ _current_key(0),
+ _current_value(0),
+ _cached_entry(),
+ _valid_cached_entry(false)
+ {}
+ ~BTreeBuilderMerger() override = default;
+
+ uint64_t bucket_key() const noexcept override {
+ return _current_key;
+ }
+ BucketId bucket_id() const noexcept override {
+ return BucketId(BucketId::keyToBucketId(_current_key));
+ }
+ Entry& current_entry() override {
+ if (!_valid_cached_entry) {
+ _cached_entry = _db.entry_from_value(_current_key, _current_value);
+ _valid_cached_entry = true;
}
+ return _cached_entry;
}
- //commit_tree_changes(); TODO should be done as bulk op!
+ void insert_before_current(const Entry& e) override {
+ const uint64_t bucket_key = e.getBucketId().toKey();
+ assert(bucket_key < _current_key);
+
+ auto replicas_ref = _db._store.add(e.getBucketInfo().getRawNodes());
+ const auto new_value = value_from(e.getBucketInfo().getLastGarbageCollectionTime(), replicas_ref);
+
+ _builder.insert(bucket_key, new_value);
+ }
+
+ void update_iteration_state(uint64_t key, uint64_t value) {
+ _current_key = key;
+ _current_value = value;
+ _valid_cached_entry = false;
+ }
+};
+
+struct BTreeTrailingInserter final : BucketDatabase::TrailingInserter {
+ BTreeBucketDatabase& _db;
+ BTreeBucketDatabase::BTree::Builder& _builder;
+
+ BTreeTrailingInserter(BTreeBucketDatabase& db,
+ BTreeBucketDatabase::BTree::Builder& builder)
+ : _db(db),
+ _builder(builder)
+ {}
+
+ ~BTreeTrailingInserter() override = default;
+
+ void insert_at_end(const Entry& e) override {
+ const uint64_t bucket_key = e.getBucketId().toKey();
+ const auto replicas_ref = _db._store.add(e.getBucketInfo().getRawNodes());
+ const auto new_value = value_from(e.getBucketInfo().getLastGarbageCollectionTime(), replicas_ref);
+ _builder.insert(bucket_key, new_value);
+ }
+};
+
+// TODO lbound arg?
+void BTreeBucketDatabase::merge(MergingProcessor& proc) {
+ BTreeBucketDatabase::BTree::Builder builder(_tree.getAllocator());
+ BTreeBuilderMerger merger(*this, builder);
+
+ // TODO for_each instead?
+ for (auto iter = _tree.begin(); iter.valid(); ++iter) {
+ const uint64_t key = iter.getKey();
+ const uint64_t value = iter.getData();
+ merger.update_iteration_state(key, value);
+
+ auto result = proc.merge(merger);
+
+ if (result == MergingProcessor::Result::KeepUnchanged) {
+ builder.insert(key, value); // Reuse array store ref with no changes
+ } else if (result == MergingProcessor::Result::Update) {
+ assert(merger._valid_cached_entry); // Must actually have been touched
+ assert(merger._cached_entry.valid());
+ _store.remove(entry_ref_from_value(value));
+ auto new_replicas_ref = _store.add(merger._cached_entry.getBucketInfo().getRawNodes());
+ const auto new_value = value_from(merger._cached_entry.getBucketInfo().getLastGarbageCollectionTime(), new_replicas_ref);
+ builder.insert(key, new_value);
+ } else if (result == MergingProcessor::Result::Skip) {
+ _store.remove(entry_ref_from_value(value));
+ } else {
+ abort();
+ }
+ }
+ BTreeTrailingInserter inserter(*this, builder);
+ proc.insert_remaining_at_end(inserter);
+
+ _tree.assign(builder);
+ commit_tree_changes();
}
Entry BTreeBucketDatabase::upperBound(const BucketId& bucket) const {
return entry_from_iterator(_tree.upperBound(bucket.toKey()));
-
}
uint64_t BTreeBucketDatabase::size() const {
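
A note on the packed leaf values handled above: each B-tree leaf stores one uint64_t combining the GC timestamp and the replica-array reference. entry_ref_from_value() earlier in this file masks out the low 32 bits for the array-store EntryRef, which implies the timestamp occupies the high 32 bits; value_from() in the diff plays the packing role. A sketch with hypothetical helper names:

    // Inferred packed leaf value layout:
    //   bits 63..32: last garbage collection timestamp
    //   bits 31..0 : EntryRef index into the replica array store
    static uint64_t pack_value(uint32_t gc_timestamp, uint32_t replica_ref) {
        return (static_cast<uint64_t>(gc_timestamp) << 32) | replica_ref;
    }
    static uint32_t unpack_replica_ref(uint64_t value) {
        return static_cast<uint32_t>(value & 0xffffffffULL);
    }
    static uint32_t unpack_gc_timestamp(uint64_t value) {
        return static_cast<uint32_t>(value >> 32);
    }
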
diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h
index d896f76db54..eb527071ad7 100644
--- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h
+++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h
@@ -39,6 +39,8 @@ public:
BTreeBucketDatabase();
~BTreeBucketDatabase() override;
+ void merge(MergingProcessor&) override;
+
// Ye olde bucket DB API:
Entry get(const document::BucketId& bucket) const override;
void remove(const document::BucketId& bucket) override;
@@ -48,7 +50,6 @@ public:
std::vector<Entry>& entries) const override;
void update(const Entry& newEntry) override;
void forEach(EntryProcessor&, const document::BucketId& after) const override;
- void forEach(MutableEntryProcessor&, const document::BucketId& after) override;
Entry upperBound(const document::BucketId& value) const override;
uint64_t size() const override;
void clear() override;
@@ -60,11 +61,17 @@ public:
const std::string& indent) const override;
private:
+ Entry entry_from_value(uint64_t bucket_key, uint64_t value) const;
Entry entry_from_iterator(const BTree::ConstIterator& iter) const;
+ ConstEntryRef const_entry_ref_from_iterator(const BTree::ConstIterator& iter) const;
document::BucketId bucket_from_valid_iterator(const BTree::ConstIterator& iter) const;
void commit_tree_changes();
BTree::ConstIterator find_parents_internal(const document::BucketId& bucket,
std::vector<Entry>& entries) const;
+
+ friend struct BTreeBuilderMerger;
+ friend struct BTreeMergingBuilder;
+ friend struct BTreeTrailingInserter;
};
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp b/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp
index 8a4aadfa5be..f2a561b83eb 100644
--- a/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketdatabase.cpp
@@ -4,17 +4,6 @@
namespace storage {
-namespace {
- struct GetNextEntryProcessor : public BucketDatabase::EntryProcessor {
- BucketDatabase::Entry _entry;
-
- bool process(const BucketDatabase::Entry& e) override {
- _entry = e;
- return false;
- }
- };
-}
-
BucketDatabase::Entry
BucketDatabase::getNext(const document::BucketId& last) const
{
@@ -32,7 +21,8 @@ BucketDatabase::createAppropriateBucket(
return e;
}
-std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e)
+template <typename BucketInfoType>
+std::ostream& operator<<(std::ostream& o, const BucketDatabase::EntryBase<BucketInfoType>& e)
{
if (!e.valid()) {
o << "NONEXISTING";
@@ -42,12 +32,19 @@ std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e)
return o;
}
+template <typename BucketInfoType>
std::string
-BucketDatabase::Entry::toString() const
+BucketDatabase::EntryBase<BucketInfoType>::toString() const
{
std::ostringstream ost;
ost << *this;
return ost.str();
}
+template std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e);
+template std::ostream& operator<<(std::ostream& o, const BucketDatabase::ConstEntryRef& e);
+
+template class BucketDatabase::EntryBase<BucketInfo>;
+template class BucketDatabase::EntryBase<ConstBucketInfoRef>;
+
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketdatabase.h b/storage/src/vespa/storage/bucketdb/bucketdatabase.h
index a5a70c488ca..d12c2d82972 100644
--- a/storage/src/vespa/storage/bucketdb/bucketdatabase.h
+++ b/storage/src/vespa/storage/bucketdb/bucketdatabase.h
@@ -13,42 +13,48 @@ namespace storage {
class BucketDatabase : public vespalib::Printable
{
public:
- class Entry {
+ template <typename BucketInfoType>
+ class EntryBase {
document::BucketId _bucketId;
- BucketInfo _info;
+ BucketInfoType _info;
public:
- Entry() : _bucketId(0) {} // Invalid entry
- Entry(const document::BucketId& bId, BucketInfo bucketInfo)
+ EntryBase() : _bucketId(0) {} // Invalid entry
+ EntryBase(const document::BucketId& bId, const BucketInfoType& bucketInfo)
+ : _bucketId(bId), _info(bucketInfo) {}
+ EntryBase(const document::BucketId& bId, BucketInfoType&& bucketInfo)
: _bucketId(bId), _info(std::move(bucketInfo)) {}
- explicit Entry(const document::BucketId& bId) : _bucketId(bId) {}
+ explicit EntryBase(const document::BucketId& bId) : _bucketId(bId) {}
- bool operator==(const Entry& other) const {
+ bool operator==(const EntryBase& other) const {
return (_bucketId == other._bucketId && _info == other._info);
}
bool valid() const { return (_bucketId.getRawId() != 0); }
std::string toString() const;
const document::BucketId& getBucketId() const { return _bucketId; }
- const BucketInfo& getBucketInfo() const { return _info; }
- BucketInfo& getBucketInfo() { return _info; }
- BucketInfo* operator->() { return &_info; }
- const BucketInfo* operator->() const { return &_info; }
+ const BucketInfoType& getBucketInfo() const { return _info; }
+ BucketInfoType& getBucketInfo() { return _info; }
+ BucketInfoType* operator->() { return &_info; }
+ const BucketInfoType* operator->() const { return &_info; }
- static Entry createInvalid() {
- return Entry();
+ static EntryBase createInvalid() {
+ return EntryBase();
}
};
- template<typename T> struct Processor {
- virtual ~Processor() {}
+ using Entry = EntryBase<BucketInfo>;
+ // TODO avoid needing to touch memory just to get to the const entry ref
+ // TODO -> lazy ID to ConstArrayRef resolve
+ using ConstEntryRef = EntryBase<ConstBucketInfoRef>;
+
+ struct EntryProcessor {
+ virtual ~EntryProcessor() = default;
/** Return false to stop iterating. */
- virtual bool process(T& e) = 0;
+ virtual bool process(const ConstEntryRef& e) = 0;
};
- typedef Processor<const Entry> EntryProcessor;
- typedef Processor<Entry> MutableEntryProcessor;
- virtual ~BucketDatabase() {}
+ ~BucketDatabase() override = default;
virtual Entry get(const document::BucketId& bucket) const = 0;
virtual void remove(const document::BucketId& bucket) = 0;
@@ -76,9 +82,123 @@ public:
virtual void forEach(
EntryProcessor&,
const document::BucketId& after = document::BucketId()) const = 0;
- virtual void forEach(
- MutableEntryProcessor&,
- const document::BucketId& after = document::BucketId()) = 0;
+
+ /**
+ * Database implementation-specific interface for appending entries
+ * during a merge() operation.
+ */
+ struct TrailingInserter {
+ virtual ~TrailingInserter() = default;
+ /**
+ * Insert a new database entry at the end of the current bucket space.
+ *
+ * Precondition: the entry's bucket ID must sort after all entries that
+ * have already been iterated over or inserted via insert_at_end().
+ */
+ virtual void insert_at_end(const Entry&) = 0;
+ };
+
+ /**
+ * Database implementation-specific interface for accessing bucket
+ * entries and prepending entries during a merge() operation.
+ */
+ struct Merger {
+ virtual ~Merger() = default;
+
+ // TODO this should ideally be separated into read/write functions, but this
+ // will suffice for now to avoid too many changes.
+
+ /**
+ * Bucket key/ID of the currently iterated entry. Unless the information stored
+ * in the DB Entry is needed, using one of these methods should be preferred to
+ * getting the bucket ID via current_entry(). The underlying DB is expected to
+ * have cheap access to the ID but _may_ have expensive access to the entry itself.
+ */
+ virtual uint64_t bucket_key() const noexcept = 0;
+ virtual document::BucketId bucket_id() const noexcept = 0;
+ /**
+ * Returns a mutable representation of the currently iterated database
+ * entry. If changes are made to this object, Result::Update must be
+ * returned from merge(). Otherwise, mutation visibility is undefined.
+ */
+ virtual Entry& current_entry() = 0;
+ /**
+ * Insert a new entry into the bucket database that is ordered before the
+ * currently iterated entry.
+ *
+ * Preconditions:
+ * - The entry's bucket ID must sort _before_ the currently iterated
+ * entry's bucket ID, in "reversed bits" bucket key order.
+ * - The entry's bucket ID must sort _after_ any entries previously
+ * inserted with insert_before_current().
+ * - The entry's bucket ID must not be the same as a bucket that was
+ * already iterated over as part of the DB merge() call or inserted
+ * via a previous call to insert_before_current().
+ * Such buckets must be handled by explicitly updating the provided
+ * entry for the iterated bucket and returning Result::Update.
+ */
+ virtual void insert_before_current(const Entry&) = 0;
+ };
+
+ /**
+ * Interface to be implemented by callers that wish to receive callbacks
+ * during a bucket merge() operation.
+ */
+ struct MergingProcessor {
+ // See merge() for semantics on enum values.
+ enum class Result {
+ Update,
+ KeepUnchanged,
+ Skip
+ };
+
+ virtual ~MergingProcessor() = default;
+ /**
+ * Invoked for each existing bucket in the database, in bucket key order.
+ * The provided Merger instance may be used to access the current entry
+ * and prepend entries to the DB.
+ *
+ * Return value semantics:
+ * - Result::Update:
+ * when merge() returns, the changes made to the current entry will
+ * become visible in the bucket database.
+ * - Result::KeepUnchanged:
+ * when merge() returns, the entry will remain in the same state as
+ * it was when merge() was originally called.
+ * - Result::Skip:
+ * when merge() returns, the entry will no longer be part of the DB.
+ * Any entries added via insert_before_current() _will_ be present.
+ *
+ */
+ virtual Result merge(Merger&) = 0;
+ /**
+ * Invoked once after all existing buckets have been iterated over.
+ * The provided TrailingInserter instance may be used to append
+ * an arbitrary number of entries to the database.
+ *
+ * This is used to handle elements remaining at the end of a linear
+ * merge operation.
+ */
+ virtual void insert_remaining_at_end(TrailingInserter&) {}
+ };
+
+ /**
+ * Iterate over the bucket database in bucket key order, allowing an arbitrary
+ * number of buckets to be inserted, updated and skipped in a way that is
+ * optimized for the backing DB implementation.
+ *
+ * Merging happens in two stages:
+ * 1) The MergingProcessor argument's merge() function is invoked for each existing
+ * bucket in the database. At this point new buckets ordered before the iterated
+ * bucket may be inserted and the iterated bucket may be skipped or updated.
+ * 2) The MergingProcessor argument's insert_remaining_at_end() function is invoked
+ * once when all buckets have been iterated over. This enables the caller to
+ * insert new buckets that sort after the last iterated bucket.
+ *
+ * Changes made to the database are not guaranteed to be visible until
+ * merge() returns.
+ */
+ virtual void merge(MergingProcessor&) = 0;
/**
* Get the first bucket that does _not_ compare less than or equal to
@@ -114,6 +234,7 @@ public:
virtual uint32_t childCount(const document::BucketId&) const = 0;
};
-std::ostream& operator<<(std::ostream& o, const BucketDatabase::Entry& e);
+template <typename BucketInfoType>
+std::ostream& operator<<(std::ostream& o, const BucketDatabase::EntryBase<BucketInfoType>& e);
} // storage
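
The doc comments above describe merge() as a single guided pass in bucket-key order, and the natural caller-side shape is a classic two-sequence sorted merge. Below is an illustrative sketch (assuming the usual includes) of folding a pre-sorted batch of entries into the DB; SortedBatchMerger and its "incoming entry wins" conflict policy are assumptions for the example, not code from this patch.

    using Merger = storage::BucketDatabase::Merger;
    using TrailingInserter = storage::BucketDatabase::TrailingInserter;
    using Result = storage::BucketDatabase::MergingProcessor::Result;

    struct SortedBatchMerger : storage::BucketDatabase::MergingProcessor {
        std::vector<storage::BucketDatabase::Entry> _incoming; // ascending bucket-key order
        size_t _pos = 0;

        Result merge(Merger& m) override {
            // Stage 1: splice in batch entries that sort before the current DB entry.
            while (_pos < _incoming.size()
                   && _incoming[_pos].getBucketId().toKey() < m.bucket_key()) {
                m.insert_before_current(_incoming[_pos++]);
            }
            // Same bucket on both sides: overwrite and signal the mutation.
            if (_pos < _incoming.size()
                && _incoming[_pos].getBucketId().toKey() == m.bucket_key()) {
                m.current_entry() = _incoming[_pos++];
                return Result::Update;
            }
            return Result::KeepUnchanged;
        }
        // Stage 2: anything left in the batch sorts after the last DB entry.
        void insert_remaining_at_end(TrailingInserter& inserter) override {
            for (; _pos < _incoming.size(); ++_pos) {
                inserter.insert_at_end(_incoming[_pos]);
            }
        }
    };
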
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
index 82fc6686c5f..17efa1658ce 100644
--- a/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.cpp
@@ -1,18 +1,18 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucketinfo.h"
+#include "bucketinfo.hpp"
#include <vespa/storage/storageutil/utils.h>
#include <algorithm>
namespace storage {
-BucketInfo::BucketInfo()
- : _lastGarbageCollection(0),
- _nodes()
-{}
+template class BucketInfoBase<std::vector<BucketCopy>>;
+template class BucketInfoBase<vespalib::ConstArrayRef<BucketCopy>>;
+
+BucketInfo::BucketInfo() : BucketInfoBase() {}
BucketInfo::BucketInfo(uint32_t lastGarbageCollection, std::vector<BucketCopy> nodes)
- : _lastGarbageCollection(lastGarbageCollection),
- _nodes(std::move(nodes))
+ : BucketInfoBase(lastGarbageCollection, std::move(nodes))
{}
BucketInfo::~BucketInfo() = default;
@@ -22,38 +22,6 @@ BucketInfo& BucketInfo::operator=(const BucketInfo&) = default;
BucketInfo::BucketInfo(BucketInfo&&) noexcept = default;
BucketInfo& BucketInfo::operator=(BucketInfo&&) noexcept = default;
-std::string
-BucketInfo::toString() const {
- std::ostringstream ost;
- print(ost, true, "");
- return ost.str();
-}
-
-bool
-BucketInfo::emptyAndConsistent() const {
- for (uint32_t i = 0; i < _nodes.size(); i++) {
- if (!_nodes[i].empty()) return false;
- }
- return consistentNodes();
-}
-
-bool
-BucketInfo::validAndConsistent() const {
- for (uint32_t i = 0; i < _nodes.size(); i++) {
- if (!_nodes[i].valid()) return false;
- }
- return consistentNodes();
-}
-
-bool
-BucketInfo::hasInvalidCopy() const
-{
- for (uint32_t i = 0; i < _nodes.size(); i++) {
- if (!_nodes[i].valid()) return true;
- }
- return false;
-}
-
void
BucketInfo::updateTrusted() {
if (validAndConsistent()) {
@@ -90,40 +58,6 @@ BucketInfo::resetTrusted() {
updateTrusted();
}
-uint16_t
-BucketInfo::getTrustedCount() const {
- uint32_t trustedCount = 0;
- for (uint32_t i = 0; i < _nodes.size(); i++) {
- if (_nodes[i].trusted()) {
- trustedCount++;
- }
- }
- return trustedCount;
-}
-
-bool
-BucketInfo::consistentNodes(bool countInvalidAsConsistent) const
-{
- int compareIndex = 0;
- for (uint32_t i=1; i<_nodes.size(); i++) {
- if (!_nodes[i].consistentWith(_nodes[compareIndex],
- countInvalidAsConsistent)) return false;
- }
- return true;
-}
-
-void
-BucketInfo::print(std::ostream& out, bool verbose, const std::string& indent) const
-{
- if (_nodes.empty()) {
- out << "no nodes";
- }
- for (uint32_t i=0; i<_nodes.size(); ++i) {
- if (i != 0) out << ", ";
- _nodes[i].print(out, verbose, indent);
- }
-}
-
namespace {
struct Sorter {
@@ -225,19 +159,6 @@ BucketInfo::removeNode(unsigned short node, TrustedUpdate update)
return false;
}
-const BucketCopy*
-BucketInfo::getNode(uint16_t node) const
-{
- for (std::vector<BucketCopy>::const_iterator iter = _nodes.begin();
- iter != _nodes.end();
- iter++) {
- if (iter->getNode() == node) {
- return &*iter;
- }
- }
- return 0;
-}
-
BucketCopy*
BucketInfo::getNodeInternal(uint16_t node)
{
@@ -251,92 +172,4 @@ BucketInfo::getNodeInternal(uint16_t node)
return 0;
}
-std::vector<uint16_t>
-BucketInfo::getNodes() const {
- std::vector<uint16_t> result;
-
- for (uint32_t i = 0; i < _nodes.size(); i++) {
- result.push_back(_nodes[i].getNode());
- }
-
- return result;
-}
-
-uint32_t
-BucketInfo::getHighestDocumentCount() const
-{
- uint32_t highest = 0;
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- highest = std::max(highest, _nodes[i].getDocumentCount());
- }
- return highest;
-}
-
-uint32_t
-BucketInfo::getHighestTotalDocumentSize() const
-{
- uint32_t highest = 0;
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- highest = std::max(highest, _nodes[i].getTotalDocumentSize());
- }
- return highest;
-}
-
-uint32_t
-BucketInfo::getHighestMetaCount() const
-{
- uint32_t highest = 0;
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- highest = std::max(highest, _nodes[i].getMetaCount());
- }
- return highest;
-}
-
-uint32_t
-BucketInfo::getHighestUsedFileSize() const
-{
- uint32_t highest = 0;
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- highest = std::max(highest, _nodes[i].getUsedFileSize());
- }
- return highest;
-}
-
-bool
-BucketInfo::hasRecentlyCreatedEmptyCopy() const
-{
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- if (_nodes[i].wasRecentlyCreated()) {
- return true;
- }
- }
- return false;
-}
-
-bool
-BucketInfo::operator==(const BucketInfo& other) const
-{
- if (_nodes.size() != other._nodes.size()) {
- return false;
- }
-
- for (uint32_t i = 0; i < _nodes.size(); ++i) {
- if (_nodes[i].getNode() != other._nodes[i].getNode()) {
- return false;
- }
-
- if (!(_nodes[i] == other._nodes[i])) {
- return false;
- }
- }
-
- return true;
-};
-
-std::ostream&
-operator<<(std::ostream& out, const BucketInfo& info) {
- info.print(out, false, "");
- return out;
-}
-
}
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.h b/storage/src/vespa/storage/bucketdb/bucketinfo.h
index 8949c0f10da..f0be700d204 100644
--- a/storage/src/vespa/storage/bucketdb/bucketinfo.h
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.h
@@ -2,6 +2,7 @@
#pragma once
#include "bucketcopy.h"
+#include <vespa/vespalib/util/arrayref.h>
namespace storage {
@@ -14,45 +15,37 @@ enum class TrustedUpdate {
DEFER
};
-class BucketInfo
+template <typename NodeSeq>
+class BucketInfoBase
{
-private:
+protected:
uint32_t _lastGarbageCollection;
- std::vector<BucketCopy> _nodes;
-
+ NodeSeq _nodes;
public:
- BucketInfo();
- BucketInfo(uint32_t lastGarbageCollection, std::vector<BucketCopy> nodes);
- ~BucketInfo();
-
- BucketInfo(const BucketInfo&);
- BucketInfo& operator=(const BucketInfo&);
- BucketInfo(BucketInfo&&) noexcept;
- BucketInfo& operator=(BucketInfo&&) noexcept;
+ BucketInfoBase()
+ : _lastGarbageCollection(0),
+ _nodes()
+ {}
+ BucketInfoBase(uint32_t lastGarbageCollection, const NodeSeq& nodes)
+ : _lastGarbageCollection(lastGarbageCollection),
+ _nodes(nodes)
+ {}
+ BucketInfoBase(uint32_t lastGarbageCollection, NodeSeq&& nodes)
+ : _lastGarbageCollection(lastGarbageCollection),
+ _nodes(std::move(nodes))
+ {}
+ ~BucketInfoBase() = default;
+
+ BucketInfoBase(const BucketInfoBase&) = default;
+ BucketInfoBase& operator=(const BucketInfoBase&) = default;
+ BucketInfoBase(BucketInfoBase&&) noexcept = default;
+ BucketInfoBase& operator=(BucketInfoBase&&) noexcept = default;
/**
* @return Returns the last time when this bucket was "garbage collected".
*/
uint32_t getLastGarbageCollectionTime() const { return _lastGarbageCollection; }
- /**
- * Sets the last time the bucket was "garbage collected".
- */
- void setLastGarbageCollectionTime(uint32_t timestamp) {
- _lastGarbageCollection = timestamp;
- }
-
- /**
- Update trusted flags if bucket is now complete and consistent.
- */
- void updateTrusted();
-
- /**
- Removes any historical information on trustedness, and sets the bucket copies to
- trusted if they are now complete and consistent.
- */
- void resetTrusted();
-
/** True if the bucket contains no documents and is consistent. */
bool emptyAndConsistent() const;
@@ -84,10 +77,89 @@ public:
*/
bool consistentNodes(bool countInvalidAsConsistent = false) const;
- static bool mayContain(const BucketInfo&) { return true; }
void print(std::ostream&, bool verbose, const std::string& indent) const;
/**
+ * Returns the bucket copy struct for the given node, or nullptr if no such node exists
+ */
+ const BucketCopy* getNode(uint16_t node) const;
+
+ /**
+ * Returns the number of nodes this entry has.
+ */
+ uint32_t getNodeCount() const noexcept { return static_cast<uint32_t>(_nodes.size()); }
+
+ /**
+ * Returns a list of the nodes this entry has.
+ */
+ std::vector<uint16_t> getNodes() const;
+
+ /**
+ Returns a reference to the node with the given index in the node
+ array. This operation has undefined behaviour if the index given
+ is not within the node count.
+ */
+ const BucketCopy& getNodeRef(uint16_t idx) const {
+ return _nodes[idx];
+ }
+
+ const NodeSeq& getRawNodes() const noexcept {
+ return _nodes;
+ }
+
+ std::string toString() const;
+
+ uint32_t getHighestDocumentCount() const;
+ uint32_t getHighestTotalDocumentSize() const;
+ uint32_t getHighestMetaCount() const;
+ uint32_t getHighestUsedFileSize() const;
+
+ bool hasRecentlyCreatedEmptyCopy() const;
+
+ bool operator==(const BucketInfoBase& other) const;
+};
+
+template <typename NodeSeq>
+std::ostream& operator<<(std::ostream& out, const BucketInfoBase<NodeSeq>& info) {
+ info.print(out, false, "");
+ return out;
+}
+
+class ConstBucketInfoRef : public BucketInfoBase<vespalib::ConstArrayRef<BucketCopy>> {
+public:
+ using BucketInfoBase::BucketInfoBase;
+};
+
+class BucketInfo : public BucketInfoBase<std::vector<BucketCopy>> {
+public:
+ BucketInfo();
+ BucketInfo(uint32_t lastGarbageCollection, std::vector<BucketCopy> nodes);
+ ~BucketInfo();
+
+ BucketInfo(const BucketInfo&);
+ BucketInfo& operator=(const BucketInfo&);
+ BucketInfo(BucketInfo&&) noexcept;
+ BucketInfo& operator=(BucketInfo&&) noexcept;
+
+ /**
+ * Sets the last time the bucket was "garbage collected".
+ */
+ void setLastGarbageCollectionTime(uint32_t timestamp) {
+ _lastGarbageCollection = timestamp;
+ }
+
+ /**
+ Update trusted flags if bucket is now complete and consistent.
+ */
+ void updateTrusted();
+
+ /**
+ Removes any historical information on trustedness, and sets the bucket copies to
+ trusted if they are now complete and consistent.
+ */
+ void resetTrusted();
+
+ /**
Adds the given node.
@param recommendedOrder A recommended ordering of nodes.
@@ -118,34 +190,6 @@ public:
*/
bool removeNode(uint16_t node, TrustedUpdate update = TrustedUpdate::UPDATE);
- /**
- * Returns the bucket copy struct for the given node, null if nonexisting
- */
- const BucketCopy* getNode(uint16_t node) const;
-
- /**
- * Returns the number of nodes this entry has.
- */
- uint32_t getNodeCount() const noexcept { return static_cast<uint32_t>(_nodes.size()); }
-
- /**
- * Returns a list of the nodes this entry has.
- */
- std::vector<uint16_t> getNodes() const;
-
- /**
- Returns a reference to the node with the given index in the node
- array. This operation has undefined behaviour if the index given
- is not within the node count.
- */
- const BucketCopy& getNodeRef(uint16_t idx) const {
- return _nodes[idx];
- }
-
- const std::vector<BucketCopy>& getRawNodes() const noexcept {
- return _nodes;
- }
-
void clearTrusted(uint16_t nodeIdx) {
getNodeInternal(nodeIdx)->clearTrusted();
}
@@ -155,19 +199,6 @@ public:
*/
void clear() { _nodes.clear(); }
- std::string toString() const;
-
- bool verifyLegal() const { return true; }
-
- uint32_t getHighestDocumentCount() const;
- uint32_t getHighestTotalDocumentSize() const;
- uint32_t getHighestMetaCount() const;
- uint32_t getHighestUsedFileSize() const;
-
- bool hasRecentlyCreatedEmptyCopy() const;
-
- bool operator==(const BucketInfo& other) const;
-
private:
friend class distributor::DistributorTestUtil;
@@ -183,7 +214,8 @@ private:
void addNodeManual(const BucketCopy& newCopy) { _nodes.push_back(newCopy); }
};
-std::ostream& operator<<(std::ostream& out, const BucketInfo& info);
+extern template class BucketInfoBase<std::vector<BucketCopy>>;
+extern template class BucketInfoBase<vespalib::ConstArrayRef<BucketCopy>>;
}
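
BucketInfo and ConstBucketInfoRef are now two instantiations of the same BucketInfoBase template: the former owns its replicas in a std::vector, the latter is a non-owning view over replicas stored elsewhere (such as the B-tree DB's array store). A borrowing sketch, mirroring the to_entry_ref() helper added in mapbucketdatabase.cpp further down (the as_ref name is hypothetical):

    // Borrow an owning BucketInfo's replicas as a ConstBucketInfoRef.
    // No copy of the replica vector is made; 'info' must outlive the returned ref.
    storage::ConstBucketInfoRef as_ref(const storage::BucketInfo& info) {
        return storage::ConstBucketInfoRef(info.getLastGarbageCollectionTime(),
                                           info.getRawNodes()); // std::vector -> ConstArrayRef view
    }
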
diff --git a/storage/src/vespa/storage/bucketdb/bucketinfo.hpp b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp
new file mode 100644
index 00000000000..562e8d562fb
--- /dev/null
+++ b/storage/src/vespa/storage/bucketdb/bucketinfo.hpp
@@ -0,0 +1,165 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bucketcopy.h"
+#include <vespa/vespalib/util/arrayref.h>
+#include <iostream>
+#include <sstream>
+
+namespace storage {
+
+template <typename NodeSeq>
+std::string BucketInfoBase<NodeSeq>::toString() const {
+ std::ostringstream ost;
+ print(ost, true, "");
+ return ost.str();
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::emptyAndConsistent() const {
+ for (uint32_t i = 0; i < _nodes.size(); i++) {
+ if (!_nodes[i].empty()) {
+ return false;
+ }
+ }
+ return consistentNodes();
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::validAndConsistent() const {
+ for (uint32_t i = 0; i < _nodes.size(); i++) {
+ if (!_nodes[i].valid()) {
+ return false;
+ }
+ }
+ return consistentNodes();
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::hasInvalidCopy() const {
+ for (uint32_t i = 0; i < _nodes.size(); i++) {
+ if (!_nodes[i].valid()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+template <typename NodeSeq>
+uint16_t BucketInfoBase<NodeSeq>::getTrustedCount() const {
+ uint32_t trustedCount = 0;
+ for (uint32_t i = 0; i < _nodes.size(); i++) {
+ if (_nodes[i].trusted()) {
+ trustedCount++;
+ }
+ }
+ return trustedCount;
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::consistentNodes(bool countInvalidAsConsistent) const {
+ int compareIndex = 0;
+ for (uint32_t i = 1; i < _nodes.size(); i++) {
+ if (!_nodes[i].consistentWith(_nodes[compareIndex],
+ countInvalidAsConsistent)) return false;
+ }
+ return true;
+}
+
+template <typename NodeSeq>
+void BucketInfoBase<NodeSeq>::print(std::ostream& out, bool verbose, const std::string& indent) const {
+ if (_nodes.size() == 0) {
+ out << "no nodes";
+ }
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ if (i != 0) {
+ out << ", ";
+ }
+ _nodes[i].print(out, verbose, indent);
+ }
+}
+
+template <typename NodeSeq>
+const BucketCopy* BucketInfoBase<NodeSeq>::getNode(uint16_t node) const {
+ for (const auto& n : _nodes) {
+ if (n.getNode() == node) {
+ return &n;
+ }
+ }
+ return nullptr;
+}
+
+template <typename NodeSeq>
+std::vector<uint16_t> BucketInfoBase<NodeSeq>::getNodes() const {
+ std::vector<uint16_t> result;
+ for (uint32_t i = 0; i < _nodes.size(); i++) {
+ result.emplace_back(_nodes[i].getNode());
+ }
+ return result;
+}
+
+template <typename NodeSeq>
+uint32_t BucketInfoBase<NodeSeq>::getHighestDocumentCount() const {
+ uint32_t highest = 0;
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ highest = std::max(highest, _nodes[i].getDocumentCount());
+ }
+ return highest;
+}
+
+template <typename NodeSeq>
+uint32_t BucketInfoBase<NodeSeq>::getHighestTotalDocumentSize() const {
+ uint32_t highest = 0;
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ highest = std::max(highest, _nodes[i].getTotalDocumentSize());
+ }
+ return highest;
+}
+
+template <typename NodeSeq>
+uint32_t BucketInfoBase<NodeSeq>::getHighestMetaCount() const {
+ uint32_t highest = 0;
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ highest = std::max(highest, _nodes[i].getMetaCount());
+ }
+ return highest;
+}
+
+template <typename NodeSeq>
+uint32_t BucketInfoBase<NodeSeq>::getHighestUsedFileSize() const {
+ uint32_t highest = 0;
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ highest = std::max(highest, _nodes[i].getUsedFileSize());
+ }
+ return highest;
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::hasRecentlyCreatedEmptyCopy() const {
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ if (_nodes[i].wasRecentlyCreated()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+template <typename NodeSeq>
+bool BucketInfoBase<NodeSeq>::operator==(const BucketInfoBase<NodeSeq>& other) const {
+ if (_nodes.size() != other._nodes.size()) {
+ return false;
+ }
+
+ for (uint32_t i = 0; i < _nodes.size(); ++i) {
+ if (_nodes[i].getNode() != other._nodes[i].getNode()) {
+ return false;
+ }
+
+ if (!(_nodes[i] == other._nodes[i])) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+}
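
The new bucketinfo.hpp completes a three-file template split: the definitions above live in the .hpp, bucketinfo.h declares `extern template class ...` so ordinary includers do not instantiate them, and bucketinfo.cpp performs the single explicit instantiation per NodeSeq type. A generic sketch of the idiom (file and type names are illustrative, not from the patch):

    // widget.h -- declaration plus instantiation suppression
    template <typename T>
    class Widget {
    public:
        T doubled(T v) const;
    };
    extern template class Widget<int>; // users of widget.h will not instantiate this

    // widget.hpp -- out-of-line template definitions
    template <typename T>
    T Widget<T>::doubled(T v) const { return v + v; }

    // widget.cpp -- the one translation unit that instantiates Widget<int>
    // #include "widget.h"
    // #include "widget.hpp"
    template class Widget<int>;
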
diff --git a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp
index ad56a4083f8..9dad421324e 100644
--- a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp
+++ b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp
@@ -152,21 +152,21 @@ void __attribute__((noinline)) log_empty_bucket_insertion(const document::Bucket
}
+template <typename EntryType>
+void MapBucketDatabase::update_internal(EntryType&& new_entry) {
+ assert(new_entry.valid());
+ if (new_entry->getNodeCount() == 0) {
+ log_empty_bucket_insertion(new_entry.getBucketId());
+ }
+ Entry* found = find(0, 0, new_entry.getBucketId(), true);
+ assert(found);
+ *found = std::forward<EntryType>(new_entry);
+}
+
void
MapBucketDatabase::update(const Entry& newEntry)
{
- assert(newEntry.valid());
- if (newEntry->getNodeCount() == 0) {
- log_empty_bucket_insertion(newEntry.getBucketId());
- }
- LOG_BUCKET_OPERATION_NO_LOCK(
- newEntry.getBucketId(),
- vespalib::make_string(
- "bucketdb insert of %s", newEntry.toString().c_str()));
-
- Entry* found = find(0, 0, newEntry.getBucketId(), true);
- assert(found);
- *found = newEntry;
+ update_internal(newEntry);
}
void
@@ -334,20 +334,32 @@ MapBucketDatabase::upperBound(const document::BucketId& value) const
return Entry::createInvalid();
}
-template <typename EntryProcessorType>
+namespace {
+
+inline BucketDatabase::ConstEntryRef
+to_entry_ref(const BucketDatabase::Entry& e) {
+ return BucketDatabase::ConstEntryRef(
+ e.getBucketId(),
+ ConstBucketInfoRef(e->getLastGarbageCollectionTime(), e->getRawNodes()));
+}
+
+}
+
bool
MapBucketDatabase::forEach(int index,
- EntryProcessorType& processor,
+ EntryProcessor& processor,
uint8_t bitCount,
const document::BucketId& lowerBound,
- bool& process)
+ bool& process) const
{
if (index == -1) {
return true;
}
- E& e = _db[index];
- if (e.value != -1 && process && !processor.process(_values[e.value])) {
+ const E& e = _db[index];
+ if (e.value != -1 && process
+ && !processor.process(to_entry_ref(_values[e.value])))
+ {
return false;
}
@@ -377,16 +389,82 @@ MapBucketDatabase::forEach(EntryProcessor& processor,
const document::BucketId& after) const
{
bool process = false;
- MapBucketDatabase& mutableSelf(const_cast<MapBucketDatabase&>(*this));
- mutableSelf.forEach(0, processor, 0, after, process);
+ forEach(0, processor, 0, after, process);
}
-void
-MapBucketDatabase::forEach(MutableEntryProcessor& processor,
- const document::BucketId& after)
+struct MapDbMerger final : BucketDatabase::Merger {
+ MapBucketDatabase& _db;
+ BucketDatabase::Entry& _current_entry;
+ std::vector<BucketDatabase::Entry>& _to_insert;
+
+ MapDbMerger(MapBucketDatabase& db,
+ BucketDatabase::Entry& current_entry,
+ std::vector<BucketDatabase::Entry>& to_insert)
+ : _db(db),
+ _current_entry(current_entry),
+ _to_insert(to_insert)
+ {}
+
+ uint64_t bucket_key() const noexcept override {
+ return _current_entry.getBucketId().toKey();
+ }
+ document::BucketId bucket_id() const noexcept override {
+ return _current_entry.getBucketId();
+ }
+ BucketDatabase::Entry& current_entry() override {
+ return _current_entry;
+ }
+ void insert_before_current(const BucketDatabase::Entry& e) override {
+ _to_insert.emplace_back(e); // TODO movable
+ }
+};
+
+struct MapDbTrailingInserter final : BucketDatabase::TrailingInserter {
+ MapBucketDatabase& _db;
+ explicit MapDbTrailingInserter(MapBucketDatabase& db) : _db(db) {}
+
+ void insert_at_end(const BucketDatabase::Entry& e) override {
+ _db.update(e);
+ }
+};
+
+void MapBucketDatabase::merge_internal(int index,
+ MergingProcessor& processor,
+ std::vector<Entry>& to_insert,
+ std::vector<document::BucketId>& to_remove)
{
- bool process = false;
- forEach(0, processor, 0, after, process);
+ if (index == -1) {
+ return;
+ }
+ E& e = _db[index];
+ if (e.value != -1) {
+ Entry& entry = _values[e.value];
+ MapDbMerger merger(*this, entry, to_insert);
+ auto result = processor.merge(merger);
+ if (result == MergingProcessor::Result::KeepUnchanged) {
+ // No-op
+ } else if (result == MergingProcessor::Result::Update) {
+ // Also no-op since it's all in-place
+ } else if (result == MergingProcessor::Result::Skip) {
+ to_remove.emplace_back(entry.getBucketId());
+ }
+ }
+ merge_internal(e.e_0, processor, to_insert, to_remove);
+ merge_internal(e.e_1, processor, to_insert, to_remove);
+}
+
+void MapBucketDatabase::merge(MergingProcessor& processor) {
+ std::vector<document::BucketId> to_remove;
+ std::vector<Entry> to_insert;
+ merge_internal(0, processor, to_insert, to_remove);
+ for (const auto& bucket : to_remove) {
+ remove(bucket);
+ }
+ for (auto& e : to_insert) {
+ update_internal(std::move(e));
+ }
+ MapDbTrailingInserter inserter(*this);
+ processor.insert_remaining_at_end(inserter);
}
void
@@ -482,8 +560,8 @@ MapBucketDatabase::childCount(const document::BucketId& b) const
namespace {
struct Writer : public BucketDatabase::EntryProcessor {
std::ostream& _ost;
- Writer(std::ostream& ost) : _ost(ost) {}
- bool process(const BucketDatabase::Entry& e) override {
+ explicit Writer(std::ostream& ost) : _ost(ost) {}
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
_ost << e.toString() << "\n";
return true;
}
@@ -494,37 +572,16 @@ void
MapBucketDatabase::print(std::ostream& out, bool verbose,
const std::string& indent) const
{
+ out << "MapBucketDatabase(";
(void) indent;
if (verbose) {
Writer writer(out);
forEach(writer);
- /* Write out all the gory details to debug
- out << "Entries {";
- for (uint32_t i=0, n=_db.size(); i<n; ++i) {
- out << "\n" << indent << " " << _db[i].e_0 << "," << _db[i].e_1
- << "," << _db[i].value;
- }
- out << "\n" << indent << "}";
- out << "Free {";
- for (uint32_t i=0, n=_free.size(); i<n; ++i) {
- out << "\n" << indent << " " << _free[i];
- }
- out << "\n" << indent << "}";
- out << "Entries {";
- for (uint32_t i=0, n=_values.size(); i<n; ++i) {
- out << "\n" << indent << " " << _values[i];
- }
- out << "\n" << indent << "}";
- out << "Free {";
- for (uint32_t i=0, n=_freeValues.size(); i<n; ++i) {
- out << "\n" << indent << " " << _freeValues[i];
- }
- out << "\n" << indent << "}";
- */
} else {
out << "Size(" << size() << ") Nodes("
<< (_db.size() - _free.size() - 1) << ")";
}
+ out << ')';
}
} // storage
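
The merge() flow above deliberately defers structural mutation: Skip results are queued in to_remove, insert_before_current() entries in to_insert, and both are applied only after the traversal, so the tree is never rewired mid-recursion. For illustration, a minimal MergingProcessor that prunes replica-less entries; the class name is hypothetical, and only merge() is overridden since insert_remaining_at_end() evidently has a default no-op (MergingNodeRemover below does not override it either):

    struct DropEmptyEntries final : BucketDatabase::MergingProcessor {
        Result merge(BucketDatabase::Merger& merger) override {
            if (merger.current_entry()->getNodeCount() == 0) {
                return Result::Skip;      // queued for removal after the traversal
            }
            return Result::KeepUnchanged; // leave the entry untouched
        }
    };

    // Hypothetical usage, one pass over the whole DB:
    // DropEmptyEntries pruner;
    // db.merge(pruner);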
diff --git a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h
index a7a38f1d978..df3e055bd7a 100644
--- a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h
+++ b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h
@@ -18,16 +18,16 @@ public:
void getAll(const document::BucketId& bucket, std::vector<Entry>& entries) const override;
void update(const Entry& newEntry) override;
void forEach(EntryProcessor&, const document::BucketId& after = document::BucketId()) const override;
- void forEach(MutableEntryProcessor&, const document::BucketId& after = document::BucketId()) override;
uint64_t size() const override { return _values.size() - _freeValues.size(); };
void clear() override;
uint32_t childCount(const document::BucketId&) const override;
Entry upperBound(const document::BucketId& value) const override;
+ void merge(MergingProcessor&) override;
+
document::BucketId getAppropriateBucket(uint16_t minBits, const document::BucketId& bid) override;
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
-
private:
struct E {
E() : value(-1), e_0(-1), e_1(-1) {};
@@ -42,14 +42,20 @@ private:
int e_1;
};
+ template <typename EntryType>
+ void update_internal(EntryType&& new_entry);
+
BucketDatabase::Entry* find(int idx, uint8_t bitCount, const document::BucketId& bid, bool create);
bool remove(int index, uint8_t bitCount, const document::BucketId& bId);
int findFirstInOrderNodeInclusive(int index) const;
int upperBoundImpl(int index, uint8_t depth, const document::BucketId& value) const;
- template <typename EntryProcessorType>
- bool forEach(int index, EntryProcessorType& processor, uint8_t bitCount,
- const document::BucketId& lowerBound, bool& process);
+ bool forEach(int index, EntryProcessor& processor, uint8_t bitCount,
+ const document::BucketId& lowerBound, bool& process) const;
+
+ void merge_internal(int index, MergingProcessor& processor,
+ std::vector<Entry>& to_insert,
+ std::vector<document::BucketId>& to_remove);
void findParents(int index, uint8_t bitCount, const document::BucketId& bid, std::vector<Entry>& entries) const;
void findAll(int index, uint8_t bitCount, const document::BucketId& bid, std::vector<Entry>& entries) const;
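
update_internal() takes a forwarding reference so that the copying public update(const Entry&) and the move-based bulk path in merge() share one implementation. A brief sketch of the two call shapes, with make_entry as a hypothetical factory:

    BucketDatabase::Entry entry = make_entry(); // hypothetical
    db.update(entry);             // lvalue: EntryType deduces const Entry&, entry is copied
    // inside merge():
    // update_internal(std::move(e));
    //                               // rvalue: EntryType deduces Entry, e is moved into the slot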
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 33fa80ab484..d349164e8ed 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -4,8 +4,8 @@
#include "bucket_db_prune_elision.h"
#include "distributor.h"
#include "distributor_bucket_space.h"
-#include "simpleclusterinformation.h"
#include "distributormetricsset.h"
+#include "simpleclusterinformation.h"
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/removelocation.h>
@@ -152,26 +152,20 @@ BucketDBUpdater::removeSuperfluousBuckets(
// Remove all buckets not belonging to this distributor, or
// being on storage nodes that are no longer up.
- NodeRemover proc(
+ MergingNodeRemover proc(
oldClusterState,
*new_cluster_state,
_distributorComponent.getIndex(),
newDistribution,
- up_states);
- bucketDb.forEach(proc);
-
- for (const auto & bucket : proc.getBucketsToRemove()) {
- bucketDb.remove(bucket);
+ up_states,
+ move_to_read_only_db);
+
+ bucketDb.merge(proc);
+ // Non-owned entries vector only has contents if move_to_read_only_db is true.
+ // TODO either rewrite in terms of merge() or remove entirely
+ for (const auto& db_entry : proc.getNonOwnedEntries()) {
+ readOnlyDb.update(db_entry); // TODO Entry move support
}
- // TODO vec of Entry instead to avoid lookup and remove? Uses more transient memory...
- for (const auto& bucket : proc.getNonOwnedBuckets()) {
- if (move_to_read_only_db) {
- auto db_entry = bucketDb.get(bucket);
- readOnlyDb.update(db_entry); // TODO Entry move support
- }
- bucketDb.remove(bucket);
- }
-
}
}
@@ -217,7 +211,6 @@ BucketDBUpdater::storageDistributionChanged()
std::move(clusterInfo),
_sender,
_distributorComponent.getBucketSpaceRepo(),
- _distributorComponent.getReadOnlyBucketSpaceRepo(),
_distributorComponent.getUniqueTimestamp());
_outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
}
@@ -273,7 +266,6 @@ BucketDBUpdater::onSetSystemState(
std::move(clusterInfo),
_sender,
_distributorComponent.getBucketSpaceRepo(),
- _distributorComponent.getReadOnlyBucketSpaceRepo(),
cmd,
_outdatedNodesMap,
_distributorComponent.getUniqueTimestamp());
@@ -740,40 +732,39 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
return "";
}
-bool
-BucketDBUpdater::BucketListGenerator::process(BucketDatabase::Entry& e)
-{
- document::BucketId bucketId(e.getBucketId());
-
- const BucketCopy* copy(e->getNode(_node));
- if (copy) {
- _entries.emplace_back(bucketId, copy->getBucketInfo());
- }
- return true;
-}
-
-BucketDBUpdater::NodeRemover::NodeRemover(
+BucketDBUpdater::MergingNodeRemover::MergingNodeRemover(
const lib::ClusterState& oldState,
const lib::ClusterState& s,
uint16_t localIndex,
const lib::Distribution& distribution,
- const char* upStates)
+ const char* upStates,
+ bool track_non_owned_entries)
: _oldState(oldState),
_state(s),
+ _available_nodes(),
_nonOwnedBuckets(),
- _removedBuckets(),
+ _removed_buckets(0),
_localIndex(localIndex),
_distribution(distribution),
_upStates(upStates),
+ _track_non_owned_entries(track_non_owned_entries),
_cachedDecisionSuperbucket(UINT64_MAX),
_cachedOwned(false)
-{}
+{
+ // TODO intersection of cluster state and distribution config
+ const uint16_t storage_count = s.getNodeCount(lib::NodeType::STORAGE);
+ _available_nodes.resize(storage_count);
+ for (uint16_t i = 0; i < storage_count; ++i) {
+ if (s.getNodeState(lib::Node(lib::NodeType::STORAGE, i)).getState().oneOf(_upStates)) {
+ _available_nodes[i] = true;
+ }
+ }
+}
void
-BucketDBUpdater::NodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const
+BucketDBUpdater::MergingNodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const
{
LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg);
- LOG_BUCKET_OPERATION_NO_LOCK(bucketId, msg);
}
namespace {
@@ -786,7 +777,7 @@ uint64_t superbucket_from_id(const document::BucketId& id, uint16_t distribution
}
bool
-BucketDBUpdater::NodeRemover::distributorOwnsBucket(
+BucketDBUpdater::MergingNodeRemover::distributorOwnsBucket(
const document::BucketId& bucketId) const
{
// TODO "no distributors available" case is the same for _all_ buckets; cache once in constructor.
@@ -818,7 +809,7 @@ BucketDBUpdater::NodeRemover::distributorOwnsBucket(
}
void
-BucketDBUpdater::NodeRemover::setCopiesInEntry(
+BucketDBUpdater::MergingNodeRemover::setCopiesInEntry(
BucketDatabase::Entry& e,
const std::vector<BucketCopy>& copies) const
{
@@ -830,77 +821,73 @@ BucketDBUpdater::NodeRemover::setCopiesInEntry(
e->addNodes(copies, order);
LOG(spam, "Changed %s", e->toString().c_str());
- LOG_BUCKET_OPERATION_NO_LOCK(
- e.getBucketId(),
- vespalib::make_string("updated bucketdb entry to %s",
- e->toString().c_str()));
}
-void
-BucketDBUpdater::NodeRemover::removeEmptyBucket(const document::BucketId& bucketId)
+bool
+BucketDBUpdater::MergingNodeRemover::has_unavailable_nodes(const storage::BucketDatabase::Entry& e) const
{
- _removedBuckets.push_back(bucketId);
-
- LOG(spam,
- "After system state change %s, bucket %s now has no copies.",
- _oldState.getTextualDifference(_state).c_str(),
- bucketId.toString().c_str());
- LOG_BUCKET_OPERATION_NO_LOCK(bucketId, "bucket now has no copies");
+ const uint16_t n_nodes = e->getNodeCount();
+ for (uint16_t i = 0; i < n_nodes; i++) {
+ const uint16_t node_idx = e->getNodeRef(i).getNode();
+ if (!storage_node_is_available(node_idx)) {
+ return true;
+ }
+ }
+ return false;
}
-bool
-BucketDBUpdater::NodeRemover::process(BucketDatabase::Entry& e)
+BucketDatabase::MergingProcessor::Result
+BucketDBUpdater::MergingNodeRemover::merge(storage::BucketDatabase::Merger& merger)
{
- const document::BucketId& bucketId(e.getBucketId());
+ document::BucketId bucketId(merger.bucket_id());
+ LOG(spam, "Check for remove: bucket %s", bucketId.toString().c_str());
+ if (!distributorOwnsBucket(bucketId)) {
+ // TODO remove in favor of DB snapshotting
+ if (_track_non_owned_entries) {
+ _nonOwnedBuckets.emplace_back(merger.current_entry());
+ }
+ return Result::Skip;
+ }
+ auto& e = merger.current_entry();
- LOG(spam, "Check for remove: bucket %s", e.toString().c_str());
- if (e->getNodeCount() == 0) {
- removeEmptyBucket(e.getBucketId());
- return true;
+ if (e->getNodeCount() == 0) { // TODO when should this edge ever trigger?
+ return Result::Skip;
}
- if (!distributorOwnsBucket(bucketId)) {
- _nonOwnedBuckets.push_back(bucketId);
- return true;
+
+ if (!has_unavailable_nodes(e)) {
+ return Result::KeepUnchanged;
}
std::vector<BucketCopy> remainingCopies;
for (uint16_t i = 0; i < e->getNodeCount(); i++) {
- Node n(NodeType::STORAGE, e->getNodeRef(i).getNode());
-
- // TODO replace with intersection hash set of config and cluster state
- if (_state.getNodeState(n).getState().oneOf(_upStates)) {
+ const uint16_t node_idx = e->getNodeRef(i).getNode();
+ if (storage_node_is_available(node_idx)) {
remainingCopies.push_back(e->getNodeRef(i));
}
}
- if (remainingCopies.size() == e->getNodeCount()) {
- return true;
- }
-
if (remainingCopies.empty()) {
- removeEmptyBucket(bucketId);
+ ++_removed_buckets;
+ return Result::Skip;
} else {
setCopiesInEntry(e, remainingCopies);
+ return Result::Update;
}
+}
- return true;
+bool
+BucketDBUpdater::MergingNodeRemover::storage_node_is_available(uint16_t index) const noexcept
+{
+ return ((index < _available_nodes.size()) && _available_nodes[index]);
}
-BucketDBUpdater::NodeRemover::~NodeRemover()
+BucketDBUpdater::MergingNodeRemover::~MergingNodeRemover()
{
- if ( !_removedBuckets.empty()) {
- std::ostringstream ost;
- ost << "After system state change "
- << _oldState.getTextualDifference(_state) << ", we removed "
- << "buckets. Data is unavailable until node comes back up. "
- << _removedBuckets.size() << " buckets removed:";
- for (uint32_t i=0; i < 10 && i < _removedBuckets.size(); ++i) {
- ost << " " << _removedBuckets[i];
- }
- if (_removedBuckets.size() >= 10) {
- ost << " ...";
- }
- LOGBM(info, "%s", ost.str().c_str());
+ if (_removed_buckets != 0) {
+ LOGBM(info, "After cluster state change %s, %" PRIu64 " buckets no longer "
+ "have available replicas. Documents in these buckets will "
+ "be unavailable until nodes come back up",
+ _oldState.getTextualDifference(_state).c_str(), _removed_buckets);
}
}
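
MergingNodeRemover resolves node availability once, up front: the constructor folds the cluster state into the _available_nodes bitvector, so the per-bucket hot path is a plain vector lookup rather than a repeated NodeState query. A condensed sketch of that precomputation, using the same calls as the constructor above:

    std::vector<bool> available(state.getNodeCount(lib::NodeType::STORAGE));
    for (uint16_t i = 0; i < available.size(); ++i) {
        lib::Node node(lib::NodeType::STORAGE, i);
        // up_states is the same state-character string handed to the remover
        available[i] = state.getNodeState(node).getState().oneOf(up_states);
    }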
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index dbcf6699bdc..663ac488ef4 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -182,54 +182,43 @@ private:
friend class BucketDBUpdater_Test;
friend class MergeOperation_Test;
- class BucketListGenerator
- {
- public:
- BucketListGenerator(uint16_t node, BucketListMerger::BucketList& entries)
- : _node(node), _entries(entries) {};
-
- bool process(BucketDatabase::Entry&);
-
- private:
- uint16_t _node;
- BucketListMerger::BucketList& _entries;
- };
-
/**
Removes all copies of buckets that are on nodes that are down.
*/
- class NodeRemover : public BucketDatabase::MutableEntryProcessor
- {
+ class MergingNodeRemover : public BucketDatabase::MergingProcessor {
public:
- NodeRemover(const lib::ClusterState& oldState,
- const lib::ClusterState& s,
- uint16_t localIndex,
- const lib::Distribution& distribution,
- const char* upStates);
- ~NodeRemover() override;
-
- bool process(BucketDatabase::Entry& e) override;
+ MergingNodeRemover(const lib::ClusterState& oldState,
+ const lib::ClusterState& s,
+ uint16_t localIndex,
+ const lib::Distribution& distribution,
+ const char* upStates,
+ bool track_non_owned_entries);
+ ~MergingNodeRemover() override;
+
+ Result merge(BucketDatabase::Merger&) override;
void logRemove(const document::BucketId& bucketId, const char* msg) const;
bool distributorOwnsBucket(const document::BucketId&) const;
- const std::vector<document::BucketId>& getBucketsToRemove() const noexcept {
- return _removedBuckets;
- }
- const std::vector<document::BucketId>& getNonOwnedBuckets() const noexcept {
+ // TODO this is temporary until explicit DB snapshotting replaces read-only DB usage
+ const std::vector<BucketDatabase::Entry>& getNonOwnedEntries() const noexcept {
return _nonOwnedBuckets;
}
private:
void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const;
- void removeEmptyBucket(const document::BucketId& bucketId);
+
+ bool has_unavailable_nodes(const BucketDatabase::Entry&) const;
+ bool storage_node_is_available(uint16_t index) const noexcept;
const lib::ClusterState _oldState;
const lib::ClusterState _state;
- std::vector<document::BucketId> _nonOwnedBuckets;
- std::vector<document::BucketId> _removedBuckets;
+ std::vector<bool> _available_nodes;
+ std::vector<BucketDatabase::Entry> _nonOwnedBuckets;
+ size_t _removed_buckets;
uint16_t _localIndex;
const lib::Distribution& _distribution;
const char* _upStates;
+ bool _track_non_owned_entries;
mutable uint64_t _cachedDecisionSuperbucket;
mutable bool _cachedOwned;
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 3237b4b4d71..96359bcec60 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -114,7 +114,9 @@ public:
* Checks whether a bucket needs to be split, and sends a split
* if so.
*/
- void checkBucketForSplit(document::BucketSpace bucketSpace, const BucketDatabase::Entry& e, uint8_t priority) override;
+ void checkBucketForSplit(document::BucketSpace bucketSpace,
+ const BucketDatabase::Entry& e,
+ uint8_t priority) override;
const lib::ClusterStateBundle& getClusterStateBundle() const override;
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
index 5a1ff31e2e7..5ae5d8dc3f8 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp
@@ -244,7 +244,7 @@ IdealStateManager::generateAll(const document::Bucket &bucket,
void
IdealStateManager::getBucketStatus(
BucketSpace bucketSpace,
- const BucketDatabase::Entry& entry,
+ const BucketDatabase::ConstEntryRef& entry,
NodeMaintenanceStatsTracker& statsTracker,
std::ostream& out) const
{
diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h
index 3bb6d0dd757..8566c67a51b 100644
--- a/storage/src/vespa/storage/distributor/idealstatemanager.h
+++ b/storage/src/vespa/storage/distributor/idealstatemanager.h
@@ -128,14 +128,14 @@ private:
StatusBucketVisitor(const IdealStateManager& ism, document::BucketSpace bucketSpace, std::ostream& out)
: _statsTracker(), _ism(ism), _bucketSpace(bucketSpace), _out(out) {}
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
_ism.getBucketStatus(_bucketSpace, e, _statsTracker, _out);
return true;
}
};
friend class StatusBucketVisitor;
- void getBucketStatus(document::BucketSpace bucketSpace, const BucketDatabase::Entry& entry,
+ void getBucketStatus(document::BucketSpace bucketSpace, const BucketDatabase::ConstEntryRef& entry,
NodeMaintenanceStatsTracker& statsTracker, std::ostream& out) const;
void dump_bucket_space_db_status(document::BucketSpace bucket_space, std::ostream& out) const;
};
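
With the const forEach() now handing out ConstEntryRef, read-only visitors can no longer mutate (or needlessly copy) full Entry objects. A minimal sketch of such a processor; the struct name is illustrative:

    struct BucketCounter final : BucketDatabase::EntryProcessor {
        size_t buckets = 0;
        bool process(const BucketDatabase::ConstEntryRef&) override {
            ++buckets;
            return true; // returning false would stop the iteration early
        }
    };

    // Hypothetical usage:
    // BucketCounter counter;
    // db.forEach(counter);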
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
index cd18148a662..a89dd52775b 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
@@ -483,7 +483,7 @@ struct NextEntryFinder : public BucketDatabase::EntryProcessor {
NextEntryFinder(const document::BucketId& id)
: _first(true), _last(id), _next() {}
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
document::BucketId bucket(e.getBucketId());
if (_first && bucket == _last) {
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
index 216646f0345..13f9c21eed0 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
@@ -131,24 +131,30 @@ PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDat
return updated;
}
-// TODO take in bucket key instead
bool
-PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const
+PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const
{
- return (_iter < _entries.size()
- && _entries[_iter].bucket_key < bucketId.toKey());
+ return ((_iter < _entries.size())
+ && (_entries[_iter].bucket_key < bucket_key));
}
-// TODO take in bucket key instead
bool
-PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const
+PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const
{
- return _iter < _entries.size() && _entries[_iter].bucket_id() == bucketId;
+ return _iter < _entries.size() && _entries[_iter].bucket_key == bucket_key;
}
-bool
-PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e)
-{
+using MergeResult = BucketDatabase::MergingProcessor::Result;
+
+MergeResult PendingBucketSpaceDbTransition::merge(BucketDatabase::Merger& merger) {
+ const uint64_t bucket_key = merger.bucket_key();
+
+ while (databaseIteratorHasPassedBucketInfoIterator(bucket_key)) {
+ LOG(spam, "Found new bucket %s, adding", _entries[_iter].bucket_id().toString().c_str());
+ addToMerger(merger, skipAllForSameBucket());
+ }
+
+ auto& e = merger.current_entry();
document::BucketId bucketId(e.getBucketId());
LOG(spam,
@@ -157,19 +163,10 @@ PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e)
bucketId.toString().c_str(),
e.getBucketInfo().toString().c_str());
- while (databaseIteratorHasPassedBucketInfoIterator(bucketId)) {
- LOG(spam, "Found new bucket %s, adding",
- _entries[_iter].bucket_id().toString().c_str());
-
- _missingEntries.push_back(skipAllForSameBucket());
- }
-
bool updated(removeCopiesFromNodesThatWereRequested(e, bucketId));
- if (bucketInfoIteratorPointsToBucket(bucketId)) {
- LOG(spam, "Updating bucket %s",
- _entries[_iter].bucket_id().toString().c_str());
-
+ if (bucketInfoIteratorPointsToBucket(bucket_key)) {
+ LOG(spam, "Updating bucket %s", _entries[_iter].bucket_id().toString().c_str());
insertInfo(e, skipAllForSameBucket());
updated = true;
}
@@ -177,23 +174,24 @@ PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e)
if (updated) {
// Remove bucket if we've previously removed all nodes from it
if (e->getNodeCount() == 0) {
- _removedBuckets.push_back(bucketId);
+ return MergeResult::Skip;
} else {
e.getBucketInfo().updateTrusted();
+ return MergeResult::Update;
}
}
- LOG(spam,
- "After merging info from nodes [%s], bucket %s had info %s",
- requestNodesToString().c_str(),
- bucketId.toString().c_str(),
- e.getBucketInfo().toString().c_str());
+ return MergeResult::KeepUnchanged;
+}
- return true;
+void PendingBucketSpaceDbTransition::insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) {
+ while (_iter < _entries.size()) {
+ addToInserter(inserter, skipAllForSameBucket());
+ }
}
void
-PendingBucketSpaceDbTransition::addToBucketDB(BucketDatabase& db, const Range& range)
+PendingBucketSpaceDbTransition::addToMerger(BucketDatabase::Merger& merger, const Range& range)
{
LOG(spam, "Adding new bucket %s with %d copies",
_entries[range.first].bucket_id().toString().c_str(),
@@ -204,33 +202,37 @@ PendingBucketSpaceDbTransition::addToBucketDB(BucketDatabase& db, const Range& r
if (e->getLastGarbageCollectionTime() == 0) {
e->setLastGarbageCollectionTime(
framework::MicroSecTime(_creationTimestamp)
- .getSeconds().getTime());
+ .getSeconds().getTime());
}
e.getBucketInfo().updateTrusted();
- db.update(e);
+ merger.insert_before_current(e);
}
void
-PendingBucketSpaceDbTransition::mergeIntoBucketDatabase()
+PendingBucketSpaceDbTransition::addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range)
{
- BucketDatabase &db(_distributorBucketSpace.getBucketDatabase());
- std::sort(_entries.begin(), _entries.end());
-
- db.forEach(*this);
-
- for (uint32_t i = 0; i < _removedBuckets.size(); ++i) {
- db.remove(_removedBuckets[i]);
- }
- _removedBuckets.clear();
+ // TODO dedupe
+ LOG(spam, "Adding new bucket %s with %d copies",
+ _entries[range.first].bucket_id().toString().c_str(),
+ range.second - range.first);
- // All of the remaining were not already in the bucket database.
- while (_iter < _entries.size()) {
- _missingEntries.push_back(skipAllForSameBucket());
+ BucketDatabase::Entry e(_entries[range.first].bucket_id(), BucketInfo());
+ insertInfo(e, range);
+ if (e->getLastGarbageCollectionTime() == 0) {
+ e->setLastGarbageCollectionTime(
+ framework::MicroSecTime(_creationTimestamp)
+ .getSeconds().getTime());
}
+ e.getBucketInfo().updateTrusted();
+ inserter.insert_at_end(e);
+}
- for (uint32_t i = 0; i < _missingEntries.size(); ++i) {
- addToBucketDB(db, _missingEntries[i]);
- }
+void
+PendingBucketSpaceDbTransition::mergeIntoBucketDatabase()
+{
+ BucketDatabase &db(_distributorBucketSpace.getBucketDatabase());
+ std::sort(_entries.begin(), _entries.end());
+ db.merge(*this);
}
void
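
mergeIntoBucketDatabase() thus boils down to sort-then-merge: _entries is sorted by bucket key, and db.merge(*this) performs a single merge-join against the already key-ordered database. A self-contained sketch of that join over two sorted key sequences; comments note what the real code would do at each step:

    #include <cstdint>
    #include <vector>

    void merge_join(const std::vector<uint64_t>& db_keys,
                    const std::vector<uint64_t>& new_keys)
    {
        size_t iter = 0;
        for (uint64_t db_key : db_keys) {      // the DB iterates in key order
            while (iter < new_keys.size() && new_keys[iter] < db_key) {
                ++iter;                        // real code: insert_before_current(...)
            }
            if (iter < new_keys.size() && new_keys[iter] == db_key) {
                ++iter;                        // real code: fold info into current entry
            }
        }
        while (iter < new_keys.size()) {
            ++iter;                            // real code: insert_at_end(...)
        }
    }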
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
index 7eb2974eb52..695f80750aa 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
@@ -20,7 +20,7 @@ class DistributorBucketSpace;
* reply result within a bucket space and apply it to the distributor
* bucket database when switching to the pending cluster state.
*/
-class PendingBucketSpaceDbTransition : public BucketDatabase::MutableEntryProcessor
+class PendingBucketSpaceDbTransition : public BucketDatabase::MergingProcessor
{
public:
using Entry = dbtransition::Entry;
@@ -51,8 +51,8 @@ private:
bool _bucketOwnershipTransfer;
std::unordered_map<uint16_t, size_t> _rejectedRequests;
- // BucketDataBase::MutableEntryProcessor API
- bool process(BucketDatabase::Entry& e) override;
+ BucketDatabase::MergingProcessor::Result merge(BucketDatabase::Merger&) override;
+ void insert_remaining_at_end(BucketDatabase::TrailingInserter&) override;
/**
* Skips through all entries for the same bucket and returns
@@ -63,7 +63,8 @@ private:
std::vector<BucketCopy> getCopiesThatAreNewOrAltered(BucketDatabase::Entry& info, const Range& range);
void insertInfo(BucketDatabase::Entry& info, const Range& range);
- void addToBucketDB(BucketDatabase& db, const Range& range);
+ void addToMerger(BucketDatabase::Merger& merger, const Range& range);
+ void addToInserter(BucketDatabase::TrailingInserter& inserter, const Range& range);
bool nodeIsOutdated(uint16_t node) const {
return (_outdatedNodes.find(node) != _outdatedNodes.end());
@@ -75,8 +76,8 @@ private:
bool removeCopiesFromNodesThatWereRequested(BucketDatabase::Entry& e, const document::BucketId& bucketId);
// Helper methods for iterating over _entries
- bool databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const;
- bool bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const;
+ bool databaseIteratorHasPassedBucketInfoIterator(uint64_t bucket_key) const;
+ bool bucketInfoIteratorPointsToBucket(uint64_t bucket_key) const;
std::string requestNodesToString();
bool distributorChanged();
@@ -99,7 +100,7 @@ public:
std::shared_ptr<const ClusterInformation> clusterInfo,
const lib::ClusterState &newClusterState,
api::Timestamp creationTimestamp);
- ~PendingBucketSpaceDbTransition();
+ ~PendingBucketSpaceDbTransition() override;
// Merges all the results with the corresponding bucket database.
void mergeIntoBucketDatabase();
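
skipAllForSameBucket() advances _iter across every reply row sharing the current bucket and hands back the covered Range, which insertInfo() and getCopiesThatAreNewOrAltered() then treat as one bucket's worth of replies. A sketch of that grouping over a sorted key vector, with hypothetical names:

    std::pair<size_t, size_t> skip_all_for_same_bucket(const std::vector<uint64_t>& sorted_keys,
                                                       size_t& iter)
    {
        const size_t first = iter;
        const uint64_t key = sorted_keys[first];
        while (iter < sorted_keys.size() && sorted_keys[iter] == key) {
            ++iter; // consume every reply row for this bucket
        }
        return {first, iter}; // half-open range [first, iter)
    }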
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 6cba7084037..8298b126690 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -28,7 +28,6 @@ PendingClusterState::PendingClusterState(
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp)
@@ -41,7 +40,6 @@ PendingClusterState::PendingClusterState(
_creationTimestamp(creationTimestamp),
_sender(sender),
_bucketSpaceRepo(bucketSpaceRepo),
- _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo),
_clusterStateVersion(_cmd->getClusterStateBundle().getVersion()),
_isVersionedTransition(true),
_bucketOwnershipTransfer(false),
@@ -56,7 +54,6 @@ PendingClusterState::PendingClusterState(
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
api::Timestamp creationTimestamp)
: _requestedNodes(clusterInfo->getStorageNodeCount()),
_prevClusterStateBundle(clusterInfo->getClusterStateBundle()),
@@ -66,7 +63,6 @@ PendingClusterState::PendingClusterState(
_creationTimestamp(creationTimestamp),
_sender(sender),
_bucketSpaceRepo(bucketSpaceRepo),
- _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo),
_clusterStateVersion(0),
_isVersionedTransition(false),
_bucketOwnershipTransfer(true),
@@ -76,7 +72,7 @@ PendingClusterState::PendingClusterState(
initializeBucketSpaceTransitions(true, OutdatedNodesMap());
}
-PendingClusterState::~PendingClusterState() {}
+PendingClusterState::~PendingClusterState() = default;
void
PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap)
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index cedc0573381..7aa35b32b8e 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -46,14 +46,13 @@ public:
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp)
{
// Naked new due to private constructor
return std::unique_ptr<PendingClusterState>(new PendingClusterState(
- clock, clusterInfo, sender, bucketSpaceRepo, readOnlyBucketSpaceRepo,
+ clock, clusterInfo, sender, bucketSpaceRepo,
newStateCmd, outdatedNodesMap, creationTimestamp));
}
@@ -66,13 +65,11 @@ public:
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
api::Timestamp creationTimestamp)
{
// Naked new due to private constructor
return std::unique_ptr<PendingClusterState>(new PendingClusterState(
- clock, clusterInfo, sender, bucketSpaceRepo,
- readOnlyBucketSpaceRepo, creationTimestamp));
+ clock, clusterInfo, sender, bucketSpaceRepo, creationTimestamp));
}
PendingClusterState(const PendingClusterState &) = delete;
@@ -167,7 +164,6 @@ private:
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp);
@@ -181,7 +177,6 @@ private:
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
DistributorBucketSpaceRepo& bucketSpaceRepo,
- DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo,
api::Timestamp creationTimestamp);
struct BucketSpaceAndNode {
@@ -232,7 +227,6 @@ private:
DistributorMessageSender& _sender;
DistributorBucketSpaceRepo& _bucketSpaceRepo;
- DistributorBucketSpaceRepo& _readOnlyBucketSpaceRepo;
uint32_t _clusterStateVersion;
bool _isVersionedTransition;
bool _bucketOwnershipTransfer;