diff options
Diffstat (limited to 'vespalib')
-rw-r--r-- | vespalib/src/tests/util/rcuvector/rcuvector_test.cpp | 151 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/rcuvector.h | 9 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/rcuvector.hpp | 19 |
3 files changed, 173 insertions, 6 deletions
diff --git a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp index 58256a96dac..7161eed24c9 100644 --- a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp +++ b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp @@ -2,14 +2,21 @@ #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/test/memory_allocator_observer.h> +#include <vespa/vespalib/datastore/atomic_value_wrapper.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/rcuvector.h> +#include <vespa/vespalib/util/rcuvector.hpp> #include <vespa/vespalib/util/round_up_to_page_size.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <random> using namespace vespalib; using vespalib::alloc::Alloc; using vespalib::alloc::MemoryAllocator; +using vespalib::datastore::AtomicValueWrapper; +using vespalib::makeLambdaTask; using MyMemoryAllocator = vespalib::alloc::test::MemoryAllocatorObserver; using AllocStats = MyMemoryAllocator::Stats; @@ -31,11 +38,13 @@ TEST(RcuVectorTest, basic) for (int32_t i = 0; i < 100; ++i) { v.push_back(i); EXPECT_EQ(i, v[i]); + EXPECT_EQ(i, v.acquire_elem_ref(i)); EXPECT_EQ((size_t)i + 1, v.size()); } for (int32_t i = 0; i < 100; ++i) { v[i] = i + 1; EXPECT_EQ(i + 1, v[i]); + EXPECT_EQ(i + 1, v.acquire_elem_ref(i)); EXPECT_EQ(100u, v.size()); } } @@ -276,13 +285,29 @@ TEST(RcuVectorTest, small_expand) g.trimHoldLists(2); } -struct Fixture { +struct FixtureBase { using generation_t = GenerationHandler::generation_t; AllocStats stats; std::unique_ptr<MemoryAllocator> allocator; Alloc initial_alloc; GenerationHolder g; + + FixtureBase(); + ~FixtureBase(); +}; + +FixtureBase::FixtureBase() + : stats(), + allocator(std::make_unique<MyMemoryAllocator>(stats)), + initial_alloc(Alloc::alloc_with_allocator(allocator.get())), + g() +{ +} + +FixtureBase::~FixtureBase() = default; + +struct Fixture : public FixtureBase { RcuVectorBase<int> arr; Fixture(); @@ -295,10 +320,7 @@ struct Fixture { }; Fixture::Fixture() - : stats(), - allocator(std::make_unique<MyMemoryAllocator>(stats)), - initial_alloc(Alloc::alloc_with_allocator(allocator.get())), - g(), + : FixtureBase(), arr(g, initial_alloc) { arr.reserve(100); @@ -346,4 +368,123 @@ TEST(RcuVectorTest, ensure_size_and_shrink_use_same_memory_allocator) EXPECT_EQ(AllocStats(4, 3), f.stats); } +namespace { + +class ReadStopper { + std::atomic<bool>& _stop_read; +public: + ReadStopper(std::atomic<bool>& stop_read) + : _stop_read(stop_read) + { + } + ~ReadStopper() { + _stop_read = true; + } +}; + +} + +struct StressFixture : public FixtureBase { + using AtomicIntWrapper = AtomicValueWrapper<int>; + RcuVectorBase<AtomicIntWrapper> arr; + std::atomic<bool> stop_read; + uint32_t read_area; + GenerationHandler generation_handler; + vespalib::ThreadStackExecutor writer; // 1 write thread + vespalib::ThreadStackExecutor readers; // multiple reader threads + StressFixture(); + ~StressFixture(); + void commit(); + void sync(); + void read_work(); + void write_work(uint32_t cnt); + void run_test(uint32_t cnt, uint32_t num_readers); +}; + +StressFixture::StressFixture() + : FixtureBase(), + arr(g, initial_alloc), + stop_read(false), + read_area(1000), + generation_handler(), + writer(1, 128_Ki), + readers(4, 128_Ki) +{ + arr.ensure_size(read_area, AtomicIntWrapper(0)); +} + +StressFixture::~StressFixture() = default; + +void +StressFixture::commit() +{ + auto current_gen = generation_handler.getCurrentGeneration(); + g.transferHoldLists(current_gen); + generation_handler.incGeneration(); + auto first_used_gen = generation_handler.getFirstUsedGeneration(); + g.trimHoldLists(first_used_gen); +} + +void +StressFixture::sync() +{ + writer.sync(); + readers.sync(); +} + +void +StressFixture::read_work() +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<uint32_t> distrib(0, read_area - 1); + std::vector<int> old(read_area); + while (!stop_read.load(std::memory_order_relaxed)) { + uint32_t idx = distrib(gen); + auto guard = generation_handler.takeGuard(); + int value = arr.acquire_elem_ref(idx).load_acquire(); + EXPECT_LE(old[idx], value); + old[idx] = value; + } +} + +void +StressFixture::write_work(uint32_t cnt) +{ + ReadStopper read_stopper(stop_read); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<uint32_t> distrib(0, read_area - 1); + for (uint32_t i = 0; i < cnt; ++i) { + if ((i % 1000) == 0) { + arr.ensure_size(64_Ki + 1, AtomicIntWrapper(0)); + } + if ((i % 1000) == 500) { + arr.shrink(read_area); + } + uint32_t idx = distrib(gen); + arr[idx].store_release(arr[idx].load_relaxed() + 1); + commit(); + } +} + +void +StressFixture::run_test(uint32_t cnt, uint32_t num_readers) +{ + auto failed_write_task = writer.execute(makeLambdaTask([this, cnt]() { write_work(cnt); })); + ASSERT_FALSE(failed_write_task); + for (uint32_t i = 0; i < num_readers; ++i) { + readers.execute(makeLambdaTask([this]() { read_work(); })); + } + sync(); + commit(); + EXPECT_LE((cnt / 1000) * 2, stats.alloc_cnt); +} + +TEST(RcuVectorTest, single_writer_four_readers) +{ + StressFixture f; + f.run_test(20000, 4); +} + GTEST_MAIN_RUN_ALL_TESTS() diff --git a/vespalib/src/vespa/vespalib/util/rcuvector.h b/vespalib/src/vespa/vespalib/util/rcuvector.h index 00d050fa8d1..d2d0a946b91 100644 --- a/vespalib/src/vespa/vespalib/util/rcuvector.h +++ b/vespalib/src/vespa/vespalib/util/rcuvector.h @@ -44,6 +44,7 @@ protected: using GenerationHolderType = GenerationHolder; private: ArrayType _data; + std::atomic<const T*> _vector_start; size_t _growPercent; size_t _growDelta; GenerationHolderType &_genHolder; @@ -57,6 +58,8 @@ private: } void expand(size_t newCapacity); void expandAndInsert(const T & v); + void update_vector_start(); +protected: virtual void onReallocation(); public: @@ -118,6 +121,12 @@ public: void clear() { _data.clear(); } T & operator[](size_t i) { return _data[i]; } const T & operator[](size_t i) const { return _data[i]; } + /* + * Readers holding a generation guard can call acquire_elem_ref(i) + * to get a const reference to element i. Array bound must be handled + * by reader, cf. committed docid limit in attribute vectors. + */ + const T& acquire_elem_ref(size_t i) const noexcept { return *(_vector_start.load(std::memory_order_acquire) + i); } void reset(); void shrink(size_t newSize) __attribute__((noinline)); diff --git a/vespalib/src/vespa/vespalib/util/rcuvector.hpp b/vespalib/src/vespa/vespalib/util/rcuvector.hpp index 7c70539da0e..81928251b19 100644 --- a/vespalib/src/vespa/vespalib/util/rcuvector.hpp +++ b/vespalib/src/vespa/vespalib/util/rcuvector.hpp @@ -21,12 +21,14 @@ template <typename T> void RcuVectorBase<T>::unsafe_resize(size_t n) { _data.resize(n); + update_vector_start(); } template <typename T> void RcuVectorBase<T>::unsafe_reserve(size_t n) { _data.reserve(n); + update_vector_start(); } template <typename T> @@ -112,11 +114,13 @@ template <typename T> RcuVectorBase<T>::RcuVectorBase(GenerationHolderType &genHolder, const Alloc &initialAlloc) : _data(initialAlloc), + _vector_start(nullptr), _growPercent(100), _growDelta(0), _genHolder(genHolder) { _data.reserve(16); + update_vector_start(); } template <typename T> @@ -126,11 +130,13 @@ RcuVectorBase<T>::RcuVectorBase(size_t initialCapacity, GenerationHolderType &genHolder, const Alloc &initialAlloc) : _data(initialAlloc), + _vector_start(nullptr), _growPercent(growPercent), _growDelta(growDelta), _genHolder(genHolder) { _data.reserve(initialCapacity); + update_vector_start(); } template <typename T> @@ -154,11 +160,22 @@ RcuVectorBase<T>::getMemoryUsage() const template <typename T> void -RcuVectorBase<T>::onReallocation() { } +RcuVectorBase<T>::update_vector_start() +{ + _vector_start.store(&_data[0], std::memory_order_release); +} + +template <typename T> +void +RcuVectorBase<T>::onReallocation() +{ + update_vector_start(); +} template <typename T> void RcuVector<T>::onReallocation() { + RcuVectorBase<T>::onReallocation(); _genHolderStore.transferHoldLists(_generation); } |