summaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-10-08 14:18:53 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-10-09 14:54:55 +0000
commit8ab839a2b541548a3b762f69da3d5e803905984e (patch)
tree89b1d496863ed1ba69cc1d1f118f26f6c93db777 /storage/src
parent0c41661e9c77aa39d26d67970628ffdd64853971 (diff)
Rewrite read-only DB updating to use the linear merge-based API
Avoids O(n) explicit inserts in favor of a bulk load.
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp46
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h1
2 files changed, 42 insertions, 5 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 227165a0911..715e6279769 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -32,7 +32,9 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
_distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
_sender(sender),
_transitionTimer(_distributorComponent.getClock()),
+ _stale_reads_enabled(false),
_active_distribution_contexts(),
+ _explicit_transition_read_guard(),
_distribution_context_mutex()
{
for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
@@ -159,6 +161,43 @@ BucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx,
sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>());
}
+namespace {
+
+class ReadOnlyDbMergingInserter : public BucketDatabase::MergingProcessor {
+ using NewEntries = std::vector<BucketDatabase::Entry>;
+ NewEntries::const_iterator _current;
+ const NewEntries::const_iterator _last;
+public:
+ explicit ReadOnlyDbMergingInserter(const NewEntries& new_entries)
+ : _current(new_entries.cbegin()),
+ _last(new_entries.cend())
+ {}
+
+ Result merge(BucketDatabase::Merger& m) override {
+ const auto key = m.bucket_key();
+ while (_current != _last && (_current->getBucketId().toKey() < key)) {
+ m.insert_before_current(*_current);
+ ++_current;
+ }
+ if (_current != _last && _current->getBucketId().toKey() == key) {
+ // If we encounter a bucket that already exists, replace value wholesale.
+ // Don't try to cleverly merge replicas, as the values we currently hold
+ // in the read-only DB may be stale.
+ m.current_entry() = *_current;
+ return Result::Update;
+ }
+ return Result::KeepUnchanged;
+ }
+
+ void insert_remaining_at_end(BucketDatabase::TrailingInserter& inserter) override {
+ for (; _current != _last; ++_current) {
+ inserter.insert_at_end(*_current);
+ }
+ }
+};
+
+}
+
void
BucketDBUpdater::removeSuperfluousBuckets(
const lib::ClusterStateBundle& newState,
@@ -196,10 +235,9 @@ BucketDBUpdater::removeSuperfluousBuckets(
move_to_read_only_db);
bucketDb.merge(proc);
- // Non-owned entries vector only has contents if move_to_read_only_db is true.
- // TODO either rewrite in terms of merge() or remove entirely
- for (const auto& db_entry : proc.getNonOwnedEntries()) {
- readOnlyDb.update(db_entry); // TODO Entry move support
+ if (move_to_read_only_db) {
+ ReadOnlyDbMergingInserter read_only_merger(proc.getNonOwnedEntries());
+ readOnlyDb.merge(read_only_merger);
}
maybe_inject_simulated_db_pruning_delay();
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 86ceab14486..b756c5ca7bb 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -219,7 +219,6 @@ private:
void logRemove(const document::BucketId& bucketId, const char* msg) const;
bool distributorOwnsBucket(const document::BucketId&) const;
- // TODO this is temporary until explicit DB snapshotting replaces read-only DB usage
const std::vector<BucketDatabase::Entry>& getNonOwnedEntries() const noexcept {
return _nonOwnedBuckets;
}