diff options
Diffstat (limited to 'vespalib/src/tests/util/rcuvector/rcuvector_test.cpp')
-rw-r--r-- | vespalib/src/tests/util/rcuvector/rcuvector_test.cpp | 151 |
1 files changed, 146 insertions, 5 deletions
diff --git a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp index 58256a96dac..7161eed24c9 100644 --- a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp +++ b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp @@ -2,14 +2,21 @@ #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/test/memory_allocator_observer.h> +#include <vespa/vespalib/datastore/atomic_value_wrapper.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/rcuvector.h> +#include <vespa/vespalib/util/rcuvector.hpp> #include <vespa/vespalib/util/round_up_to_page_size.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <random> using namespace vespalib; using vespalib::alloc::Alloc; using vespalib::alloc::MemoryAllocator; +using vespalib::datastore::AtomicValueWrapper; +using vespalib::makeLambdaTask; using MyMemoryAllocator = vespalib::alloc::test::MemoryAllocatorObserver; using AllocStats = MyMemoryAllocator::Stats; @@ -31,11 +38,13 @@ TEST(RcuVectorTest, basic) for (int32_t i = 0; i < 100; ++i) { v.push_back(i); EXPECT_EQ(i, v[i]); + EXPECT_EQ(i, v.acquire_elem_ref(i)); EXPECT_EQ((size_t)i + 1, v.size()); } for (int32_t i = 0; i < 100; ++i) { v[i] = i + 1; EXPECT_EQ(i + 1, v[i]); + EXPECT_EQ(i + 1, v.acquire_elem_ref(i)); EXPECT_EQ(100u, v.size()); } } @@ -276,13 +285,29 @@ TEST(RcuVectorTest, small_expand) g.trimHoldLists(2); } -struct Fixture { +struct FixtureBase { using generation_t = GenerationHandler::generation_t; AllocStats stats; std::unique_ptr<MemoryAllocator> allocator; Alloc initial_alloc; GenerationHolder g; + + FixtureBase(); + ~FixtureBase(); +}; + +FixtureBase::FixtureBase() + : stats(), + allocator(std::make_unique<MyMemoryAllocator>(stats)), + initial_alloc(Alloc::alloc_with_allocator(allocator.get())), + g() +{ +} + +FixtureBase::~FixtureBase() = default; + +struct Fixture : public FixtureBase { RcuVectorBase<int> arr; Fixture(); @@ -295,10 +320,7 @@ struct Fixture { }; Fixture::Fixture() - : stats(), - allocator(std::make_unique<MyMemoryAllocator>(stats)), - initial_alloc(Alloc::alloc_with_allocator(allocator.get())), - g(), + : FixtureBase(), arr(g, initial_alloc) { arr.reserve(100); @@ -346,4 +368,123 @@ TEST(RcuVectorTest, ensure_size_and_shrink_use_same_memory_allocator) EXPECT_EQ(AllocStats(4, 3), f.stats); } +namespace { + +class ReadStopper { + std::atomic<bool>& _stop_read; +public: + ReadStopper(std::atomic<bool>& stop_read) + : _stop_read(stop_read) + { + } + ~ReadStopper() { + _stop_read = true; + } +}; + +} + +struct StressFixture : public FixtureBase { + using AtomicIntWrapper = AtomicValueWrapper<int>; + RcuVectorBase<AtomicIntWrapper> arr; + std::atomic<bool> stop_read; + uint32_t read_area; + GenerationHandler generation_handler; + vespalib::ThreadStackExecutor writer; // 1 write thread + vespalib::ThreadStackExecutor readers; // multiple reader threads + StressFixture(); + ~StressFixture(); + void commit(); + void sync(); + void read_work(); + void write_work(uint32_t cnt); + void run_test(uint32_t cnt, uint32_t num_readers); +}; + +StressFixture::StressFixture() + : FixtureBase(), + arr(g, initial_alloc), + stop_read(false), + read_area(1000), + generation_handler(), + writer(1, 128_Ki), + readers(4, 128_Ki) +{ + arr.ensure_size(read_area, AtomicIntWrapper(0)); +} + +StressFixture::~StressFixture() = default; + +void +StressFixture::commit() +{ + auto current_gen = generation_handler.getCurrentGeneration(); + g.transferHoldLists(current_gen); + generation_handler.incGeneration(); + auto first_used_gen = generation_handler.getFirstUsedGeneration(); + g.trimHoldLists(first_used_gen); +} + +void +StressFixture::sync() +{ + writer.sync(); + readers.sync(); +} + +void +StressFixture::read_work() +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<uint32_t> distrib(0, read_area - 1); + std::vector<int> old(read_area); + while (!stop_read.load(std::memory_order_relaxed)) { + uint32_t idx = distrib(gen); + auto guard = generation_handler.takeGuard(); + int value = arr.acquire_elem_ref(idx).load_acquire(); + EXPECT_LE(old[idx], value); + old[idx] = value; + } +} + +void +StressFixture::write_work(uint32_t cnt) +{ + ReadStopper read_stopper(stop_read); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<uint32_t> distrib(0, read_area - 1); + for (uint32_t i = 0; i < cnt; ++i) { + if ((i % 1000) == 0) { + arr.ensure_size(64_Ki + 1, AtomicIntWrapper(0)); + } + if ((i % 1000) == 500) { + arr.shrink(read_area); + } + uint32_t idx = distrib(gen); + arr[idx].store_release(arr[idx].load_relaxed() + 1); + commit(); + } +} + +void +StressFixture::run_test(uint32_t cnt, uint32_t num_readers) +{ + auto failed_write_task = writer.execute(makeLambdaTask([this, cnt]() { write_work(cnt); })); + ASSERT_FALSE(failed_write_task); + for (uint32_t i = 0; i < num_readers; ++i) { + readers.execute(makeLambdaTask([this]() { read_work(); })); + } + sync(); + commit(); + EXPECT_LE((cnt / 1000) * 2, stats.alloc_cnt); +} + +TEST(RcuVectorTest, single_writer_four_readers) +{ + StressFixture f; + f.run_test(20000, 4); +} + GTEST_MAIN_RUN_ALL_TESTS() |