author     Tor Egge <Tor.Egge@yahooinc.com>    2022-03-15 13:09:35 +0100
committer  Tor Egge <Tor.Egge@yahooinc.com>    2022-03-15 13:09:35 +0100
commit     b49b67321ada0694712e1a1926518024bbc0c121 (patch)
tree       a0b7886b3f6c5f22088e20109ebafdecd798eaab /vespalib/src/tests/util/rcuvector/rcuvector_test.cpp
parent     bbd8807e32787cf3ae0430d2e33084b586bf8bc6 (diff)
Add acquire_elem_ref() member function to rcu vector.
Diffstat (limited to 'vespalib/src/tests/util/rcuvector/rcuvector_test.cpp')
-rw-r--r--  vespalib/src/tests/util/rcuvector/rcuvector_test.cpp  151
1 file changed, 146 insertions(+), 5 deletions(-)
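
For orientation, here is a minimal sketch (not part of the patch) of how the new acquire_elem_ref() accessor is exercised. The constructor arguments, ensure_size() call, and generation bookkeeping mirror the test fixtures in the diff below; outside that context they are assumptions, and acquire_elem_ref_sketch() is a hypothetical helper, not Vespa code.

    // Sketch only: reader-side use of acquire_elem_ref(), following the
    // StressFixture pattern shown in the diff below.
    #include <vespa/vespalib/util/rcuvector.h>
    #include <vespa/vespalib/util/rcuvector.hpp>

    void acquire_elem_ref_sketch() {
        vespalib::GenerationHolder g;
        vespalib::GenerationHandler handler;
        // constructor arguments taken from the fixtures below; the Alloc
        // factory used here is an assumption
        vespalib::RcuVectorBase<int> arr(g, vespalib::alloc::Alloc::alloc());
        arr.ensure_size(1000, 0);                 // writer grows the vector
        {
            auto guard = handler.takeGuard();     // reader pins the current generation
            int value = arr.acquire_elem_ref(42); // acquire-load of the element
            (void) value;
        }
        // writer publishes a new generation and reclaims retired buffers,
        // as StressFixture::commit() does below
        g.transferHoldLists(handler.getCurrentGeneration());
        handler.incGeneration();
        g.trimHoldLists(handler.getFirstUsedGeneration());
    }
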
diff --git a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp
index 58256a96dac..7161eed24c9 100644
--- a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp
+++ b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp
@@ -2,14 +2,21 @@
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/test/memory_allocator_observer.h>
+#include <vespa/vespalib/datastore/atomic_value_wrapper.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/rcuvector.h>
+#include <vespa/vespalib/util/rcuvector.hpp>
#include <vespa/vespalib/util/round_up_to_page_size.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <random>
using namespace vespalib;
using vespalib::alloc::Alloc;
using vespalib::alloc::MemoryAllocator;
+using vespalib::datastore::AtomicValueWrapper;
+using vespalib::makeLambdaTask;
using MyMemoryAllocator = vespalib::alloc::test::MemoryAllocatorObserver;
using AllocStats = MyMemoryAllocator::Stats;
@@ -31,11 +38,13 @@ TEST(RcuVectorTest, basic)
for (int32_t i = 0; i < 100; ++i) {
v.push_back(i);
EXPECT_EQ(i, v[i]);
+ EXPECT_EQ(i, v.acquire_elem_ref(i));
EXPECT_EQ((size_t)i + 1, v.size());
}
for (int32_t i = 0; i < 100; ++i) {
v[i] = i + 1;
EXPECT_EQ(i + 1, v[i]);
+ EXPECT_EQ(i + 1, v.acquire_elem_ref(i));
EXPECT_EQ(100u, v.size());
}
}
@@ -276,13 +285,29 @@ TEST(RcuVectorTest, small_expand)
g.trimHoldLists(2);
}
-struct Fixture {
+struct FixtureBase {
using generation_t = GenerationHandler::generation_t;
AllocStats stats;
std::unique_ptr<MemoryAllocator> allocator;
Alloc initial_alloc;
GenerationHolder g;
+
+ FixtureBase();
+ ~FixtureBase();
+};
+
+FixtureBase::FixtureBase()
+ : stats(),
+ allocator(std::make_unique<MyMemoryAllocator>(stats)),
+ initial_alloc(Alloc::alloc_with_allocator(allocator.get())),
+ g()
+{
+}
+
+FixtureBase::~FixtureBase() = default;
+
+struct Fixture : public FixtureBase {
RcuVectorBase<int> arr;
Fixture();
@@ -295,10 +320,7 @@ struct Fixture {
};
Fixture::Fixture()
- : stats(),
- allocator(std::make_unique<MyMemoryAllocator>(stats)),
- initial_alloc(Alloc::alloc_with_allocator(allocator.get())),
- g(),
+ : FixtureBase(),
arr(g, initial_alloc)
{
arr.reserve(100);
@@ -346,4 +368,123 @@ TEST(RcuVectorTest, ensure_size_and_shrink_use_same_memory_allocator)
EXPECT_EQ(AllocStats(4, 3), f.stats);
}
+namespace {
+
+class ReadStopper {
+ std::atomic<bool>& _stop_read;
+public:
+ ReadStopper(std::atomic<bool>& stop_read)
+ : _stop_read(stop_read)
+ {
+ }
+ ~ReadStopper() {
+ _stop_read = true;
+ }
+};
+
+}
+
+struct StressFixture : public FixtureBase {
+ using AtomicIntWrapper = AtomicValueWrapper<int>;
+ RcuVectorBase<AtomicIntWrapper> arr;
+ std::atomic<bool> stop_read;
+ uint32_t read_area;
+ GenerationHandler generation_handler;
+ vespalib::ThreadStackExecutor writer; // 1 write thread
+ vespalib::ThreadStackExecutor readers; // multiple reader threads
+ StressFixture();
+ ~StressFixture();
+ void commit();
+ void sync();
+ void read_work();
+ void write_work(uint32_t cnt);
+ void run_test(uint32_t cnt, uint32_t num_readers);
+};
+
+StressFixture::StressFixture()
+ : FixtureBase(),
+ arr(g, initial_alloc),
+ stop_read(false),
+ read_area(1000),
+ generation_handler(),
+ writer(1, 128_Ki),
+ readers(4, 128_Ki)
+{
+ arr.ensure_size(read_area, AtomicIntWrapper(0));
+}
+
+StressFixture::~StressFixture() = default;
+
+void
+StressFixture::commit()
+{
+ auto current_gen = generation_handler.getCurrentGeneration();
+ g.transferHoldLists(current_gen);
+ generation_handler.incGeneration();
+ auto first_used_gen = generation_handler.getFirstUsedGeneration();
+ g.trimHoldLists(first_used_gen);
+}
+
+void
+StressFixture::sync()
+{
+ writer.sync();
+ readers.sync();
+}
+
+void
+StressFixture::read_work()
+{
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<uint32_t> distrib(0, read_area - 1);
+ std::vector<int> old(read_area);
+ while (!stop_read.load(std::memory_order_relaxed)) {
+ uint32_t idx = distrib(gen);
+ auto guard = generation_handler.takeGuard();
+ int value = arr.acquire_elem_ref(idx).load_acquire();
+ EXPECT_LE(old[idx], value);
+ old[idx] = value;
+ }
+}
+
+void
+StressFixture::write_work(uint32_t cnt)
+{
+ ReadStopper read_stopper(stop_read);
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<uint32_t> distrib(0, read_area - 1);
+ for (uint32_t i = 0; i < cnt; ++i) {
+ if ((i % 1000) == 0) {
+ arr.ensure_size(64_Ki + 1, AtomicIntWrapper(0));
+ }
+ if ((i % 1000) == 500) {
+ arr.shrink(read_area);
+ }
+ uint32_t idx = distrib(gen);
+ arr[idx].store_release(arr[idx].load_relaxed() + 1);
+ commit();
+ }
+}
+
+void
+StressFixture::run_test(uint32_t cnt, uint32_t num_readers)
+{
+ auto failed_write_task = writer.execute(makeLambdaTask([this, cnt]() { write_work(cnt); }));
+ ASSERT_FALSE(failed_write_task);
+ for (uint32_t i = 0; i < num_readers; ++i) {
+ readers.execute(makeLambdaTask([this]() { read_work(); }));
+ }
+ sync();
+ commit();
+ EXPECT_LE((cnt / 1000) * 2, stats.alloc_cnt);
+}
+
+TEST(RcuVectorTest, single_writer_four_readers)
+{
+ StressFixture f;
+ f.run_test(20000, 4);
+}
+
GTEST_MAIN_RUN_ALL_TESTS()