aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@yahooinc.com>2022-03-15 13:09:35 +0100
committerTor Egge <Tor.Egge@yahooinc.com>2022-03-15 13:09:35 +0100
commitb49b67321ada0694712e1a1926518024bbc0c121 (patch)
treea0b7886b3f6c5f22088e20109ebafdecd798eaab
parentbbd8807e32787cf3ae0430d2e33084b586bf8bc6 (diff)
Add acquire_elem_ref() member function to rcu vector.
-rw-r--r--vespalib/src/tests/util/rcuvector/rcuvector_test.cpp151
-rw-r--r--vespalib/src/vespa/vespalib/util/rcuvector.h9
-rw-r--r--vespalib/src/vespa/vespalib/util/rcuvector.hpp19
3 files changed, 173 insertions, 6 deletions
diff --git a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp
index 58256a96dac..7161eed24c9 100644
--- a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp
+++ b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp
@@ -2,14 +2,21 @@
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/test/memory_allocator_observer.h>
+#include <vespa/vespalib/datastore/atomic_value_wrapper.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/rcuvector.h>
+#include <vespa/vespalib/util/rcuvector.hpp>
#include <vespa/vespalib/util/round_up_to_page_size.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <random>
using namespace vespalib;
using vespalib::alloc::Alloc;
using vespalib::alloc::MemoryAllocator;
+using vespalib::datastore::AtomicValueWrapper;
+using vespalib::makeLambdaTask;
using MyMemoryAllocator = vespalib::alloc::test::MemoryAllocatorObserver;
using AllocStats = MyMemoryAllocator::Stats;
@@ -31,11 +38,13 @@ TEST(RcuVectorTest, basic)
for (int32_t i = 0; i < 100; ++i) {
v.push_back(i);
EXPECT_EQ(i, v[i]);
+ EXPECT_EQ(i, v.acquire_elem_ref(i));
EXPECT_EQ((size_t)i + 1, v.size());
}
for (int32_t i = 0; i < 100; ++i) {
v[i] = i + 1;
EXPECT_EQ(i + 1, v[i]);
+ EXPECT_EQ(i + 1, v.acquire_elem_ref(i));
EXPECT_EQ(100u, v.size());
}
}
@@ -276,13 +285,29 @@ TEST(RcuVectorTest, small_expand)
g.trimHoldLists(2);
}
-struct Fixture {
+struct FixtureBase {
using generation_t = GenerationHandler::generation_t;
AllocStats stats;
std::unique_ptr<MemoryAllocator> allocator;
Alloc initial_alloc;
GenerationHolder g;
+
+ FixtureBase();
+ ~FixtureBase();
+};
+
+FixtureBase::FixtureBase()
+ : stats(),
+ allocator(std::make_unique<MyMemoryAllocator>(stats)),
+ initial_alloc(Alloc::alloc_with_allocator(allocator.get())),
+ g()
+{
+}
+
+FixtureBase::~FixtureBase() = default;
+
+struct Fixture : public FixtureBase {
RcuVectorBase<int> arr;
Fixture();
@@ -295,10 +320,7 @@ struct Fixture {
};
Fixture::Fixture()
- : stats(),
- allocator(std::make_unique<MyMemoryAllocator>(stats)),
- initial_alloc(Alloc::alloc_with_allocator(allocator.get())),
- g(),
+ : FixtureBase(),
arr(g, initial_alloc)
{
arr.reserve(100);
@@ -346,4 +368,123 @@ TEST(RcuVectorTest, ensure_size_and_shrink_use_same_memory_allocator)
EXPECT_EQ(AllocStats(4, 3), f.stats);
}
+namespace {
+
+class ReadStopper {
+ std::atomic<bool>& _stop_read;
+public:
+ ReadStopper(std::atomic<bool>& stop_read)
+ : _stop_read(stop_read)
+ {
+ }
+ ~ReadStopper() {
+ _stop_read = true;
+ }
+};
+
+}
+
+struct StressFixture : public FixtureBase {
+ using AtomicIntWrapper = AtomicValueWrapper<int>;
+ RcuVectorBase<AtomicIntWrapper> arr;
+ std::atomic<bool> stop_read;
+ uint32_t read_area;
+ GenerationHandler generation_handler;
+ vespalib::ThreadStackExecutor writer; // 1 write thread
+ vespalib::ThreadStackExecutor readers; // multiple reader threads
+ StressFixture();
+ ~StressFixture();
+ void commit();
+ void sync();
+ void read_work();
+ void write_work(uint32_t cnt);
+ void run_test(uint32_t cnt, uint32_t num_readers);
+};
+
+StressFixture::StressFixture()
+ : FixtureBase(),
+ arr(g, initial_alloc),
+ stop_read(false),
+ read_area(1000),
+ generation_handler(),
+ writer(1, 128_Ki),
+ readers(4, 128_Ki)
+{
+ arr.ensure_size(read_area, AtomicIntWrapper(0));
+}
+
+StressFixture::~StressFixture() = default;
+
+void
+StressFixture::commit()
+{
+ auto current_gen = generation_handler.getCurrentGeneration();
+ g.transferHoldLists(current_gen);
+ generation_handler.incGeneration();
+ auto first_used_gen = generation_handler.getFirstUsedGeneration();
+ g.trimHoldLists(first_used_gen);
+}
+
+void
+StressFixture::sync()
+{
+ writer.sync();
+ readers.sync();
+}
+
+void
+StressFixture::read_work()
+{
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<uint32_t> distrib(0, read_area - 1);
+ std::vector<int> old(read_area);
+ while (!stop_read.load(std::memory_order_relaxed)) {
+ uint32_t idx = distrib(gen);
+ auto guard = generation_handler.takeGuard();
+ int value = arr.acquire_elem_ref(idx).load_acquire();
+ EXPECT_LE(old[idx], value);
+ old[idx] = value;
+ }
+}
+
+void
+StressFixture::write_work(uint32_t cnt)
+{
+ ReadStopper read_stopper(stop_read);
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::uniform_int_distribution<uint32_t> distrib(0, read_area - 1);
+ for (uint32_t i = 0; i < cnt; ++i) {
+ if ((i % 1000) == 0) {
+ arr.ensure_size(64_Ki + 1, AtomicIntWrapper(0));
+ }
+ if ((i % 1000) == 500) {
+ arr.shrink(read_area);
+ }
+ uint32_t idx = distrib(gen);
+ arr[idx].store_release(arr[idx].load_relaxed() + 1);
+ commit();
+ }
+}
+
+void
+StressFixture::run_test(uint32_t cnt, uint32_t num_readers)
+{
+ auto failed_write_task = writer.execute(makeLambdaTask([this, cnt]() { write_work(cnt); }));
+ ASSERT_FALSE(failed_write_task);
+ for (uint32_t i = 0; i < num_readers; ++i) {
+ readers.execute(makeLambdaTask([this]() { read_work(); }));
+ }
+ sync();
+ commit();
+ EXPECT_LE((cnt / 1000) * 2, stats.alloc_cnt);
+}
+
+TEST(RcuVectorTest, single_writer_four_readers)
+{
+ StressFixture f;
+ f.run_test(20000, 4);
+}
+
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/vespalib/src/vespa/vespalib/util/rcuvector.h b/vespalib/src/vespa/vespalib/util/rcuvector.h
index 00d050fa8d1..d2d0a946b91 100644
--- a/vespalib/src/vespa/vespalib/util/rcuvector.h
+++ b/vespalib/src/vespa/vespalib/util/rcuvector.h
@@ -44,6 +44,7 @@ protected:
using GenerationHolderType = GenerationHolder;
private:
ArrayType _data;
+ std::atomic<const T*> _vector_start;
size_t _growPercent;
size_t _growDelta;
GenerationHolderType &_genHolder;
@@ -57,6 +58,8 @@ private:
}
void expand(size_t newCapacity);
void expandAndInsert(const T & v);
+ void update_vector_start();
+protected:
virtual void onReallocation();
public:
@@ -118,6 +121,12 @@ public:
void clear() { _data.clear(); }
T & operator[](size_t i) { return _data[i]; }
const T & operator[](size_t i) const { return _data[i]; }
+ /*
+ * Readers holding a generation guard can call acquire_elem_ref(i)
+ * to get a const reference to element i. Array bound must be handled
+ * by reader, cf. committed docid limit in attribute vectors.
+ */
+ const T& acquire_elem_ref(size_t i) const noexcept { return *(_vector_start.load(std::memory_order_acquire) + i); }
void reset();
void shrink(size_t newSize) __attribute__((noinline));
diff --git a/vespalib/src/vespa/vespalib/util/rcuvector.hpp b/vespalib/src/vespa/vespalib/util/rcuvector.hpp
index 7c70539da0e..81928251b19 100644
--- a/vespalib/src/vespa/vespalib/util/rcuvector.hpp
+++ b/vespalib/src/vespa/vespalib/util/rcuvector.hpp
@@ -21,12 +21,14 @@ template <typename T>
void
RcuVectorBase<T>::unsafe_resize(size_t n) {
_data.resize(n);
+ update_vector_start();
}
template <typename T>
void
RcuVectorBase<T>::unsafe_reserve(size_t n) {
_data.reserve(n);
+ update_vector_start();
}
template <typename T>
@@ -112,11 +114,13 @@ template <typename T>
RcuVectorBase<T>::RcuVectorBase(GenerationHolderType &genHolder,
const Alloc &initialAlloc)
: _data(initialAlloc),
+ _vector_start(nullptr),
_growPercent(100),
_growDelta(0),
_genHolder(genHolder)
{
_data.reserve(16);
+ update_vector_start();
}
template <typename T>
@@ -126,11 +130,13 @@ RcuVectorBase<T>::RcuVectorBase(size_t initialCapacity,
GenerationHolderType &genHolder,
const Alloc &initialAlloc)
: _data(initialAlloc),
+ _vector_start(nullptr),
_growPercent(growPercent),
_growDelta(growDelta),
_genHolder(genHolder)
{
_data.reserve(initialCapacity);
+ update_vector_start();
}
template <typename T>
@@ -154,11 +160,22 @@ RcuVectorBase<T>::getMemoryUsage() const
template <typename T>
void
-RcuVectorBase<T>::onReallocation() { }
+RcuVectorBase<T>::update_vector_start()
+{
+ _vector_start.store(&_data[0], std::memory_order_release);
+}
+
+template <typename T>
+void
+RcuVectorBase<T>::onReallocation()
+{
+ update_vector_start();
+}
template <typename T>
void
RcuVector<T>::onReallocation() {
+ RcuVectorBase<T>::onReallocation();
_genHolderStore.transferHoldLists(_generation);
}