summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2022-03-01 19:19:22 +0100
committerTor Egge <Tor.Egge@online.no>2022-03-01 19:22:32 +0100
commit726654b5dc5c349d9c3c4ed8abd816449cefd091 (patch)
treeb488537ceb3455c9ac8905266256001efc7d14d7 /vespalib
parent2090ed6965c0c5b627a656ac272703326a75e1fb (diff)
Test lifetime of indirect values accessed via atomic pointer.
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/util/generationhandler_stress/generation_handler_stress_test.cpp113
1 files changed, 109 insertions, 4 deletions
diff --git a/vespalib/src/tests/util/generationhandler_stress/generation_handler_stress_test.cpp b/vespalib/src/tests/util/generationhandler_stress/generation_handler_stress_test.cpp
index 0689909da09..43e72ca73f8 100644
--- a/vespalib/src/tests/util/generationhandler_stress/generation_handler_stress_test.cpp
+++ b/vespalib/src/tests/util/generationhandler_stress/generation_handler_stress_test.cpp
@@ -3,11 +3,14 @@
LOG_SETUP("generation_handler_stress_test");
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/generationhandler.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <thread>
using vespalib::Executor;
using vespalib::GenerationHandler;
+using vespalib::makeLambdaTask;
using vespalib::ThreadStackExecutor;
namespace {
@@ -27,6 +30,24 @@ struct WorkContext
}
};
+struct IndirectContext {
+ std::atomic<uint64_t *> _value_ptr;
+ char _pad[256];
+ static constexpr size_t values_size = 65536;
+ uint64_t _values[values_size];
+
+ IndirectContext();
+ uint64_t* calc_value_ptr(uint64_t idx) { return &_values[(idx & (values_size - 1))]; }
+};
+
+IndirectContext::IndirectContext()
+ : _value_ptr(nullptr),
+ _pad(),
+ _values()
+{
+ _value_ptr = &_values[0];
+}
+
class Fixture : public ::testing::Test {
protected:
GenerationHandler _generationHandler;
@@ -36,7 +57,7 @@ protected:
std::atomic<long> _readSeed;
std::atomic<long> _doneWriteWork;
std::atomic<long> _doneReadWork;
- std::atomic<int> _stopRead;
+ std::atomic<bool> _stopRead;
bool _reportWork;
Fixture();
@@ -46,9 +67,12 @@ protected:
uint32_t getReadThreads() const { return _readThreads; }
void stressTest(uint32_t writeCnt);
+ void stress_test_indirect(uint64_t write_cnt);
public:
void readWork(const WorkContext &context);
void writeWork(uint32_t cnt, WorkContext &context);
+ void read_indirect_work(const IndirectContext& context);
+ void write_indirect_work(uint64_t cnt, IndirectContext& context);
private:
Fixture(const Fixture &index) = delete;
Fixture(Fixture &&index) = delete;
@@ -65,7 +89,7 @@ Fixture::Fixture()
_readers(),
_doneWriteWork(0),
_doneReadWork(0),
- _stopRead(0),
+ _stopRead(false),
_reportWork(false)
{
set_read_threads(1);
@@ -104,7 +128,7 @@ Fixture::readWork(const WorkContext &context)
uint32_t i;
uint32_t cnt = std::numeric_limits<uint32_t>::max();
- for (i = 0; i < cnt && _stopRead.load() == 0; ++i) {
+ for (i = 0; i < cnt && !_stopRead.load(); ++i) {
auto guard = _generationHandler.takeGuard();
auto generation = context._generation.load(std::memory_order_relaxed);
EXPECT_GE(generation, guard.getGeneration());
@@ -122,7 +146,7 @@ Fixture::writeWork(uint32_t cnt, WorkContext &context)
_generationHandler.incGeneration();
}
_doneWriteWork += cnt;
- _stopRead = 1;
+ _stopRead = true;
LOG(info, "done %u write work", cnt);
}
@@ -178,6 +202,75 @@ Fixture::stressTest(uint32_t writeCnt)
_readers->sync();
}
+void
+Fixture::read_indirect_work(const IndirectContext& context)
+{
+ uint64_t i;
+ uint64_t cnt = std::numeric_limits<uint32_t>::max();
+ uint64_t old_value = 0;
+ for (i = 0; i < cnt && !_stopRead.load(); ++i) {
+ auto guard = _generationHandler.takeGuard();
+ // Data referenced by pointer is protected by guard
+ auto v_ptr = context._value_ptr.load(std::memory_order_acquire);
+ EXPECT_GE(*v_ptr, old_value);
+ old_value = *v_ptr;
+ }
+ _doneReadWork += i;
+ LOG(info, "done %" PRIu64 " read work", i);
+}
+
+
+void
+Fixture::write_indirect_work(uint64_t cnt, IndirectContext& context)
+{
+ uint32_t sleep_cnt = 0;
+ ASSERT_EQ(0, _generationHandler.getCurrentGeneration());
+ auto oldest_gen = _generationHandler.getFirstUsedGeneration();
+ for (uint64_t i = 0; i < cnt; ++i) {
+ auto gen = _generationHandler.getCurrentGeneration();
+ // Hold data for gen, write new data for next_gen
+ auto next_gen = gen + 1;
+ auto *v_ptr = context.calc_value_ptr(next_gen);
+ ASSERT_EQ(0u, *v_ptr) << (_stopRead = true, "");
+ *v_ptr = next_gen;
+ context._value_ptr.store(v_ptr, std::memory_order_release);
+ _generationHandler.incGeneration();
+ auto first_used_gen = _generationHandler.getFirstUsedGeneration();
+ while (oldest_gen < first_used_gen) {
+ // Clear data that readers should no longer have access to.
+ *context.calc_value_ptr(oldest_gen) = 0;
+ ++oldest_gen;
+ }
+ while ((next_gen - first_used_gen) >= context.values_size - 2) {
+ // Sleep if writer gets too much ahead of readers.
+ std::this_thread::sleep_for(1ms);
+ ++sleep_cnt;
+ _generationHandler.updateFirstUsedGeneration();
+ first_used_gen = _generationHandler.getFirstUsedGeneration();
+ }
+ }
+ _doneWriteWork += cnt;
+ _stopRead = true;
+ LOG(info, "done %" PRIu64 " write work, %u sleeps", cnt, sleep_cnt);
+}
+
+void
+Fixture::stress_test_indirect(uint64_t write_cnt)
+{
+ _reportWork = true;
+ uint32_t read_threads = getReadThreads();
+ LOG(info, "starting stress test indirect, 1 write thread, %u read threads, %" PRIu64 " writes", read_threads, write_cnt);
+ auto context = std::make_shared<IndirectContext>();
+ _writer.execute(makeLambdaTask([this, context, write_cnt]() { write_indirect_work(write_cnt, *context); }));
+#if 1
+ for (uint32_t i = 0; i < read_threads; ++i) {
+ _readers->execute(makeLambdaTask([this, context]() { read_indirect_work(*context); }));
+ }
+#endif
+ _writer.sync();
+ _readers->sync();
+}
+
using GenerationHandlerStressTest = Fixture;
TEST_F(GenerationHandlerStressTest, stress_test_2_readers)
@@ -192,6 +285,18 @@ TEST_F(GenerationHandlerStressTest, stress_test_4_readers)
stressTest(smoke_test ? 10000 : 1000000);
}
+TEST_F(GenerationHandlerStressTest, stress_test_indirect_2_readers)
+{
+ set_read_threads(2);
+ stress_test_indirect(smoke_test ? 10000 : 1000000);
+}
+
+TEST_F(GenerationHandlerStressTest, stress_test_indirect_4_readers)
+{
+ set_read_threads(4);
+ stress_test_indirect(smoke_test ? 10000 : 1000000);
+}
+
int main(int argc, char **argv) {
if (argc > 1 && argv[1] == smoke_test_option) {
smoke_test = true;