diff options
author | Tor Egge <Tor.Egge@online.no> | 2022-03-01 19:19:22 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2022-03-01 19:22:32 +0100 |
commit | 726654b5dc5c349d9c3c4ed8abd816449cefd091 (patch) | |
tree | b488537ceb3455c9ac8905266256001efc7d14d7 /vespalib/src | |
parent | 2090ed6965c0c5b627a656ac272703326a75e1fb (diff) |
Test lifetime of indirect values accessed via atomic pointer.
Diffstat (limited to 'vespalib/src')
-rw-r--r-- | vespalib/src/tests/util/generationhandler_stress/generation_handler_stress_test.cpp | 113 |
1 files changed, 109 insertions, 4 deletions
diff --git a/vespalib/src/tests/util/generationhandler_stress/generation_handler_stress_test.cpp b/vespalib/src/tests/util/generationhandler_stress/generation_handler_stress_test.cpp index 0689909da09..43e72ca73f8 100644 --- a/vespalib/src/tests/util/generationhandler_stress/generation_handler_stress_test.cpp +++ b/vespalib/src/tests/util/generationhandler_stress/generation_handler_stress_test.cpp @@ -3,11 +3,14 @@ LOG_SETUP("generation_handler_stress_test"); #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/util/generationhandler.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/size_literals.h> +#include <thread> using vespalib::Executor; using vespalib::GenerationHandler; +using vespalib::makeLambdaTask; using vespalib::ThreadStackExecutor; namespace { @@ -27,6 +30,24 @@ struct WorkContext } }; +struct IndirectContext { + std::atomic<uint64_t *> _value_ptr; + char _pad[256]; + static constexpr size_t values_size = 65536; + uint64_t _values[values_size]; + + IndirectContext(); + uint64_t* calc_value_ptr(uint64_t idx) { return &_values[(idx & (values_size - 1))]; } +}; + +IndirectContext::IndirectContext() + : _value_ptr(nullptr), + _pad(), + _values() +{ + _value_ptr = &_values[0]; +} + class Fixture : public ::testing::Test { protected: GenerationHandler _generationHandler; @@ -36,7 +57,7 @@ protected: std::atomic<long> _readSeed; std::atomic<long> _doneWriteWork; std::atomic<long> _doneReadWork; - std::atomic<int> _stopRead; + std::atomic<bool> _stopRead; bool _reportWork; Fixture(); @@ -46,9 +67,12 @@ protected: uint32_t getReadThreads() const { return _readThreads; } void stressTest(uint32_t writeCnt); + void stress_test_indirect(uint64_t write_cnt); public: void readWork(const WorkContext &context); void writeWork(uint32_t cnt, WorkContext &context); + void read_indirect_work(const IndirectContext& context); + void write_indirect_work(uint64_t cnt, IndirectContext& context); private: Fixture(const Fixture &index) = delete; Fixture(Fixture &&index) = delete; @@ -65,7 +89,7 @@ Fixture::Fixture() _readers(), _doneWriteWork(0), _doneReadWork(0), - _stopRead(0), + _stopRead(false), _reportWork(false) { set_read_threads(1); @@ -104,7 +128,7 @@ Fixture::readWork(const WorkContext &context) uint32_t i; uint32_t cnt = std::numeric_limits<uint32_t>::max(); - for (i = 0; i < cnt && _stopRead.load() == 0; ++i) { + for (i = 0; i < cnt && !_stopRead.load(); ++i) { auto guard = _generationHandler.takeGuard(); auto generation = context._generation.load(std::memory_order_relaxed); EXPECT_GE(generation, guard.getGeneration()); @@ -122,7 +146,7 @@ Fixture::writeWork(uint32_t cnt, WorkContext &context) _generationHandler.incGeneration(); } _doneWriteWork += cnt; - _stopRead = 1; + _stopRead = true; LOG(info, "done %u write work", cnt); } @@ -178,6 +202,75 @@ Fixture::stressTest(uint32_t writeCnt) _readers->sync(); } +void +Fixture::read_indirect_work(const IndirectContext& context) +{ + uint64_t i; + uint64_t cnt = std::numeric_limits<uint32_t>::max(); + uint64_t old_value = 0; + for (i = 0; i < cnt && !_stopRead.load(); ++i) { + auto guard = _generationHandler.takeGuard(); + // Data referenced by pointer is protected by guard + auto v_ptr = context._value_ptr.load(std::memory_order_acquire); + EXPECT_GE(*v_ptr, old_value); + old_value = *v_ptr; + } + _doneReadWork += i; + LOG(info, "done %" PRIu64 " read work", i); +} + + +void +Fixture::write_indirect_work(uint64_t cnt, IndirectContext& context) +{ + uint32_t sleep_cnt = 0; + ASSERT_EQ(0, _generationHandler.getCurrentGeneration()); + auto oldest_gen = _generationHandler.getFirstUsedGeneration(); + for (uint64_t i = 0; i < cnt; ++i) { + auto gen = _generationHandler.getCurrentGeneration(); + // Hold data for gen, write new data for next_gen + auto next_gen = gen + 1; + auto *v_ptr = context.calc_value_ptr(next_gen); + ASSERT_EQ(0u, *v_ptr) << (_stopRead = true, ""); + *v_ptr = next_gen; + context._value_ptr.store(v_ptr, std::memory_order_release); + _generationHandler.incGeneration(); + auto first_used_gen = _generationHandler.getFirstUsedGeneration(); + while (oldest_gen < first_used_gen) { + // Clear data that readers should no longer have access to. + *context.calc_value_ptr(oldest_gen) = 0; + ++oldest_gen; + } + while ((next_gen - first_used_gen) >= context.values_size - 2) { + // Sleep if writer gets too much ahead of readers. + std::this_thread::sleep_for(1ms); + ++sleep_cnt; + _generationHandler.updateFirstUsedGeneration(); + first_used_gen = _generationHandler.getFirstUsedGeneration(); + } + } + _doneWriteWork += cnt; + _stopRead = true; + LOG(info, "done %" PRIu64 " write work, %u sleeps", cnt, sleep_cnt); +} + +void +Fixture::stress_test_indirect(uint64_t write_cnt) +{ + _reportWork = true; + uint32_t read_threads = getReadThreads(); + LOG(info, "starting stress test indirect, 1 write thread, %u read threads, %" PRIu64 " writes", read_threads, write_cnt); + auto context = std::make_shared<IndirectContext>(); + _writer.execute(makeLambdaTask([this, context, write_cnt]() { write_indirect_work(write_cnt, *context); })); +#if 1 + for (uint32_t i = 0; i < read_threads; ++i) { + _readers->execute(makeLambdaTask([this, context]() { read_indirect_work(*context); })); + } +#endif + _writer.sync(); + _readers->sync(); +} + using GenerationHandlerStressTest = Fixture; TEST_F(GenerationHandlerStressTest, stress_test_2_readers) @@ -192,6 +285,18 @@ TEST_F(GenerationHandlerStressTest, stress_test_4_readers) stressTest(smoke_test ? 10000 : 1000000); } +TEST_F(GenerationHandlerStressTest, stress_test_indirect_2_readers) +{ + set_read_threads(2); + stress_test_indirect(smoke_test ? 10000 : 1000000); +} + +TEST_F(GenerationHandlerStressTest, stress_test_indirect_4_readers) +{ + set_read_threads(4); + stress_test_indirect(smoke_test ? 10000 : 1000000); +} + int main(int argc, char **argv) { if (argc > 1 && argv[1] == smoke_test_option) { smoke_test = true; |