summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
author: Henning Baldersheim <balder@yahoo-inc.com> 2020-12-03 16:35:44 +0100
committer: GitHub <noreply@github.com> 2020-12-03 16:35:44 +0100
commit: 7d9135e0cd54ec4c120783276e19cfbd313edb0f (patch)
tree: 23d39bc2e4e6c7dbdf95eb2534617476e65275b4 /storage
parent: 8d824993bfe9573d3ea5336c48971b146d8b9b58 (diff)
parent: 77b2ac65da5a0601201ff31712817dc9c94c99c8 (diff)
Merge pull request #15632 from vespa-engine/toregge/faster-distributor-bucket-db-update
Add process_update member function to BucketDatabase.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/bucketdatabasetest.cpp | 35
-rw-r--r-- storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp | 6
-rw-r--r-- storage/src/vespa/storage/bucketdb/btree_bucket_database.h | 1
-rw-r--r-- storage/src/vespa/storage/bucketdb/bucketdatabase.h | 17
-rw-r--r-- storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h | 2
-rw-r--r-- storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp | 34
-rw-r--r-- storage/src/vespa/storage/distributor/distributorcomponent.cpp | 96
7 files changed, 159 insertions, 32 deletions
diff --git a/storage/src/tests/distributor/bucketdatabasetest.cpp b/storage/src/tests/distributor/bucketdatabasetest.cpp
index 7f9825d32fc..6aadc6c01e1 100644
--- a/storage/src/tests/distributor/bucketdatabasetest.cpp
+++ b/storage/src/tests/distributor/bucketdatabasetest.cpp
@@ -625,6 +625,22 @@ struct InsertAtEndMergingProcessor : BucketDatabase::MergingProcessor {
}
};
+struct EntryUpdateProcessor : BucketDatabase::EntryUpdateProcessor { // Test adapter: implements the update-processor interface by delegating to a std::function.
+ using Entry = BucketDatabase::Entry;
+ std::function<bool(Entry&)> _func; // Callback invoked by process_entry(); its bool result is returned unchanged.
+ EntryUpdateProcessor(std::function<bool(Entry&)> func)
+ : _func(std::move(func))
+ {
+ }
+ ~EntryUpdateProcessor() override = default;
+ BucketDatabase::Entry create_entry(const document::BucketId& bucket) const override { // Builds an entry for `bucket` with a default-constructed BucketInfo.
+ return BucketDatabase::Entry(bucket, BucketInfo());
+ }
+ bool process_entry(Entry& entry) const override { // Forwards the entry to the stored callback.
+ return(_func(entry));
+ }
+};
+
}
TEST_P(BucketDatabaseTest, merge_keep_unchanged_result_does_not_alter_db_contents) {
@@ -704,6 +720,25 @@ TEST_P(BucketDatabaseTest, merge_can_insert_entry_at_end) {
"node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
}
+TEST_P(BucketDatabaseTest, process_update) // Exercises process_update(): no-op on a missing bucket, create-on-demand, and removal.
+{
+ using Entry = BucketDatabase::Entry;
+ document::BucketId bucket(16, 2);
+ EXPECT_EQ(dump_db(db()), ""); // Precondition: the dump of the database is empty.
+ auto update_entry = [](Entry& entry) { entry->addNode(BC(0), toVector<uint16_t>(0)); return true; }; // Adds one node copy and returns true (keep the entry).
+ EntryUpdateProcessor update_processor(update_entry);
+ db().process_update(bucket, update_processor, false); // create_if_nonexisting == false on an absent bucket.
+ EXPECT_EQ(dump_db(db()), ""); // Expected: nothing was created.
+ db().process_update(bucket, update_processor, true); // create_if_nonexisting == true: entry is created, then processed.
+ EXPECT_EQ(dump_db(db()),
+ "BucketId(0x4000000000000002) : "
+ "node(idx=0,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n");
+ auto remove_entry = [](Entry&) noexcept { return false; }; // Leaves the entry untouched and returns false (remove the entry).
+ EntryUpdateProcessor remove_processor(remove_entry);
+ db().process_update(bucket, remove_processor, false); // Bucket exists now, so the processor runs even without create_if_nonexisting.
+ EXPECT_EQ(dump_db(db()), ""); // Expected: the entry is gone again.
+}
+
TEST_P(BucketDatabaseTest, DISABLED_benchmark_const_iteration) {
constexpr uint32_t superbuckets = 1u << 16u;
constexpr uint32_t sub_buckets = 14;
diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
index d547830d765..be02db70d9e 100644
--- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
+++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp
@@ -139,6 +139,12 @@ void BTreeBucketDatabase::update(const Entry& newEntry) {
_impl->update(newEntry.getBucketId(), newEntry);
}
+void
+BTreeBucketDatabase::process_update(const document::BucketId& bucket, EntryUpdateProcessor &processor, bool create_if_nonexisting) // Updates the entry for `bucket` through `processor` in a single database call.
+{
+ _impl->process_update(bucket, processor, create_if_nonexisting); // All arguments are forwarded unchanged to the implementation object.
+}
+
// TODO need snapshot read with guarding
// FIXME semantics of for-each in judy and bit tree DBs differ, former expects lbound, latter ubound..!
// FIXME but bit-tree code says "lowerBound" in impl and "after" in declaration???
diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h
index 2821e360792..e0955651209 100644
--- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h
+++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h
@@ -43,6 +43,7 @@ public:
void getAll(const document::BucketId& bucket,
std::vector<Entry>& entries) const override;
void update(const Entry& newEntry) override;
+ void process_update(const document::BucketId& bucket, EntryUpdateProcessor &processor, bool create_if_nonexisting) override; // Overrides the base-class process_update(); see the base declaration for the processor contract.
void forEach(EntryProcessor&, const document::BucketId& after) const override;
Entry upperBound(const document::BucketId& value) const override;
uint64_t size() const override;
diff --git a/storage/src/vespa/storage/bucketdb/bucketdatabase.h b/storage/src/vespa/storage/bucketdb/bucketdatabase.h
index cd94a698358..9ca231a26d2 100644
--- a/storage/src/vespa/storage/bucketdb/bucketdatabase.h
+++ b/storage/src/vespa/storage/bucketdb/bucketdatabase.h
@@ -57,6 +57,21 @@ public:
virtual bool process(const ConstEntryRef& e) = 0;
};
+ /*
+ * Interface class used by process_update() for updating an entry
+ * with a single call to the bucket database.
+ */
+ struct EntryUpdateProcessor {
+ virtual ~EntryUpdateProcessor() = default;
+ virtual Entry create_entry(const document::BucketId& bucket) const = 0; // Returns the initial entry for `bucket`; the B-tree implementation calls this only when the bucket is absent and creation was requested.
+ /*
+ * Modifies the entry in place.
+ * Returns true if the modified entry should be kept.
+ * Returns false if the entry should be removed.
+ */
+ virtual bool process_entry(Entry &entry) const = 0;
+ };
+
~BucketDatabase() override = default;
virtual Entry get(const document::BucketId& bucket) const = 0;
@@ -82,6 +97,8 @@ public:
*/
virtual void update(const Entry& newEntry) = 0;
+ virtual void process_update(const document::BucketId& bucket, EntryUpdateProcessor &processor, bool create_if_nonexisting) = 0; // Applies `processor` to the entry for `bucket`; `create_if_nonexisting` selects whether an absent bucket gets an entry from processor.create_entry() or is left alone.
+
virtual void forEach(
EntryProcessor&,
const document::BucketId& after = document::BucketId()) const = 0;
diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h
index ea6d47f26bb..f9d9f4f7861 100644
--- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h
+++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h
@@ -99,6 +99,8 @@ public:
// Returns true if bucket pre-existed in the DB, false otherwise
bool update(const document::BucketId& bucket, const ValueType& new_entry);
bool update_by_raw_key(uint64_t bucket_key, const ValueType& new_entry);
+ template <typename EntryUpdateProcessor> // The processor type must provide create_entry(bucket) and process_entry(entry), as used by the definition.
+ void process_update(const document::BucketId &bucket, EntryUpdateProcessor& processor, bool create_if_nonexisting);
template <typename IterValueExtractor, typename Func>
void find_parents_and_self(const document::BucketId& bucket, Func func) const;
diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp
index 839efae4c1b..aafb4286d7d 100644
--- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp
+++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp
@@ -342,6 +342,40 @@ bool GenericBTreeBucketDatabase<DataStoreTraitsT>::update(const BucketId& bucket
return update_by_raw_key(bucket.toKey(), new_entry);
}
+template <typename DataStoreTraitsT>
+template <typename EntryUpdateProcessor>
+void
+GenericBTreeBucketDatabase<DataStoreTraitsT>::process_update(const BucketId& bucket, EntryUpdateProcessor& processor, bool create_if_nonexisting) // Looks up `bucket` once, lets `processor` modify (or create) its entry, then writes back, inserts or removes it.
+{
+ uint64_t bucket_key = bucket.toKey();
+ auto iter = _tree.lowerBound(bucket_key); // Presumably positions at the first key >= bucket_key; the check below relies on that.
+ bool found = true;
+ if (!iter.valid() || bucket_key < iter.getKey()) { // No exact key match: the bucket is absent.
+ if (!create_if_nonexisting) {
+ return; // Absent and creation not requested: nothing is touched, not even commit_tree_changes().
+ }
+ found = false;
+ }
+ ValueType entry(found ? entry_from_iterator(iter) : processor.create_entry(bucket)); // Local entry: taken from the tree when present, otherwise supplied by the processor.
+ bool keep = processor.process_entry(entry); // false means the entry must not remain in the database.
+ if (found) {
+ DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData()); // The old stored value is released on both the keep and the remove path.
+ if (keep) {
+ const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry);
+ std::atomic_thread_fence(std::memory_order_release); // Release fence is issued before the new value is written into the tree.
+ iter.writeData(new_value); // Replaces the value at the existing key; no insert or remove on the tree.
+ } else {
+ _tree.remove(iter);
+ }
+ } else {
+ if (keep) { // A freshly created entry that the processor rejects is never inserted.
+ const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry);
+ _tree.insert(iter, bucket_key, new_value); // `iter` is passed along with the key and the new value.
+ }
+ }
+ commit_tree_changes(); // Reached on every path except the early return, including when nothing was inserted.
+}
+
/*
* Returns the bucket ID which, based on the buckets already existing in the DB,
* is the most specific location in the tree in which it should reside. This may
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
index 41a99092174..dde5281a15f 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
@@ -128,6 +128,62 @@ DistributorComponent::enumerateUnavailableNodes(
}
}
+namespace {
+
+/**
+ * Helper class to update entry in bucket database when bucket copies from nodes have changed.
+ */
+class UpdateBucketDatabaseProcessor : public BucketDatabase::EntryUpdateProcessor {
+ const framework::Clock& _clock; // Read in process_entry() to stamp a missing garbage-collection time.
+ const std::vector<BucketCopy>& _changed_nodes; // Held by reference, not copied: the referenced vector must outlive this processor.
+ std::vector<uint16_t> _ideal_nodes; // Owned copy; passed to addNodes() together with _changed_nodes.
+ bool _reset_trusted; // When true, process_entry() calls resetTrusted() on the entry.
+public:
+ UpdateBucketDatabaseProcessor(const framework::Clock& clock, const std::vector<BucketCopy>& changed_nodes, std::vector<uint16_t> ideal_nodes, bool reset_trusted);
+ virtual ~UpdateBucketDatabaseProcessor();
+ virtual BucketDatabase::Entry create_entry(const document::BucketId& bucket) const override;
+ virtual bool process_entry(BucketDatabase::Entry &entry) const override;
+};
+
+UpdateBucketDatabaseProcessor::UpdateBucketDatabaseProcessor(const framework::Clock& clock, const std::vector<BucketCopy>& changed_nodes, std::vector<uint16_t> ideal_nodes, bool reset_trusted) // Captures the inputs needed later by process_entry().
+ : BucketDatabase::EntryUpdateProcessor(),
+ _clock(clock),
+ _changed_nodes(changed_nodes), // Binds the reference member; no copy of the vector is made.
+ _ideal_nodes(std::move(ideal_nodes)), // Taken by value and moved into the member.
+ _reset_trusted(reset_trusted)
+{
+}
+
+UpdateBucketDatabaseProcessor::~UpdateBucketDatabaseProcessor() = default;
+
+BucketDatabase::Entry
+UpdateBucketDatabaseProcessor::create_entry(const document::BucketId &bucket) const // Supplies the starting entry for a bucket that is not yet in the database.
+{
+ return BucketDatabase::Entry(bucket, BucketInfo()); // Entry for `bucket` with a default-constructed BucketInfo.
+}
+
+bool
+UpdateBucketDatabaseProcessor::process_entry(BucketDatabase::Entry &entry) const // Applies the changed node copies to `entry`; returns false when the entry ends up with no nodes.
+{
+ // 0 implies bucket was just added. Since we don't know if any other
+ // distributor has run GC on it, we just have to assume this and set the
+ // timestamp to the current time to avoid duplicate work.
+ if (entry->getLastGarbageCollectionTime() == 0) {
+ entry->setLastGarbageCollectionTime(_clock.getTimeInSeconds().getTime());
+ }
+ entry->addNodes(_changed_nodes, _ideal_nodes); // Changed copies are added before any trusted-reset below.
+ if (_reset_trusted) {
+ entry->resetTrusted();
+ }
+ if (entry->getNodeCount() == 0) {
+ LOG(warning, "all nodes in changedNodes set (size %zu) are down, removing dbentry", _changed_nodes.size()); // NOTE(review): the size logged is that of the vector given to this processor; the caller in this change passes its filtered up-node list when a down node was found, so this may differ from the caller's original changedNodes size - confirm intended.
+ return false; // remove entry
+ }
+ return true; // keep entry
+}
+
+}
+
void
DistributorComponent::updateBucketDatabase(
const document::Bucket &bucket,
@@ -136,7 +192,6 @@ DistributorComponent::updateBucketDatabase(
{
auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace()));
assert(!(bucket.getBucketId() == document::BucketId()));
- BucketDatabase::Entry dbentry = bucketSpace.getBucketDatabase().get(bucket.getBucketId());
BucketOwnership ownership(bucketSpace.check_ownership_in_pending_and_current_state(bucket.getBucketId()));
if (!ownership.isOwned()) {
@@ -148,22 +203,6 @@ DistributorComponent::updateBucketDatabase(
return;
}
- if (!dbentry.valid()) {
- if (updateFlags & DatabaseUpdate::CREATE_IF_NONEXISTING) {
- dbentry = BucketDatabase::Entry(bucket.getBucketId(), BucketInfo());
- } else {
- return;
- }
- }
-
- // 0 implies bucket was just added. Since we don't know if any other
- // distributor has run GC on it, we just have to assume this and set the
- // timestamp to the current time to avoid duplicate work.
- if (dbentry->getLastGarbageCollectionTime() == 0) {
- dbentry->setLastGarbageCollectionTime(
- getClock().getTimeInSeconds().getTime());
- }
-
// Ensure that we're not trying to bring any zombie copies into the
// bucket database (i.e. copies on nodes that are actually unavailable).
const auto& available_nodes = bucketSpace.get_available_nodes();
@@ -176,27 +215,20 @@ DistributorComponent::updateBucketDatabase(
}
// Optimize for common case where we don't have to create a new
// bucket copy vector
- if (!found_down_node) {
- dbentry->addNodes(changedNodes, bucketSpace.get_ideal_nodes(bucket.getBucketId()));
- } else {
- std::vector<BucketCopy> upNodes;
+ std::vector<BucketCopy> up_nodes;
+ if (found_down_node) {
+ up_nodes.reserve(changedNodes.size());
for (uint32_t i = 0; i < changedNodes.size(); ++i) {
const BucketCopy& copy(changedNodes[i]);
if (copy.getNode() < available_nodes.size() && available_nodes[copy.getNode()]) {
- upNodes.emplace_back(copy);
+ up_nodes.emplace_back(copy);
}
}
- dbentry->addNodes(upNodes, bucketSpace.get_ideal_nodes(bucket.getBucketId()));
- }
- if (updateFlags & DatabaseUpdate::RESET_TRUSTED) {
- dbentry->resetTrusted();
}
- if (dbentry->getNodeCount() == 0) {
- LOG(warning, "all nodes in changedNodes set (size %zu) are down, removing dbentry", changedNodes.size());
- bucketSpace.getBucketDatabase().remove(bucket.getBucketId());
- return;
- }
- bucketSpace.getBucketDatabase().update(dbentry);
+
+ UpdateBucketDatabaseProcessor processor(getClock(), found_down_node ? up_nodes : changedNodes, bucketSpace.get_ideal_nodes(bucket.getBucketId()), (updateFlags & DatabaseUpdate::RESET_TRUSTED) != 0);
+
+ bucketSpace.getBucketDatabase().process_update(bucket.getBucketId(), processor, (updateFlags & DatabaseUpdate::CREATE_IF_NONEXISTING) != 0);
}
void