diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-12-03 16:35:44 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-03 16:35:44 +0100 |
commit | 7d9135e0cd54ec4c120783276e19cfbd313edb0f (patch) | |
tree | 23d39bc2e4e6c7dbdf95eb2534617476e65275b4 /storage | |
parent | 8d824993bfe9573d3ea5336c48971b146d8b9b58 (diff) | |
parent | 77b2ac65da5a0601201ff31712817dc9c94c99c8 (diff) |
Merge pull request #15632 from vespa-engine/toregge/faster-distributor-bucket-db-update
Add process_update member function to BucketDatabase.
Diffstat (limited to 'storage')
7 files changed, 159 insertions, 32 deletions
diff --git a/storage/src/tests/distributor/bucketdatabasetest.cpp b/storage/src/tests/distributor/bucketdatabasetest.cpp index 7f9825d32fc..6aadc6c01e1 100644 --- a/storage/src/tests/distributor/bucketdatabasetest.cpp +++ b/storage/src/tests/distributor/bucketdatabasetest.cpp @@ -625,6 +625,22 @@ struct InsertAtEndMergingProcessor : BucketDatabase::MergingProcessor { } }; +struct EntryUpdateProcessor : BucketDatabase::EntryUpdateProcessor { + using Entry = BucketDatabase::Entry; + std::function<bool(Entry&)> _func; + EntryUpdateProcessor(std::function<bool(Entry&)> func) + : _func(std::move(func)) + { + } + ~EntryUpdateProcessor() override = default; + BucketDatabase::Entry create_entry(const document::BucketId& bucket) const override { + return BucketDatabase::Entry(bucket, BucketInfo()); + } + bool process_entry(Entry& entry) const override { + return(_func(entry)); + } +}; + } TEST_P(BucketDatabaseTest, merge_keep_unchanged_result_does_not_alter_db_contents) { @@ -704,6 +720,25 @@ TEST_P(BucketDatabaseTest, merge_can_insert_entry_at_end) { "node(idx=3,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"); } +TEST_P(BucketDatabaseTest, process_update) +{ + using Entry = BucketDatabase::Entry; + document::BucketId bucket(16, 2); + EXPECT_EQ(dump_db(db()), ""); + auto update_entry = [](Entry& entry) { entry->addNode(BC(0), toVector<uint16_t>(0)); return true; }; + EntryUpdateProcessor update_processor(update_entry); + db().process_update(bucket, update_processor, false); + EXPECT_EQ(dump_db(db()), ""); + db().process_update(bucket, update_processor, true); + EXPECT_EQ(dump_db(db()), + "BucketId(0x4000000000000002) : " + "node(idx=0,crc=0x0,docs=0/0,bytes=1/1,trusted=false,active=false,ready=false)\n"); + auto remove_entry = [](Entry&) noexcept { return false; }; + EntryUpdateProcessor remove_processor(remove_entry); + db().process_update(bucket, remove_processor, false); + EXPECT_EQ(dump_db(db()), ""); +} + TEST_P(BucketDatabaseTest, DISABLED_benchmark_const_iteration) { constexpr uint32_t superbuckets = 1u << 16u; constexpr uint32_t sub_buckets = 14; diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp index d547830d765..be02db70d9e 100644 --- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp +++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp @@ -139,6 +139,12 @@ void BTreeBucketDatabase::update(const Entry& newEntry) { _impl->update(newEntry.getBucketId(), newEntry); } +void +BTreeBucketDatabase::process_update(const document::BucketId& bucket, EntryUpdateProcessor &processor, bool create_if_nonexisting) +{ + _impl->process_update(bucket, processor, create_if_nonexisting); +} + // TODO need snapshot read with guarding // FIXME semantics of for-each in judy and bit tree DBs differ, former expects lbound, latter ubound..! // FIXME but bit-tree code says "lowerBound" in impl and "after" in declaration??? diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h index 2821e360792..e0955651209 100644 --- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h +++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h @@ -43,6 +43,7 @@ public: void getAll(const document::BucketId& bucket, std::vector<Entry>& entries) const override; void update(const Entry& newEntry) override; + void process_update(const document::BucketId& bucket, EntryUpdateProcessor &processor, bool create_if_nonexisting) override; void forEach(EntryProcessor&, const document::BucketId& after) const override; Entry upperBound(const document::BucketId& value) const override; uint64_t size() const override; diff --git a/storage/src/vespa/storage/bucketdb/bucketdatabase.h b/storage/src/vespa/storage/bucketdb/bucketdatabase.h index cd94a698358..9ca231a26d2 100644 --- a/storage/src/vespa/storage/bucketdb/bucketdatabase.h +++ b/storage/src/vespa/storage/bucketdb/bucketdatabase.h @@ -57,6 +57,21 @@ public: virtual bool process(const ConstEntryRef& e) = 0; }; + /* + * Interface class used by process_update() for updating an entry + * with a single call to the bucket database. + */ + struct EntryUpdateProcessor { + virtual ~EntryUpdateProcessor() = default; + virtual Entry create_entry(const document::BucketId& bucket) const = 0; + /* + * Modifies entry. + * returns true if modified entry should be kept. + * returns false if entry should be removed. + */ + virtual bool process_entry(Entry &entry) const = 0; + }; + ~BucketDatabase() override = default; virtual Entry get(const document::BucketId& bucket) const = 0; @@ -82,6 +97,8 @@ public: */ virtual void update(const Entry& newEntry) = 0; + virtual void process_update(const document::BucketId& bucket, EntryUpdateProcessor &processor, bool create_if_nonexisting) = 0; + virtual void forEach( EntryProcessor&, const document::BucketId& after = document::BucketId()) const = 0; diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h index ea6d47f26bb..f9d9f4f7861 100644 --- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h +++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h @@ -99,6 +99,8 @@ public: // Returns true if bucket pre-existed in the DB, false otherwise bool update(const document::BucketId& bucket, const ValueType& new_entry); bool update_by_raw_key(uint64_t bucket_key, const ValueType& new_entry); + template <typename EntryUpdateProcessor> + void process_update(const document::BucketId &bucket, EntryUpdateProcessor& processor, bool create_if_nonexisting); template <typename IterValueExtractor, typename Func> void find_parents_and_self(const document::BucketId& bucket, Func func) const; diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp index 839efae4c1b..aafb4286d7d 100644 --- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp +++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp @@ -342,6 +342,40 @@ bool GenericBTreeBucketDatabase<DataStoreTraitsT>::update(const BucketId& bucket return update_by_raw_key(bucket.toKey(), new_entry); } +template <typename DataStoreTraitsT> +template <typename EntryUpdateProcessor> +void +GenericBTreeBucketDatabase<DataStoreTraitsT>::process_update(const BucketId& bucket, EntryUpdateProcessor& processor, bool create_if_nonexisting) +{ + uint64_t bucket_key = bucket.toKey(); + auto iter = _tree.lowerBound(bucket_key); + bool found = true; + if (!iter.valid() || bucket_key < iter.getKey()) { + if (!create_if_nonexisting) { + return; + } + found = false; + } + ValueType entry(found ? entry_from_iterator(iter) : processor.create_entry(bucket)); + bool keep = processor.process_entry(entry); + if (found) { + DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData()); + if (keep) { + const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry); + std::atomic_thread_fence(std::memory_order_release); + iter.writeData(new_value); + } else { + _tree.remove(iter); + } + } else { + if (keep) { + const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry); + _tree.insert(iter, bucket_key, new_value); + } + } + commit_tree_changes(); +} + /* * Returns the bucket ID which, based on the buckets already existing in the DB, * is the most specific location in the tree in which it should reside. This may diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp index 41a99092174..dde5281a15f 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp +++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp @@ -128,6 +128,62 @@ DistributorComponent::enumerateUnavailableNodes( } } +namespace { + +/** + * Helper class to update entry in bucket database when bucket copies from nodes have changed. + */ +class UpdateBucketDatabaseProcessor : public BucketDatabase::EntryUpdateProcessor { + const framework::Clock& _clock; + const std::vector<BucketCopy>& _changed_nodes; + std::vector<uint16_t> _ideal_nodes; + bool _reset_trusted; +public: + UpdateBucketDatabaseProcessor(const framework::Clock& clock, const std::vector<BucketCopy>& changed_nodes, std::vector<uint16_t> ideal_nodes, bool reset_trusted); + virtual ~UpdateBucketDatabaseProcessor(); + virtual BucketDatabase::Entry create_entry(const document::BucketId& bucket) const override; + virtual bool process_entry(BucketDatabase::Entry &entry) const override; +}; + +UpdateBucketDatabaseProcessor::UpdateBucketDatabaseProcessor(const framework::Clock& clock, const std::vector<BucketCopy>& changed_nodes, std::vector<uint16_t> ideal_nodes, bool reset_trusted) + : BucketDatabase::EntryUpdateProcessor(), + _clock(clock), + _changed_nodes(changed_nodes), + _ideal_nodes(std::move(ideal_nodes)), + _reset_trusted(reset_trusted) +{ +} + +UpdateBucketDatabaseProcessor::~UpdateBucketDatabaseProcessor() = default; + +BucketDatabase::Entry +UpdateBucketDatabaseProcessor::create_entry(const document::BucketId &bucket) const +{ + return BucketDatabase::Entry(bucket, BucketInfo()); +} + +bool +UpdateBucketDatabaseProcessor::process_entry(BucketDatabase::Entry &entry) const +{ + // 0 implies bucket was just added. Since we don't know if any other + // distributor has run GC on it, we just have to assume this and set the + // timestamp to the current time to avoid duplicate work. + if (entry->getLastGarbageCollectionTime() == 0) { + entry->setLastGarbageCollectionTime(_clock.getTimeInSeconds().getTime()); + } + entry->addNodes(_changed_nodes, _ideal_nodes); + if (_reset_trusted) { + entry->resetTrusted(); + } + if (entry->getNodeCount() == 0) { + LOG(warning, "all nodes in changedNodes set (size %zu) are down, removing dbentry", _changed_nodes.size()); + return false; // remove entry + } + return true; // keep entry +} + +} + void DistributorComponent::updateBucketDatabase( const document::Bucket &bucket, @@ -136,7 +192,6 @@ DistributorComponent::updateBucketDatabase( { auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); assert(!(bucket.getBucketId() == document::BucketId())); - BucketDatabase::Entry dbentry = bucketSpace.getBucketDatabase().get(bucket.getBucketId()); BucketOwnership ownership(bucketSpace.check_ownership_in_pending_and_current_state(bucket.getBucketId())); if (!ownership.isOwned()) { @@ -148,22 +203,6 @@ DistributorComponent::updateBucketDatabase( return; } - if (!dbentry.valid()) { - if (updateFlags & DatabaseUpdate::CREATE_IF_NONEXISTING) { - dbentry = BucketDatabase::Entry(bucket.getBucketId(), BucketInfo()); - } else { - return; - } - } - - // 0 implies bucket was just added. Since we don't know if any other - // distributor has run GC on it, we just have to assume this and set the - // timestamp to the current time to avoid duplicate work. - if (dbentry->getLastGarbageCollectionTime() == 0) { - dbentry->setLastGarbageCollectionTime( - getClock().getTimeInSeconds().getTime()); - } - // Ensure that we're not trying to bring any zombie copies into the // bucket database (i.e. copies on nodes that are actually unavailable). const auto& available_nodes = bucketSpace.get_available_nodes(); @@ -176,27 +215,20 @@ DistributorComponent::updateBucketDatabase( } // Optimize for common case where we don't have to create a new // bucket copy vector - if (!found_down_node) { - dbentry->addNodes(changedNodes, bucketSpace.get_ideal_nodes(bucket.getBucketId())); - } else { - std::vector<BucketCopy> upNodes; + std::vector<BucketCopy> up_nodes; + if (found_down_node) { + up_nodes.reserve(changedNodes.size()); for (uint32_t i = 0; i < changedNodes.size(); ++i) { const BucketCopy& copy(changedNodes[i]); if (copy.getNode() < available_nodes.size() && available_nodes[copy.getNode()]) { - upNodes.emplace_back(copy); + up_nodes.emplace_back(copy); } } - dbentry->addNodes(upNodes, bucketSpace.get_ideal_nodes(bucket.getBucketId())); - } - if (updateFlags & DatabaseUpdate::RESET_TRUSTED) { - dbentry->resetTrusted(); } - if (dbentry->getNodeCount() == 0) { - LOG(warning, "all nodes in changedNodes set (size %zu) are down, removing dbentry", changedNodes.size()); - bucketSpace.getBucketDatabase().remove(bucket.getBucketId()); - return; - } - bucketSpace.getBucketDatabase().update(dbentry); + + UpdateBucketDatabaseProcessor processor(getClock(), found_down_node ? up_nodes : changedNodes, bucketSpace.get_ideal_nodes(bucket.getBucketId()), (updateFlags & DatabaseUpdate::RESET_TRUSTED) != 0); + + bucketSpace.getBucketDatabase().process_update(bucket.getBucketId(), processor, (updateFlags & DatabaseUpdate::CREATE_IF_NONEXISTING) != 0); } void |