author     Tor Brede Vekterli <vekterli@yahooinc.com>    2022-03-11 12:17:26 +0000
committer  Tor Brede Vekterli <vekterli@yahooinc.com>    2022-03-11 16:28:19 +0000
commit     3e033275ec2f1deda6aca43b98949c9297ead2ea (patch)
tree       78410573eb744821d5007194eb2240250d1d8b87 /storage
parent     445f2b8f9b0a80fc6488c7095d5cd9ba8df51b43 (diff)
Make B-tree bucket database values atomic to ensure well-defined access
The existing implementation already used explicit acquire/release fences to ensure visibility from the writer to concurrent readers, but the values written and read were not of an atomic type, so the accesses technically constituted a data race.

This commit adds an AtomicValueWrapper to vespalib which looks and acts much like the existing AtomicEntryRef, but for primitive types that are not related to EntryRefs. The bucket DB B-tree's primitive u64 value type is replaced with this atomic wrapper, and the explicit memory fences are replaced with release stores and acquire loads on the atomic values themselves, ensuring they form correct release/acquire pairs between the writer and readers.
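For context, a minimal sketch of what such a wrapper could look like is shown below. This is an illustration only, assuming C++17 and std::atomic; the actual vespalib::datastore::AtomicValueWrapper in atomic_value_wrapper.h may differ in detail:

#include <atomic>

// Illustrative sketch only -- not the actual vespalib implementation.
template <typename T>
class AtomicValueWrapper {
    std::atomic<T> _value;
public:
    AtomicValueWrapper() noexcept : _value(T{}) {}
    explicit AtomicValueWrapper(T value) noexcept : _value(value) {}
    // Copyability is required for use as a B-tree value type. Copies only
    // happen on the writer thread, so relaxed ordering suffices here.
    AtomicValueWrapper(const AtomicValueWrapper& rhs) noexcept
        : _value(rhs._value.load(std::memory_order_relaxed)) {}
    AtomicValueWrapper& operator=(const AtomicValueWrapper& rhs) noexcept {
        _value.store(rhs._value.load(std::memory_order_relaxed),
                     std::memory_order_relaxed);
        return *this;
    }
    // Writer side: publish a new value. Pairs with load_acquire() in readers,
    // making all writes sequenced before this store visible to them.
    void store_release(T value) noexcept {
        _value.store(value, std::memory_order_release);
    }
    // Reader side: observe a value published via store_release().
    [[nodiscard]] T load_acquire() const noexcept {
        return _value.load(std::memory_order_acquire);
    }
    // Writer-only read; no cross-thread synchronization needed.
    [[nodiscard]] T load_relaxed() const noexcept {
        return _value.load(std::memory_order_relaxed);
    }
};

In the hunks below, the writer's iter.getWData().store_release(new_value) pairs with the readers' iter.getData().load_acquire(), replacing the previous free-standing std::atomic_thread_fence calls.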
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/distributor/btree_bucket_database_test.cpp         77
-rw-r--r--  storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h    13
-rw-r--r--  storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp  32
3 files changed, 98 insertions(+), 24 deletions(-)
diff --git a/storage/src/tests/distributor/btree_bucket_database_test.cpp b/storage/src/tests/distributor/btree_bucket_database_test.cpp
index cdeacba4b58..36e700ab1ff 100644
--- a/storage/src/tests/distributor/btree_bucket_database_test.cpp
+++ b/storage/src/tests/distributor/btree_bucket_database_test.cpp
@@ -2,8 +2,11 @@
#include "bucketdatabasetest.h"
#include <vespa/storage/bucketdb/btree_bucket_database.h>
+#include <vespa/vespalib/util/count_down_latch.h>
+#include <vespa/vespalib/util/time.h>
#include <gtest/gtest.h>
-#include <gmock/gmock.h>
+#include <atomic>
+#include <thread>
using namespace ::testing;
@@ -59,5 +62,77 @@ TEST_F(BTreeReadGuardTest, guard_observes_entries_alive_at_acquire_time) {
EXPECT_EQ(entries[0].getBucketInfo(), BI(1, 1234));
}
+namespace {
+
+BucketCopy make_bucket_copy(uint16_t node_idx, uint32_t dummy_info) {
+ return {0, node_idx, api::BucketInfo(dummy_info, dummy_info, dummy_info)};
+}
+
+BucketInfo make_bucket_info(uint32_t dummy_info) {
+ BucketInfo bi;
+ bi.addNode(make_bucket_copy(0, dummy_info), {0, 1, 2});
+ bi.addNode(make_bucket_copy(1, dummy_info), {0, 1, 2});
+ bi.addNode(make_bucket_copy(2, dummy_info), {0, 1, 2});
+ bi.setLastGarbageCollectionTime(dummy_info);
+ return bi;
+}
+
+}
+
+// Simple pseudo-stress test with a single writer and a single reader thread.
+// The writer thread continuously updates a set of buckets with an array of bucket
+// info instances and last GC timestamp that all have the same value, but the value
+// itself is incremented for each write. This allows the reader to validate that it
+// is observing a stable snapshot across all read values for a given bucket key.
+TEST_F(BTreeReadGuardTest, multithreaded_read_guards_observe_stable_snapshots) {
+ constexpr uint32_t bucket_bits = 20;
+ constexpr uint32_t n_buckets = 1u << 10u; // Must be less than 2**bucket_bits
+ constexpr auto duration = 500ms;
+ vespalib::CountDownLatch latch(2);
+ std::atomic<bool> run_reader(true);
+
+ std::thread reader_thread([&]{
+ latch.countDown();
+ uint32_t read_counter = 0;
+ while (run_reader.load(std::memory_order_relaxed)) {
+ auto guard = _db.acquire_read_guard();
+ const uint32_t superbucket = (read_counter % n_buckets);
+ BucketId bucket(bucket_bits, superbucket);
+ const auto entries = guard->find_parents_and_self(bucket);
+ // Entry might not have been written yet. If so, yield to give some time.
+ if (entries.empty()) {
+ std::this_thread::yield();
+ continue;
+ }
+ ++read_counter;
+ // Use plain assertions to avoid any implicit thread/lock interactions with gtest
+ assert(entries.size() == 1);
+ const auto& entry = entries[0];
+ assert(entry.getBucketId() == bucket);
+ assert(entry->getNodeCount() == 3);
+ // We reuse the same write counter as GC timestamp and checksum/doc count/size across
+ // all stored bucket infos in a given bucket.
+ const auto expected_stable_val = entry->getLastGarbageCollectionTime();
+ for (uint16_t i = 0; i < 3; ++i) {
+ const auto& info = entry->getNodeRef(i);
+ assert(info.getChecksum() == expected_stable_val);
+ assert(info.getDocumentCount() == expected_stable_val);
+ assert(info.getTotalDocumentSize() == expected_stable_val);
+ }
+ }
+ });
+ latch.countDown();
+ const auto start_time = vespalib::steady_clock::now();
+ uint32_t write_counter = 0;
+ do {
+ for (uint32_t i = 0; i < n_buckets; ++i, ++write_counter) {
+ BucketId bucket_id(bucket_bits, i);
+ _db.update(BucketDatabase::Entry(bucket_id, make_bucket_info(write_counter)));
+ }
+ } while ((vespalib::steady_clock::now() - start_time) < duration);
+ run_reader.store(false, std::memory_order_relaxed);
+ reader_thread.join();
+}
+
}
diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h
index 77e71bb4f0c..ba99dad39c0 100644
--- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h
+++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h
@@ -7,6 +7,7 @@
#include <vespa/vespalib/btree/btree.h>
#include <vespa/vespalib/btree/minmaxaggregated.h>
#include <vespa/vespalib/btree/minmaxaggrcalc.h>
+#include <vespa/vespalib/datastore/atomic_value_wrapper.h>
namespace storage::bucketdb {
@@ -38,10 +39,11 @@ namespace storage::bucketdb {
template <typename DataStoreTraitsT>
class GenericBTreeBucketDatabase {
public:
- using DataStoreType = typename DataStoreTraitsT::DataStoreType;
- using ValueType = typename DataStoreTraitsT::ValueType;
- using ConstValueRef = typename DataStoreTraitsT::ConstValueRef;
- using GenerationHandler = vespalib::GenerationHandler;
+ using DataStoreType = typename DataStoreTraitsT::DataStoreType;
+ using ValueType = typename DataStoreTraitsT::ValueType;
+ using ConstValueRef = typename DataStoreTraitsT::ConstValueRef;
+ using GenerationHandler = vespalib::GenerationHandler;
+ using AtomicValueWrapper = vespalib::datastore::AtomicValueWrapper<uint64_t>;
struct KeyUsedBitsMinMaxAggrCalc : vespalib::btree::MinMaxAggrCalc {
constexpr static bool aggregate_over_values() { return false; }
@@ -51,7 +53,8 @@ public:
}
};
- using BTree = vespalib::btree::BTree<uint64_t, uint64_t,
+ using BTree = vespalib::btree::BTree<uint64_t,
+ vespalib::datastore::AtomicValueWrapper<uint64_t>,
vespalib::btree::MinMaxAggregated,
std::less<>,
vespalib::btree::BTreeDefaultTraits,
diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp
index 7b0d147df0d..9db36e96fc0 100644
--- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp
+++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp
@@ -77,16 +77,14 @@ GenericBTreeBucketDatabase<DataStoreTraitsT>::entry_from_iterator(const BTreeCon
if (!iter.valid()) {
return DataStoreTraitsT::make_invalid_value();
}
- const auto value = iter.getData();
- std::atomic_thread_fence(std::memory_order_acquire);
+ const auto value = iter.getData().load_acquire();
return DataStoreTraitsT::unwrap_from_key_value(_store, iter.getKey(), value);
}
template <typename DataStoreTraitsT>
typename GenericBTreeBucketDatabase<DataStoreTraitsT>::ConstValueRef
GenericBTreeBucketDatabase<DataStoreTraitsT>::const_value_ref_from_valid_iterator(const BTreeConstIterator& iter) const {
- const auto value = iter.getData();
- std::atomic_thread_fence(std::memory_order_acquire);
+ const auto value = iter.getData().load_acquire();
return DataStoreTraitsT::unwrap_const_ref_from_key_value(_store, iter.getKey(), value);
}
@@ -304,7 +302,7 @@ bool GenericBTreeBucketDatabase<DataStoreTraitsT>::remove_by_raw_key(uint64_t ke
if (!iter.valid()) {
return false;
}
- const auto value = iter.getData();
+ const auto value = iter.getData().load_relaxed(); // Called from writer only
DataStoreTraitsT::remove_by_wrapped_value(_store, value);
_tree.remove(iter);
commit_tree_changes();
@@ -324,12 +322,11 @@ bool GenericBTreeBucketDatabase<DataStoreTraitsT>::update_by_raw_key(uint64_t bu
auto iter = _tree.lowerBound(bucket_key);
const bool pre_existed = (iter.valid() && (iter.getKey() == bucket_key));
if (pre_existed) {
- DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData());
+ DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData().load_relaxed());
// In-place update of value; does not require tree structure modification
- std::atomic_thread_fence(std::memory_order_release); // Must ensure visibility when new array ref is observed
- iter.writeData(new_value);
+ iter.getWData().store_release(new_value); // Must ensure visibility when new array ref is observed
} else {
- _tree.insert(iter, bucket_key, new_value);
+ _tree.insert(iter, bucket_key, AtomicValueWrapper(new_value));
}
commit_tree_changes(); // TODO does publishing a new root imply an implicit memory fence?
return pre_existed;
@@ -359,18 +356,17 @@ GenericBTreeBucketDatabase<DataStoreTraitsT>::process_update(const BucketId& buc
ValueType entry(found ? entry_from_iterator(iter) : processor.create_entry(bucket));
bool keep = processor.process_entry(entry);
if (found) {
- DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData());
+ DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData().load_relaxed()); // Called from writer only
if (keep) {
const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry);
- std::atomic_thread_fence(std::memory_order_release);
- iter.writeData(new_value);
+ iter.getWData().store_release(new_value);
} else {
_tree.remove(iter);
}
} else {
if (keep) {
const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry);
- _tree.insert(iter, bucket_key, new_value);
+ _tree.insert(iter, bucket_key, AtomicValueWrapper(new_value));
}
}
commit_tree_changes();
@@ -491,7 +487,7 @@ struct BTreeBuilderMerger final : Merger<typename DataStoreTraitsT::ValueType> {
const uint64_t bucket_key = bucket_id.toKey();
assert(bucket_key < _current_key);
const auto new_value = DataStoreTraitsT::wrap_and_store_value(_db.store(), e);
- _builder.insert(bucket_key, new_value);
+ _builder.insert(bucket_key, vespalib::datastore::AtomicValueWrapper<uint64_t>(new_value));
}
void update_iteration_state(uint64_t key, uint64_t value) {
@@ -520,7 +516,7 @@ struct BTreeTrailingInserter final : TrailingInserter<typename DataStoreTraitsT:
void insert_at_end(const BucketId& bucket_id, const ValueType& e) override {
const uint64_t bucket_key = bucket_id.toKey();
const auto new_value = DataStoreTraitsT::wrap_and_store_value(_db.store(), e);
- _builder.insert(bucket_key, new_value);
+ _builder.insert(bucket_key, vespalib::datastore::AtomicValueWrapper<uint64_t>(new_value));
}
};
@@ -533,19 +529,19 @@ void GenericBTreeBucketDatabase<DataStoreTraitsT>::merge(MergingProcessor<ValueT
// TODO for_each instead?
for (auto iter = _tree.begin(); iter.valid(); ++iter) {
const uint64_t key = iter.getKey();
- const uint64_t value = iter.getData();
+ const uint64_t value = iter.getData().load_relaxed(); // Only called from writer
merger.update_iteration_state(key, value);
auto result = proc.merge(merger);
if (result == MergingProcessor<ValueType>::Result::KeepUnchanged) {
- builder.insert(key, value); // Reuse array store ref with no changes
+ builder.insert(key, AtomicValueWrapper(value)); // Reuse array store ref with no changes
} else if (result == MergingProcessor<ValueType>::Result::Update) {
assert(merger._valid_cached_value); // Must actually have been touched
assert(merger._cached_value.valid());
DataStoreTraitsT::remove_by_wrapped_value(_store, value);
const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, merger._cached_value);
- builder.insert(key, new_value);
+ builder.insert(key, AtomicValueWrapper(new_value));
} else if (result == MergingProcessor<ValueType>::Result::Skip) {
DataStoreTraitsT::remove_by_wrapped_value(_store, value);
} else {