-rw-r--r--  storage/src/tests/distributor/btree_bucket_database_test.cpp          |  77
-rw-r--r--  storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h    |  23
-rw-r--r--  storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp  |  32
-rw-r--r--  vespalib/src/vespa/vespalib/btree/btreenodestore.cpp                   |   2
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/atomic_value_wrapper.h           |  63
5 files changed, 173 insertions(+), 24 deletions(-)
diff --git a/storage/src/tests/distributor/btree_bucket_database_test.cpp b/storage/src/tests/distributor/btree_bucket_database_test.cpp
index cdeacba4b58..36e700ab1ff 100644
--- a/storage/src/tests/distributor/btree_bucket_database_test.cpp
+++ b/storage/src/tests/distributor/btree_bucket_database_test.cpp
@@ -2,8 +2,11 @@
#include "bucketdatabasetest.h"
#include <vespa/storage/bucketdb/btree_bucket_database.h>
+#include <vespa/vespalib/util/count_down_latch.h>
+#include <vespa/vespalib/util/time.h>
#include <gtest/gtest.h>
-#include <gmock/gmock.h>
+#include <atomic>
+#include <thread>
using namespace ::testing;
@@ -59,5 +62,77 @@ TEST_F(BTreeReadGuardTest, guard_observes_entries_alive_at_acquire_time) {
EXPECT_EQ(entries[0].getBucketInfo(), BI(1, 1234));
}
+namespace {
+
+BucketCopy make_bucket_copy(uint16_t node_idx, uint32_t dummy_info) {
+ return {0, node_idx, api::BucketInfo(dummy_info, dummy_info, dummy_info)};
+}
+
+BucketInfo make_bucket_info(uint32_t dummy_info) {
+ BucketInfo bi;
+ bi.addNode(make_bucket_copy(0, dummy_info), {0, 1, 2});
+ bi.addNode(make_bucket_copy(1, dummy_info), {0, 1, 2});
+ bi.addNode(make_bucket_copy(2, dummy_info), {0, 1, 2});
+ bi.setLastGarbageCollectionTime(dummy_info);
+ return bi;
+}
+
+}
+
+// Simple pseudo-stress test with a single writer and a single reader thread.
+// The writer thread continuously updates a set of buckets with an array of bucket
+// info instances and last GC timestamp that all have the same value, but the value
+// itself is incremented for each write. This allows the reader to validate that it
+// is observing a stable snapshot across all read values for a given bucket key.
+TEST_F(BTreeReadGuardTest, multithreaded_read_guards_observe_stable_snapshots) {
+ constexpr uint32_t bucket_bits = 20;
+ constexpr uint32_t n_buckets = 1u << 10u; // Must be less than 2**bucket_bits
+ constexpr auto duration = 500ms;
+ vespalib::CountDownLatch latch(2);
+ std::atomic<bool> run_reader(true);
+
+ std::thread reader_thread([&]{
+ latch.countDown();
+ uint32_t read_counter = 0;
+ while (run_reader.load(std::memory_order_relaxed)) {
+ auto guard = _db.acquire_read_guard();
+ const uint32_t superbucket = (read_counter % n_buckets);
+ BucketId bucket(bucket_bits, superbucket);
+ const auto entries = guard->find_parents_and_self(bucket);
+ // Entry might not have been written yet. If so, yield to give the writer some time.
+ if (entries.empty()) {
+ std::this_thread::yield();
+ continue;
+ }
+ ++read_counter;
+ // Use plain assertions to avoid any implicit thread/lock interactions with gtest
+ assert(entries.size() == 1);
+ const auto& entry = entries[0];
+ assert(entry.getBucketId() == bucket);
+ assert(entry->getNodeCount() == 3);
+ // We reuse the same write counter as GC timestamp and checksum/doc count/size across
+ // all stored bucket infos in a given bucket.
+ const auto expected_stable_val = entry->getLastGarbageCollectionTime();
+ for (uint16_t i = 0; i < 3; ++i) {
+ const auto& info = entry->getNodeRef(i);
+ assert(info.getChecksum() == expected_stable_val);
+ assert(info.getDocumentCount() == expected_stable_val);
+ assert(info.getTotalDocumentSize() == expected_stable_val);
+ }
+ }
+ });
+ latch.countDown();
+ const auto start_time = vespalib::steady_clock::now();
+ uint32_t write_counter = 0;
+ do {
+ for (uint32_t i = 0; i < n_buckets; ++i, ++write_counter) {
+ BucketId bucket_id(bucket_bits, i);
+ _db.update(BucketDatabase::Entry(bucket_id, make_bucket_info(write_counter)));
+ }
+ } while ((vespalib::steady_clock::now() - start_time) < duration);
+ run_reader.store(false, std::memory_order_relaxed);
+ reader_thread.join();
+}
+
}
diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h
index 77e71bb4f0c..256a54b19b3 100644
--- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h
+++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h
@@ -7,6 +7,7 @@
#include <vespa/vespalib/btree/btree.h>
#include <vespa/vespalib/btree/minmaxaggregated.h>
#include <vespa/vespalib/btree/minmaxaggrcalc.h>
+#include <vespa/vespalib/datastore/atomic_value_wrapper.h>
namespace storage::bucketdb {
@@ -38,10 +39,11 @@ namespace storage::bucketdb {
template <typename DataStoreTraitsT>
class GenericBTreeBucketDatabase {
public:
- using DataStoreType = typename DataStoreTraitsT::DataStoreType;
- using ValueType = typename DataStoreTraitsT::ValueType;
- using ConstValueRef = typename DataStoreTraitsT::ConstValueRef;
- using GenerationHandler = vespalib::GenerationHandler;
+ using DataStoreType = typename DataStoreTraitsT::DataStoreType;
+ using ValueType = typename DataStoreTraitsT::ValueType;
+ using ConstValueRef = typename DataStoreTraitsT::ConstValueRef;
+ using GenerationHandler = vespalib::GenerationHandler;
+ using AtomicValueWrapper = vespalib::datastore::AtomicValueWrapper<uint64_t>;
struct KeyUsedBitsMinMaxAggrCalc : vespalib::btree::MinMaxAggrCalc {
constexpr static bool aggregate_over_values() { return false; }
@@ -51,7 +53,18 @@ public:
}
};
- using BTree = vespalib::btree::BTree<uint64_t, uint64_t,
+ // Rationale for using an atomic u64 value type:
+ // It is expected that the set of bucket keys is much less frequently updated than their
+ // corresponding values. Since values must be stable for concurrent readers, all written values
+ // are _immutable_ once created. Consequently, every single mutation of a bucket will replace its
+ // value with a new (immutable) entry. For distributors, this replaces an entire array of values.
+ // Instead of constantly thawing and freezing subtrees for each bucket update, we atomically
+ // replace the value to point to a new u32 EntryRef mangled together with a u32 timestamp.
+ // This means updates that don't change the set of buckets leave the B-tree node structure
+ // itself entirely untouched.
+ // Great care must be taken when writing and reading to ensure memory visibility.
+ using BTree = vespalib::btree::BTree<uint64_t,
+ vespalib::datastore::AtomicValueWrapper<uint64_t>,
vespalib::btree::MinMaxAggregated,
std::less<>,
vespalib::btree::BTreeDefaultTraits,
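The u64 value mentioned in the rationale above is produced and consumed by the data store traits (wrap_and_store_value / unwrap_from_key_value in the .hpp below). As a rough sketch of the described scheme, a u32 EntryRef and a u32 timestamp could be mangled into one u64 roughly as follows; the function names and the high/low placement of the two fields are illustrative assumptions, not the actual trait implementation.

// Illustrative sketch only: the real packing is owned by DataStoreTraitsT;
// field layout (timestamp in the high half, EntryRef in the low half) is assumed.
#include <cstdint>

constexpr uint64_t pack_value(uint32_t entry_ref, uint32_t timestamp) noexcept {
    return (static_cast<uint64_t>(timestamp) << 32) | entry_ref;
}
constexpr uint32_t entry_ref_of(uint64_t value) noexcept {
    return static_cast<uint32_t>(value);        // low 32 bits
}
constexpr uint32_t timestamp_of(uint64_t value) noexcept {
    return static_cast<uint32_t>(value >> 32);  // high 32 bits
}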
diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp
index 7b0d147df0d..9db36e96fc0 100644
--- a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp
+++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp
@@ -77,16 +77,14 @@ GenericBTreeBucketDatabase<DataStoreTraitsT>::entry_from_iterator(const BTreeCon
if (!iter.valid()) {
return DataStoreTraitsT::make_invalid_value();
}
- const auto value = iter.getData();
- std::atomic_thread_fence(std::memory_order_acquire);
+ const auto value = iter.getData().load_acquire();
return DataStoreTraitsT::unwrap_from_key_value(_store, iter.getKey(), value);
}
template <typename DataStoreTraitsT>
typename GenericBTreeBucketDatabase<DataStoreTraitsT>::ConstValueRef
GenericBTreeBucketDatabase<DataStoreTraitsT>::const_value_ref_from_valid_iterator(const BTreeConstIterator& iter) const {
- const auto value = iter.getData();
- std::atomic_thread_fence(std::memory_order_acquire);
+ const auto value = iter.getData().load_acquire();
return DataStoreTraitsT::unwrap_const_ref_from_key_value(_store, iter.getKey(), value);
}
@@ -304,7 +302,7 @@ bool GenericBTreeBucketDatabase<DataStoreTraitsT>::remove_by_raw_key(uint64_t ke
if (!iter.valid()) {
return false;
}
- const auto value = iter.getData();
+ const auto value = iter.getData().load_relaxed(); // Called from writer only
DataStoreTraitsT::remove_by_wrapped_value(_store, value);
_tree.remove(iter);
commit_tree_changes();
@@ -324,12 +322,11 @@ bool GenericBTreeBucketDatabase<DataStoreTraitsT>::update_by_raw_key(uint64_t bu
auto iter = _tree.lowerBound(bucket_key);
const bool pre_existed = (iter.valid() && (iter.getKey() == bucket_key));
if (pre_existed) {
- DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData());
+ DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData().load_relaxed());
// In-place update of value; does not require tree structure modification
- std::atomic_thread_fence(std::memory_order_release); // Must ensure visibility when new array ref is observed
- iter.writeData(new_value);
+ iter.getWData().store_release(new_value); // Must ensure visibility when new array ref is observed
} else {
- _tree.insert(iter, bucket_key, new_value);
+ _tree.insert(iter, bucket_key, AtomicValueWrapper(new_value));
}
commit_tree_changes(); // TODO does publishing a new root imply an implicit memory fence?
return pre_existed;
@@ -359,18 +356,17 @@ GenericBTreeBucketDatabase<DataStoreTraitsT>::process_update(const BucketId& buc
ValueType entry(found ? entry_from_iterator(iter) : processor.create_entry(bucket));
bool keep = processor.process_entry(entry);
if (found) {
- DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData());
+ DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData().load_relaxed()); // Called from writer only
if (keep) {
const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry);
- std::atomic_thread_fence(std::memory_order_release);
- iter.writeData(new_value);
+ iter.getWData().store_release(new_value);
} else {
_tree.remove(iter);
}
} else {
if (keep) {
const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, entry);
- _tree.insert(iter, bucket_key, new_value);
+ _tree.insert(iter, bucket_key, AtomicValueWrapper(new_value));
}
}
commit_tree_changes();
@@ -491,7 +487,7 @@ struct BTreeBuilderMerger final : Merger<typename DataStoreTraitsT::ValueType> {
const uint64_t bucket_key = bucket_id.toKey();
assert(bucket_key < _current_key);
const auto new_value = DataStoreTraitsT::wrap_and_store_value(_db.store(), e);
- _builder.insert(bucket_key, new_value);
+ _builder.insert(bucket_key, vespalib::datastore::AtomicValueWrapper<uint64_t>(new_value));
}
void update_iteration_state(uint64_t key, uint64_t value) {
@@ -520,7 +516,7 @@ struct BTreeTrailingInserter final : TrailingInserter<typename DataStoreTraitsT:
void insert_at_end(const BucketId& bucket_id, const ValueType& e) override {
const uint64_t bucket_key = bucket_id.toKey();
const auto new_value = DataStoreTraitsT::wrap_and_store_value(_db.store(), e);
- _builder.insert(bucket_key, new_value);
+ _builder.insert(bucket_key, vespalib::datastore::AtomicValueWrapper<uint64_t>(new_value));
}
};
@@ -533,19 +529,19 @@ void GenericBTreeBucketDatabase<DataStoreTraitsT>::merge(MergingProcessor<ValueT
// TODO for_each instead?
for (auto iter = _tree.begin(); iter.valid(); ++iter) {
const uint64_t key = iter.getKey();
- const uint64_t value = iter.getData();
+ const uint64_t value = iter.getData().load_relaxed(); // Only called from writer
merger.update_iteration_state(key, value);
auto result = proc.merge(merger);
if (result == MergingProcessor<ValueType>::Result::KeepUnchanged) {
- builder.insert(key, value); // Reuse array store ref with no changes
+ builder.insert(key, AtomicValueWrapper(value)); // Reuse array store ref with no changes
} else if (result == MergingProcessor<ValueType>::Result::Update) {
assert(merger._valid_cached_value); // Must actually have been touched
assert(merger._cached_value.valid());
DataStoreTraitsT::remove_by_wrapped_value(_store, value);
const auto new_value = DataStoreTraitsT::wrap_and_store_value(_store, merger._cached_value);
- builder.insert(key, new_value);
+ builder.insert(key, AtomicValueWrapper(new_value));
} else if (result == MergingProcessor<ValueType>::Result::Skip) {
DataStoreTraitsT::remove_by_wrapped_value(_store, value);
} else {
diff --git a/vespalib/src/vespa/vespalib/btree/btreenodestore.cpp b/vespalib/src/vespa/vespalib/btree/btreenodestore.cpp
index 05323d1329a..9f98ba05493 100644
--- a/vespalib/src/vespa/vespalib/btree/btreenodestore.cpp
+++ b/vespalib/src/vespa/vespalib/btree/btreenodestore.cpp
@@ -4,6 +4,7 @@
#include "btreerootbase.h"
#include "btreeroot.h"
#include "btreenodeallocator.h"
+#include <vespa/vespalib/datastore/atomic_value_wrapper.h>
#include <vespa/vespalib/datastore/datastore.h>
#include <vespa/vespalib/datastore/buffer_type.hpp>
@@ -43,6 +44,7 @@ VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(uint32_t, uint32_t, NoAggrega
VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(uint32_t, BTreeNoLeafData, NoAggregated, BTreeDefaultTraits::LEAF_SLOTS);
VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(uint32_t, int32_t , MinMaxAggregated, BTreeDefaultTraits::LEAF_SLOTS);
VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(uint64_t, uint64_t , MinMaxAggregated, BTreeDefaultTraits::LEAF_SLOTS);
+VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(uint64_t, AtomicValueWrapper<uint64_t>, MinMaxAggregated, BTreeDefaultTraits::LEAF_SLOTS);
VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(AtomicEntryRef, AtomicEntryRef, NoAggregated, BTreeDefaultTraits::LEAF_SLOTS);
VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(AtomicEntryRef, BTreeNoLeafData, NoAggregated, BTreeDefaultTraits::LEAF_SLOTS);
VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_LEAFNODE(EntryRef, BTreeNoLeafData, NoAggregated, BTreeDefaultTraits::LEAF_SLOTS);
diff --git a/vespalib/src/vespa/vespalib/datastore/atomic_value_wrapper.h b/vespalib/src/vespa/vespalib/datastore/atomic_value_wrapper.h
new file mode 100644
index 00000000000..3ee871be9b0
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/atomic_value_wrapper.h
@@ -0,0 +1,63 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <atomic>
+
+namespace vespalib::datastore {
+
+/**
+ * Copyable atomic wrapper for a primitive value that offers value store and load
+ * functionality with explicit memory ordering constraints. Intended to be used for
+ * non-EntryRef values where atomicity and transitive visibility are required.
+ *
+ * Copying always happens with relaxed ordering, as it expects that the copier has
+ * already loaded the source AtomicValueWrapper with an ordering that is appropriate
+ * for observing any transitive memory dependencies.
+ *
+ * This wrapper is intentionally not implicitly convertible to/from values of the
+ * underlying primitive type.
+ *
+ * Note: use AtomicEntryRef instead if you're wrapping an EntryRef directly.
+ */
+template <typename T>
+class AtomicValueWrapper {
+ static_assert(std::atomic<T>::is_always_lock_free);
+
+ std::atomic<T> _value;
+public:
+ constexpr AtomicValueWrapper() noexcept : _value() {}
+ constexpr explicit AtomicValueWrapper(T value) noexcept : _value(value) {}
+ AtomicValueWrapper(const AtomicValueWrapper& rhs) noexcept
+ : _value(rhs._value.load(std::memory_order_relaxed))
+ {}
+ AtomicValueWrapper(AtomicValueWrapper&& rhs) noexcept
+ : _value(rhs._value.load(std::memory_order_relaxed))
+ {}
+ AtomicValueWrapper& operator=(const AtomicValueWrapper& rhs) noexcept {
+ _value.store(rhs._value.load(std::memory_order_relaxed),
+ std::memory_order_relaxed);
+ return *this;
+ }
+ void store_release(T value) noexcept {
+ _value.store(value, std::memory_order_release);
+ }
+ void store_relaxed(T value) noexcept {
+ _value.store(value, std::memory_order_relaxed);
+ }
+ [[nodiscard]] T load_acquire() const noexcept {
+ return _value.load(std::memory_order_acquire);
+ }
+ [[nodiscard]] T load_relaxed() const noexcept {
+ return _value.load(std::memory_order_relaxed);
+ }
+
+ [[nodiscard]] bool operator==(const AtomicValueWrapper& rhs) const noexcept {
+ return (load_relaxed() == rhs.load_relaxed());
+ }
+ [[nodiscard]] bool operator!=(const AtomicValueWrapper& rhs) const noexcept {
+ return !(*this == rhs);
+ }
+};
+
+}
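A minimal usage sketch of the wrapper's release/acquire pairing, mirroring how the bucket DB writer publishes a new value that read guards then observe. The payload and slot names are hypothetical and only illustrate the ordering contract: everything written before store_release() is visible to a thread that observes the stored value via load_acquire().

#include <vespa/vespalib/datastore/atomic_value_wrapper.h>
#include <cassert>
#include <cstdint>
#include <thread>

using vespalib::datastore::AtomicValueWrapper;

int main() {
    int payload = 0;                     // stands in for immutable data referenced by the value
    AtomicValueWrapper<uint64_t> slot;   // shared slot; 0 means "not yet published"

    std::thread writer([&] {
        payload = 42;                    // write the referenced data first...
        slot.store_release(1);           // ...then publish the reference with release semantics
    });
    std::thread reader([&] {
        while (slot.load_acquire() == 0) {
            // spin until the acquire load pairs with the writer's release store
        }
        assert(payload == 42);           // data written before store_release is now visible
    });
    writer.join();
    reader.join();
    return 0;
}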