diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-05-15 13:42:48 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-05-15 13:42:48 +0000 |
commit | ca13052b3631149d40905b21737d70256a8243c0 (patch) | |
tree | 0615b91b3e0e93171efa5f25393dcd3a57b48d07 /storage | |
parent | 20f644c175b73230373cec00f8c80ffbf22193e6 (diff) |
Avoid recomputing bucket keys during sorting step
Buckets are sorted in increasing key order before being merged into
the database. Instead of repeatedly calling `toKey()` on entries, just
store the key verbatim.
Add a benchmarking test (disabled by default) for this case.
With the bucket key change, a local run of the benchmark brings total merge
time for 917K buckets down from 2 seconds to 1.3 seconds.
Diffstat (limited to 'storage')
3 files changed, 68 insertions, 18 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 11c9f18b3ef..9d063c82c69 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -17,6 +17,7 @@ #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/text/stringtokenizer.h> +#include <vespa/vespalib/util/benchmark_timer.h> #include <sstream> #include <iomanip> @@ -2577,4 +2578,52 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_trans EXPECT_EQ(size_t(0), _sender.replies.size()); } +TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) { + // Need to trigger an initial edge to complete first bucket scan + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("storage:1 distributor:2"), + messageCount(1), 0)); + _sender.clear(); + + lib::ClusterState state("storage:1 distributor:1"); + setSystemState(state); + + constexpr uint32_t superbuckets = 1u << 16u; + constexpr uint32_t sub_buckets = 14; + constexpr uint32_t n_buckets = superbuckets * sub_buckets; + + vespalib::BenchmarkTimer timer(1.0); + + ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); + for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { + ASSERT_EQ(_sender.commands[bsi]->getType(), MessageType::REQUESTBUCKETINFO); + const auto& req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]); + + auto sreply = std::make_shared<RequestBucketInfoReply>(req); + sreply->setAddress(storageAddress(0)); + auto& vec = sreply->getBucketInfo(); + if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { + for (uint32_t sb = 0; sb < superbuckets; ++sb) { + for (uint64_t i = 0; i < sub_buckets; ++i) { + document::BucketId bucket(48, (i << 32ULL) | sb); + vec.push_back(api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(10,1,1))); + } + } 
+ } + + // Global space has no buckets but will serve as a trigger for merging + // buckets into the DB. This lets us measure the overhead of just this part. + if (req.getBucketSpace() == FixedBucketSpaces::global_space()) { + timer.before(); + } + getBucketDBUpdater().onRequestBucketInfoReply(sreply); + if (req.getBucketSpace() == FixedBucketSpaces::global_space()) { + timer.after(); + fprintf(stderr, "Took %g seconds to merge %u buckets into DB\n", timer.min_time(), n_buckets); + } + } + + EXPECT_EQ(size_t(n_buckets), mutable_default_db().size()); + EXPECT_EQ(size_t(0), mutable_global_db().size()); +} + } diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp index 4208e8b33f9..216646f0345 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp @@ -56,8 +56,8 @@ PendingBucketSpaceDbTransition::skipAllForSameBucket() { Range r(_iter, _iter); - for (document::BucketId& bid = _entries[_iter].bucketId; - _iter < _entries.size() && _entries[_iter].bucketId == bid; + for (uint64_t bkey = _entries[_iter].bucket_key; + (_iter < _entries.size()) && (_entries[_iter].bucket_key == bkey); ++_iter) { } @@ -91,14 +91,9 @@ PendingBucketSpaceDbTransition::insertInfo(BucketDatabase::Entry& info, const Ra std::vector<uint16_t> order( dist.getIdealStorageNodes( _newClusterState, - _entries[range.first].bucketId, + _entries[range.first].bucket_id(), _clusterInfo->getStorageUpStates())); info->addNodes(copiesToAddOrUpdate, order, TrustedUpdate::DEFER); - - LOG_BUCKET_OPERATION_NO_LOCK( - _entries[range.first].bucketId, - vespalib::make_string("insertInfo: %s", - info.toString().c_str())); } std::string @@ -136,17 +131,19 @@ PendingBucketSpaceDbTransition::removeCopiesFromNodesThatWereRequested(BucketDat return updated; } +// TODO take in bucket key instead 
bool PendingBucketSpaceDbTransition::databaseIteratorHasPassedBucketInfoIterator(const document::BucketId& bucketId) const { return (_iter < _entries.size() - && _entries[_iter].bucketId.toKey() < bucketId.toKey()); + && _entries[_iter].bucket_key < bucketId.toKey()); } +// TODO take in bucket key instead bool PendingBucketSpaceDbTransition::bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const { - return _iter < _entries.size() && _entries[_iter].bucketId == bucketId; + return _iter < _entries.size() && _entries[_iter].bucket_id() == bucketId; } bool @@ -162,7 +159,7 @@ PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e) while (databaseIteratorHasPassedBucketInfoIterator(bucketId)) { LOG(spam, "Found new bucket %s, adding", - _entries[_iter].bucketId.toString().c_str()); + _entries[_iter].bucket_id().toString().c_str()); _missingEntries.push_back(skipAllForSameBucket()); } @@ -171,7 +168,7 @@ PendingBucketSpaceDbTransition::process(BucketDatabase::Entry& e) if (bucketInfoIteratorPointsToBucket(bucketId)) { LOG(spam, "Updating bucket %s", - _entries[_iter].bucketId.toString().c_str()); + _entries[_iter].bucket_id().toString().c_str()); insertInfo(e, skipAllForSameBucket()); updated = true; @@ -199,10 +196,10 @@ void PendingBucketSpaceDbTransition::addToBucketDB(BucketDatabase& db, const Range& range) { LOG(spam, "Adding new bucket %s with %d copies", - _entries[range.first].bucketId.toString().c_str(), + _entries[range.first].bucket_id().toString().c_str(), range.second - range.first); - BucketDatabase::Entry e(_entries[range.first].bucketId, BucketInfo()); + BucketDatabase::Entry e(_entries[range.first].bucket_id(), BucketInfo()); insertInfo(e, range); if (e->getLastGarbageCollectionTime() == 0) { e->setLastGarbageCollectionTime( diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h index 
ad6e0695be1..124ee1bdf45 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition_entry.h @@ -9,15 +9,19 @@ namespace storage::distributor::dbtransition { struct Entry { Entry(const document::BucketId& bid, const BucketCopy& copy_) - : bucketId(bid), + : bucket_key(bid.toKey()), copy(copy_) {} - document::BucketId bucketId; + uint64_t bucket_key; BucketCopy copy; - bool operator<(const Entry& other) const { - return bucketId.toKey() < other.bucketId.toKey(); + document::BucketId bucket_id() const noexcept { + return document::BucketId(document::BucketId::keyToBucketId(bucket_key)); + } + + bool operator<(const Entry& other) const noexcept { + return bucket_key < other.bucket_key; } }; |