diff options
5 files changed, 173 insertions, 24 deletions
diff --git a/storage/src/tests/distributor/btree_bucket_database_test.cpp b/storage/src/tests/distributor/btree_bucket_database_test.cpp index cdeacba4b58..36e700ab1ff 100644 --- a/storage/src/tests/distributor/btree_bucket_database_test.cpp +++ b/storage/src/tests/distributor/btree_bucket_database_test.cpp @@ -2,8 +2,11 @@ #include "bucketdatabasetest.h" #include <vespa/storage/bucketdb/btree_bucket_database.h> +#include <vespa/vespalib/util/count_down_latch.h> +#include <vespa/vespalib/util/time.h> #include <gtest/gtest.h> -#include <gmock/gmock.h> +#include <atomic> +#include <thread> using namespace ::testing; @@ -59,5 +62,77 @@ TEST_F(BTreeReadGuardTest, guard_observes_entries_alive_at_acquire_time) { EXPECT_EQ(entries[0].getBucketInfo(), BI(1, 1234)); } +namespace { + +BucketCopy make_bucket_copy(uint16_t node_idx, uint32_t dummy_info) { + return {0, node_idx, api::BucketInfo(dummy_info, dummy_info, dummy_info)}; +} + +BucketInfo make_bucket_info(uint32_t dummy_info) { + BucketInfo bi; + bi.addNode(make_bucket_copy(0, dummy_info), {0, 1, 2}); + bi.addNode(make_bucket_copy(1, dummy_info), {0, 1, 2}); + bi.addNode(make_bucket_copy(2, dummy_info), {0, 1, 2}); + bi.setLastGarbageCollectionTime(dummy_info); + return bi; +} + +} + +// Simple pseudo-stress test with a single writer and a single reader thread. +// The writer thread continuously updates a set of buckets with an array of bucket +// info instances and last GC timestamp that all have the same value, but the value +// itself is incremented for each write. This allows the reader to validate that it +// is observing a stable snapshot across all read values for a given bucket key. 
+TEST_F(BTreeReadGuardTest, multithreaded_read_guards_observe_stable_snapshots) { + constexpr uint32_t bucket_bits = 20; + constexpr uint32_t n_buckets = 1u << 10u; // Must be less than 2**bucket_bits + constexpr auto duration = 500ms; + vespalib::CountDownLatch latch(2); + std::atomic<bool> run_reader(true); + + std::thread reader_thread([&]{ + latch.countDown(); + uint32_t read_counter = 0; + while (run_reader.load(std::memory_order_relaxed)) { + auto guard = _db.acquire_read_guard(); + const uint32_t superbucket = (read_counter % n_buckets); + BucketId bucket(bucket_bits, superbucket); + const auto entries = guard->find_parents_and_self(bucket); + // Entry might not have been written yet. If so, yield to give some time. + if (entries.empty()) { + std::this_thread::yield(); + continue; + } + ++read_counter; + // Use plain assertions to avoid any implicit thread/lock interactions with gtest + assert(entries.size() == 1); + const auto& entry = entries[0]; + assert(entry.getBucketId() == bucket); + assert(entry->getNodeCount() == 3); + // We reuse the same write counter as GC timestamp and checksum/doc count/size across + // all stored bucket infos in a given bucket. 
+ const auto expected_stable_val = entry->getLastGarbageCollectionTime(); + for (uint16_t i = 0; i < 3; ++i) { + const auto& info = entry->getNodeRef(i); + assert(info.getChecksum() == expected_stable_val); + assert(info.getDocumentCount() == expected_stable_val); + assert(info.getTotalDocumentSize() == expected_stable_val); + } + } + }); + latch.countDown(); + const auto start_time = vespalib::steady_clock::now(); + uint32_t write_counter = 0; + do { + for (uint32_t i = 0; i < n_buckets; ++i, ++write_counter) { + BucketId bucket_id(bucket_bits, i); + _db.update(BucketDatabase::Entry(bucket_id, make_bucket_info(write_counter))); + } + } while ((vespalib::steady_clock::now() - start_time) < duration); + run_reader.store(false, std::memory_order_relaxed); + reader_thread.join(); +} + } diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h index 77e71bb4f0c..256a54b19b3 100644 --- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h +++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h @@ -7,6 +7,7 @@ #include <vespa/vespalib/btree/btree.h> #include <vespa/vespalib/btree/minmaxaggregated.h> #include <vespa/vespalib/btree/minmaxaggrcalc.h> +#include <vespa/vespalib/datastore/atomic_value_wrapper.h> namespace storage::bucketdb { @@ -38,10 +39,11 @@ namespace storage::bucketdb { template <typename DataStoreTraitsT> class GenericBTreeBucketDatabase { public: - using DataStoreType = typename DataStoreTraitsT::DataStoreType; - using ValueType = typename DataStoreTraitsT::ValueType; - using ConstValueRef = typename DataStoreTraitsT::ConstValueRef; - using GenerationHandler = vespalib::GenerationHandler; + using DataStoreType = typename DataStoreTraitsT::DataStoreType; + using ValueType = typename DataStoreTraitsT::ValueType; + using ConstValueRef = typename DataStoreTraitsT::ConstValueRef; + using GenerationHandler = vespalib::GenerationHandler; + 
using AtomicValueWrapper = vespalib::datastore::AtomicValueWrapper<uint64_t>; struct KeyUsedBitsMinMaxAggrCalc : vespalib::btree::MinMaxAggrCalc { constexpr static bool aggregate_over_values() { return false; } @@ -51,7 +53,18 @@ public: } }; - using BTree = vespalib::btree::BTree<uint64_t, uint64_t, + // Rationale for using an atomic u64 value type: + // It is expected that the set of bucket keys is much less frequently updated than their + // corresponding values. Since values must be stable for concurrent readers, all written values + // are _immutable_ once created. Consequently, every single mutation of a bucket will replace its + // value with a new (immutable) entry. For distributors, this replaces an entire array of values. + // Instead of constantly thawing and freezing subtrees for each bucket update, we atomically + // replace the value with a new u32 EntryRef mangled together with a u32 timestamp. + // This means updates that don't change the set of buckets leave the B-tree node structure + // itself entirely untouched. + // This requires great care to be taken when writing and reading to ensure memory visibility. 
+ using BTree = vespalib::btree::BTree<uint64_t, + vespalib::datastore::AtomicValueWrapper<uint64_t>, vespalib::btree::MinMaxAggregated, std::less<>, vespalib::btree::BTreeDefaultTraits, diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp index 7b0d147df0d..9db36e96fc0 100644 --- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp +++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp @@ -77,16 +77,14 @@ GenericBTreeBucketDatabase<DataStoreTraitsT>::entry_from_iterator(const BTreeCon if (!iter.valid()) { return DataStoreTraitsT::make_invalid_value(); } - const auto value = iter.getData(); - std::atomic_thread_fence(std::memory_order_acquire); + const auto value = iter.getData().load_acquire(); return DataStoreTraitsT::unwrap_from_key_value(_store, iter.getKey(), value); } template <typename DataStoreTraitsT> typename GenericBTreeBucketDatabase<DataStoreTraitsT>::ConstValueRef GenericBTreeBucketDatabase<DataStoreTraitsT>::const_value_ref_from_valid_iterator(const BTreeConstIterator& iter) const { - const auto value = iter.getData(); - std::atomic_thread_fence(std::memory_order_acquire); + const auto value = iter.getData().load_acquire(); return DataStoreTraitsT::unwrap_const_ref_from_key_value(_store, iter.getKey(), value); } @@ -304,7 +302,7 @@ bool GenericBTreeBucketDatabase<DataStoreTraitsT>::remove_by_raw_key(uint64_t ke if (!iter.valid()) { return false; } - const auto value = iter.getData(); + const auto value = iter.getData().load_relaxed(); // Called from writer only DataStoreTraitsT::remove_by_wrapped_value(_store, value); _tree.remove(iter); commit_tree_changes(); @@ -324,12 +322,11 @@ bool GenericBTreeBucketDatabase<DataStoreTraitsT>::update_by_raw_key(uint64_t bu auto iter = _tree.lowerBound(bucket_key); const bool pre_existed = (iter.valid() && (iter.getKey() == bucket_key)); if (pre_existed) { - 
DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData()); + DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData().load_relaxed()); // In-place update of value; does not require tree structure modification - std::atomic_thread_fence(std::memory_order_release); // Must ensure visibility when new array ref is observed - iter.writeData(new_value); + iter.getWData().store_release(new_value); // Must ensure visibility when new array ref is observed } else { - _tree.insert(iter, bucket_key, new_value); + _tree.insert(iter, bucket_key, AtomicValueWrapper(new_value)); } commit_tree_changes(); // TODO does publishing a new root imply an implicit memory fence? return pre_existed; @@ -359,18 +356,17 @@ GenericBTreeBucketDatabase<DataStoreTraitsT>::process_update(const BucketId& buc ValueType entry(found ? entry_from_iterator(iter) : processor.create_entry(bucket)); bool keep = processor.process_entry(entry); if (found) { - DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData()); + DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData().load_relaxed()); // Called from writer only if (keep) { const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry); - std::atomic_thread_fence(std::memory_order_release); - iter.writeData(new_value); + iter.getWData().store_release(new_value); } else { _tree.remove(iter); } } else { if (keep) { const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry); - _tree.insert(iter, bucket_key, new_value); + _tree.insert(iter, bucket_key, AtomicValueWrapper(new_value)); } } commit_tree_changes(); @@ -491,7 +487,7 @@ struct BTreeBuilderMerger final : Merger<typename DataStoreTraitsT::ValueType> { const uint64_t bucket_key = bucket_id.toKey(); assert(bucket_key < _current_key); const auto new_value = DataStoreTraitsT::wrap_and_store_value(_db.store(), e); - _builder.insert(bucket_key, new_value); + _builder.insert(bucket_key, 
vespalib::datastore::AtomicValueWrapper<uint64_t>(new_value)); } void update_iteration_state(uint64_t key, uint64_t value) { @@ -520,7 +516,7 @@ struct BTreeTrailingInserter final : TrailingInserter<typename DataStoreTraitsT: void insert_at_end(const BucketId& bucket_id, const ValueType& e) override { const uint64_t bucket_key = bucket_id.toKey(); const auto new_value = DataStoreTraitsT::wrap_and_store_value(_db.store(), e); - _builder.insert(bucket_key, new_value); + _builder.insert(bucket_key, vespalib::datastore::AtomicValueWrapper<uint64_t>(new_value)); } }; @@ -533,19 +529,19 @@ void GenericBTreeBucketDatabase<DataStoreTraitsT>::merge(MergingProcessor<ValueT // TODO for_each instead? for (auto iter = _tree.begin(); iter.valid(); ++iter) { const uint64_t key = iter.getKey(); - const uint64_t value = iter.getData(); + const uint64_t value = iter.getData().load_relaxed(); // Only called from writer merger.update_iteration_state(key, value); auto result = proc.merge(merger); if (result == MergingProcessor<ValueType>::Result::KeepUnchanged) { - builder.insert(key, value); // Reuse array store ref with no changes + builder.insert(key, AtomicValueWrapper(value)); // Reuse array store ref with no changes } else if (result == MergingProcessor<ValueType>::Result::Update) { assert(merger._valid_cached_value); // Must actually have been touched assert(merger._cached_value.valid()); DataStoreTraitsT::remove_by_wrapped_value(_store, value); const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, merger._cached_value); - builder.insert(key, new_value); + builder.insert(key, AtomicValueWrapper(new_value)); } else if (result == MergingProcessor<ValueType>::Result::Skip) { DataStoreTraitsT::remove_by_wrapped_value(_store, value); } else { diff --git a/vespalib/src/vespa/vespalib/btree/btreenodestore.cpp b/vespalib/src/vespa/vespalib/btree/btreenodestore.cpp index 05323d1329a..9f98ba05493 100644 --- a/vespalib/src/vespa/vespalib/btree/btreenodestore.cpp +++ 
b/vespalib/src/vespa/vespalib/btree/btreenodestore.cpp @@ -4,6 +4,7 @@ #include "btreerootbase.h" #include "btreeroot.h" #include "btreenodeallocator.h" +#include <vespa/vespalib/datastore/atomic_value_wrapper.h> #include <vespa/vespalib/datastore/datastore.h> #include <vespa/vespalib/datastore/buffer_type.hpp> @@ -43,6 +44,7 @@ VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(uint32_t, uint32_t, NoAggrega VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(uint32_t, BTreeNoLeafData, NoAggregated, BTreeDefaultTraits::LEAF_SLOTS); VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(uint32_t, int32_t , MinMaxAggregated, BTreeDefaultTraits::LEAF_SLOTS); VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(uint64_t, uint64_t , MinMaxAggregated, BTreeDefaultTraits::LEAF_SLOTS); +VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(uint64_t, AtomicValueWrapper<uint64_t>, MinMaxAggregated, BTreeDefaultTraits::LEAF_SLOTS); VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(AtomicEntryRef, AtomicEntryRef, NoAggregated, BTreeDefaultTraits::LEAF_SLOTS); VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(AtomicEntryRef, BTreeNoLeafData, NoAggregated, BTreeDefaultTraits::LEAF_SLOTS); VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(EntryRef, BTreeNoLeafData, NoAggregated, BTreeDefaultTraits::LEAF_SLOTS); diff --git a/vespalib/src/vespa/vespalib/datastore/atomic_value_wrapper.h b/vespalib/src/vespa/vespalib/datastore/atomic_value_wrapper.h new file mode 100644 index 00000000000..3ee871be9b0 --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/atomic_value_wrapper.h @@ -0,0 +1,63 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <atomic> + +namespace vespalib::datastore { + +/** + * Copyable atomic wrapper for a primitive value that offers value store and load + * functionality with explicit memory ordering constraints. 
Intended to be used for + * non-EntryRef values where atomicity and transitive visibility are requirements. + * + * Copying always happens with relaxed ordering, as it expects that the copier has + * already loaded the source AtomicValueWrapper with an ordering that is appropriate + * for observing any transitive memory dependencies. + * + * This wrapper is intentionally not implicitly convertible to/from values of the + * underlying primitive type. + * + * Note: use AtomicEntryRef instead if you're wrapping an EntryRef directly. + */ +template <typename T> +class AtomicValueWrapper { + static_assert(std::atomic<T>::is_always_lock_free); + + std::atomic<T> _value; +public: + constexpr AtomicValueWrapper() noexcept : _value() {} + constexpr explicit AtomicValueWrapper(T value) noexcept : _value(value) {} + AtomicValueWrapper(const AtomicValueWrapper& rhs) noexcept + : _value(rhs._value.load(std::memory_order_relaxed)) + {} + AtomicValueWrapper(AtomicValueWrapper&& rhs) noexcept + : _value(rhs._value.load(std::memory_order_relaxed)) + {} + AtomicValueWrapper& operator=(const AtomicValueWrapper& rhs) noexcept { + _value.store(rhs._value.load(std::memory_order_relaxed), + std::memory_order_relaxed); + return *this; + } + void store_release(T value) noexcept { + _value.store(value, std::memory_order_release); + } + void store_relaxed(T value) noexcept { + _value.store(value, std::memory_order_relaxed); + } + [[nodiscard]] T load_acquire() const noexcept { + return _value.load(std::memory_order_acquire); + } + [[nodiscard]] T load_relaxed() const noexcept { + return _value.load(std::memory_order_relaxed); + } + + [[nodiscard]] bool operator==(const AtomicValueWrapper& rhs) const noexcept { + return (load_relaxed() == rhs.load_relaxed()); + } + [[nodiscard]] bool operator!=(const AtomicValueWrapper& rhs) const noexcept { + return !(*this == rhs); + } +}; + +} |