diff options
Diffstat (limited to 'storage')
3 files changed, 98 insertions, 24 deletions
diff --git a/storage/src/tests/distributor/btree_bucket_database_test.cpp b/storage/src/tests/distributor/btree_bucket_database_test.cpp index cdeacba4b58..36e700ab1ff 100644 --- a/storage/src/tests/distributor/btree_bucket_database_test.cpp +++ b/storage/src/tests/distributor/btree_bucket_database_test.cpp @@ -2,8 +2,11 @@ #include "bucketdatabasetest.h" #include <vespa/storage/bucketdb/btree_bucket_database.h> +#include <vespa/vespalib/util/count_down_latch.h> +#include <vespa/vespalib/util/time.h> #include <gtest/gtest.h> -#include <gmock/gmock.h> +#include <atomic> +#include <thread> using namespace ::testing; @@ -59,5 +62,77 @@ TEST_F(BTreeReadGuardTest, guard_observes_entries_alive_at_acquire_time) { EXPECT_EQ(entries[0].getBucketInfo(), BI(1, 1234)); } +namespace { + +BucketCopy make_bucket_copy(uint16_t node_idx, uint32_t dummy_info) { + return {0, node_idx, api::BucketInfo(dummy_info, dummy_info, dummy_info)}; +} + +BucketInfo make_bucket_info(uint32_t dummy_info) { + BucketInfo bi; + bi.addNode(make_bucket_copy(0, dummy_info), {0, 1, 2}); + bi.addNode(make_bucket_copy(1, dummy_info), {0, 1, 2}); + bi.addNode(make_bucket_copy(2, dummy_info), {0, 1, 2}); + bi.setLastGarbageCollectionTime(dummy_info); + return bi; +} + +} + +// Simple pseudo-stress test with a single writer and a single reader thread. +// The writer thread continuously updates a set of buckets with an array of bucket +// info instances and last GC timestamp that all have the same value, but the value +// itself is incremented for each write. This allows the reader to validate that it +// is observing a stable snapshot across all read values for a given bucket key. +TEST_F(BTreeReadGuardTest, multithreaded_read_guards_observe_stable_snapshots) { + constexpr uint32_t bucket_bits = 20; + constexpr uint32_t n_buckets = 1u << 10u; // Must be less than 2**bucket_bits + constexpr auto duration = 500ms; + vespalib::CountDownLatch latch(2); + std::atomic<bool> run_reader(true); + + std::thread reader_thread([&]{ + latch.countDown(); + uint32_t read_counter = 0; + while (run_reader.load(std::memory_order_relaxed)) { + auto guard = _db.acquire_read_guard(); + const uint32_t superbucket = (read_counter % n_buckets); + BucketId bucket(bucket_bits, superbucket); + const auto entries = guard->find_parents_and_self(bucket); + // Entry might not have been written yet. If so, yield to give some time. + if (entries.empty()) { + std::this_thread::yield(); + continue; + } + ++read_counter; + // Use plain assertions to avoid any implicit thread/lock interactions with gtest + assert(entries.size() == 1); + const auto& entry = entries[0]; + assert(entry.getBucketId() == bucket); + assert(entry->getNodeCount() == 3); + // We reuse the same write counter as GC timestamp and checksum/doc count/size across + // all stored bucket infos in a given bucket. + const auto expected_stable_val = entry->getLastGarbageCollectionTime(); + for (uint16_t i = 0; i < 3; ++i) { + const auto& info = entry->getNodeRef(i); + assert(info.getChecksum() == expected_stable_val); + assert(info.getDocumentCount() == expected_stable_val); + assert(info.getTotalDocumentSize() == expected_stable_val); + } + } + }); + latch.countDown(); + const auto start_time = vespalib::steady_clock::now(); + uint32_t write_counter = 0; + do { + for (uint32_t i = 0; i < n_buckets; ++i, ++write_counter) { + BucketId bucket_id(bucket_bits, i); + _db.update(BucketDatabase::Entry(bucket_id, make_bucket_info(write_counter))); + } + } while ((vespalib::steady_clock::now() - start_time) < duration); + run_reader.store(false, std::memory_order_relaxed); + reader_thread.join(); +} + } diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h index 77e71bb4f0c..ba99dad39c0 100644 --- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h +++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h @@ -7,6 +7,7 @@ #include <vespa/vespalib/btree/btree.h> #include <vespa/vespalib/btree/minmaxaggregated.h> #include <vespa/vespalib/btree/minmaxaggrcalc.h> +#include <vespa/vespalib/datastore/atomic_value_wrapper.h> namespace storage::bucketdb { @@ -38,10 +39,11 @@ namespace storage::bucketdb { template <typename DataStoreTraitsT> class GenericBTreeBucketDatabase { public: - using DataStoreType = typename DataStoreTraitsT::DataStoreType; - using ValueType = typename DataStoreTraitsT::ValueType; - using ConstValueRef = typename DataStoreTraitsT::ConstValueRef; - using GenerationHandler = vespalib::GenerationHandler; + using DataStoreType = typename DataStoreTraitsT::DataStoreType; + using ValueType = typename DataStoreTraitsT::ValueType; + using ConstValueRef = typename DataStoreTraitsT::ConstValueRef; + using GenerationHandler = vespalib::GenerationHandler; + using AtomicValueWrapper = vespalib::datastore::AtomicValueWrapper<uint64_t>; struct KeyUsedBitsMinMaxAggrCalc : vespalib::btree::MinMaxAggrCalc { constexpr static bool aggregate_over_values() { return false; } @@ -51,7 +53,8 @@ public: } }; - using BTree = vespalib::btree::BTree<uint64_t, uint64_t, + using BTree = vespalib::btree::BTree<uint64_t, + vespalib::datastore::AtomicValueWrapper<uint64_t>, vespalib::btree::MinMaxAggregated, std::less<>, vespalib::btree::BTreeDefaultTraits, diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp index 7b0d147df0d..9db36e96fc0 100644 --- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp +++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp @@ -77,16 +77,14 @@ GenericBTreeBucketDatabase<DataStoreTraitsT>::entry_from_iterator(const BTreeCon if (!iter.valid()) { return DataStoreTraitsT::make_invalid_value(); } - const auto value = iter.getData(); - std::atomic_thread_fence(std::memory_order_acquire); + const auto value = iter.getData().load_acquire(); return DataStoreTraitsT::unwrap_from_key_value(_store, iter.getKey(), value); } template <typename DataStoreTraitsT> typename GenericBTreeBucketDatabase<DataStoreTraitsT>::ConstValueRef GenericBTreeBucketDatabase<DataStoreTraitsT>::const_value_ref_from_valid_iterator(const BTreeConstIterator& iter) const { - const auto value = iter.getData(); - std::atomic_thread_fence(std::memory_order_acquire); + const auto value = iter.getData().load_acquire(); return DataStoreTraitsT::unwrap_const_ref_from_key_value(_store, iter.getKey(), value); } @@ -304,7 +302,7 @@ bool GenericBTreeBucketDatabase<DataStoreTraitsT>::remove_by_raw_key(uint64_t ke if (!iter.valid()) { return false; } - const auto value = iter.getData(); + const auto value = iter.getData().load_relaxed(); // Called from writer only DataStoreTraitsT::remove_by_wrapped_value(_store, value); _tree.remove(iter); commit_tree_changes(); @@ -324,12 +322,11 @@ bool GenericBTreeBucketDatabase<DataStoreTraitsT>::update_by_raw_key(uint64_t bu auto iter = _tree.lowerBound(bucket_key); const bool pre_existed = (iter.valid() && (iter.getKey() == bucket_key)); if (pre_existed) { - DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData()); + DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData().load_relaxed()); // In-place update of value; does not require tree structure modification - std::atomic_thread_fence(std::memory_order_release); // Must ensure visibility when new array ref is observed - iter.writeData(new_value); + iter.getWData().store_release(new_value); // Must ensure visibility when new array ref is observed } else { - _tree.insert(iter, bucket_key, new_value); + _tree.insert(iter, bucket_key, AtomicValueWrapper(new_value)); } commit_tree_changes(); // TODO does publishing a new root imply an implicit memory fence? return pre_existed; @@ -359,18 +356,17 @@ GenericBTreeBucketDatabase<DataStoreTraitsT>::process_update(const BucketId& buc ValueType entry(found ? entry_from_iterator(iter) : processor.create_entry(bucket)); bool keep = processor.process_entry(entry); if (found) { - DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData()); + DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData().load_relaxed()); // Called from writer only if (keep) { const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry); - std::atomic_thread_fence(std::memory_order_release); - iter.writeData(new_value); + iter.getWData().store_release(new_value); } else { _tree.remove(iter); } } else { if (keep) { const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry); - _tree.insert(iter, bucket_key, new_value); + _tree.insert(iter, bucket_key, AtomicValueWrapper(new_value)); } } commit_tree_changes(); @@ -491,7 +487,7 @@ struct BTreeBuilderMerger final : Merger<typename DataStoreTraitsT::ValueType> { const uint64_t bucket_key = bucket_id.toKey(); assert(bucket_key < _current_key); const auto new_value = DataStoreTraitsT::wrap_and_store_value(_db.store(), e); - _builder.insert(bucket_key, new_value); + _builder.insert(bucket_key, vespalib::datastore::AtomicValueWrapper<uint64_t>(new_value)); } void update_iteration_state(uint64_t key, uint64_t value) { @@ -520,7 +516,7 @@ struct BTreeTrailingInserter final : TrailingInserter<typename DataStoreTraitsT: void insert_at_end(const BucketId& bucket_id, const ValueType& e) override { const uint64_t bucket_key = bucket_id.toKey(); const auto new_value = DataStoreTraitsT::wrap_and_store_value(_db.store(), e); - _builder.insert(bucket_key, new_value); + _builder.insert(bucket_key, vespalib::datastore::AtomicValueWrapper<uint64_t>(new_value)); } }; @@ -533,19 +529,19 @@ void GenericBTreeBucketDatabase<DataStoreTraitsT>::merge(MergingProcessor<ValueT // TODO for_each instead? for (auto iter = _tree.begin(); iter.valid(); ++iter) { const uint64_t key = iter.getKey(); - const uint64_t value = iter.getData(); + const uint64_t value = iter.getData().load_relaxed(); // Only called from writer merger.update_iteration_state(key, value); auto result = proc.merge(merger); if (result == MergingProcessor<ValueType>::Result::KeepUnchanged) { - builder.insert(key, value); // Reuse array store ref with no changes + builder.insert(key, AtomicValueWrapper(value)); // Reuse array store ref with no changes } else if (result == MergingProcessor<ValueType>::Result::Update) { assert(merger._valid_cached_value); // Must actually have been touched assert(merger._cached_value.valid()); DataStoreTraitsT::remove_by_wrapped_value(_store, value); const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, merger._cached_value); - builder.insert(key, new_value); + builder.insert(key, AtomicValueWrapper(new_value)); } else if (result == MergingProcessor<ValueType>::Result::Skip) { DataStoreTraitsT::remove_by_wrapped_value(_store, value); } else { |