diff options
Diffstat (limited to 'vespalib')
71 files changed, 2753 insertions, 283 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index eb7e1f7d4c0..09cf354d027 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -85,6 +85,7 @@ vespa_define_module( src/tests/net/tls/policy_checking_certificate_verifier src/tests/net/tls/protocol_snooping src/tests/net/tls/transport_options + src/tests/nice src/tests/objects/nbostream src/tests/optimized src/tests/overload @@ -101,6 +102,7 @@ vespa_define_module( src/tests/require src/tests/runnable_pair src/tests/sha1 + src/tests/shared_operation_throttler src/tests/shared_string_repo src/tests/sharedptr src/tests/signalhandler diff --git a/vespalib/src/tests/alloc/alloc_test.cpp b/vespalib/src/tests/alloc/alloc_test.cpp index 2b6d3ee4613..da41d75b479 100644 --- a/vespalib/src/tests/alloc/alloc_test.cpp +++ b/vespalib/src/tests/alloc/alloc_test.cpp @@ -139,22 +139,28 @@ TEST("rounding of large mmaped buffer") { EXPECT_EQUAL(MemoryAllocator::HUGEPAGE_SIZE*12ul, buf.size()); } +void verifyExtension(Alloc& buf, size_t currSZ, size_t newSZ) { + bool expectSuccess = (currSZ != newSZ); + void* oldPtr = buf.get(); + EXPECT_EQUAL(currSZ, buf.size()); + EXPECT_EQUAL(expectSuccess, buf.resize_inplace(currSZ + 1)); + EXPECT_EQUAL(oldPtr, buf.get()); + EXPECT_EQUAL(newSZ, buf.size()); +} + TEST("heap alloc can not be extended") { Alloc buf = Alloc::allocHeap(100); - void * oldPtr = buf.get(); - EXPECT_EQUAL(100ul, buf.size()); - EXPECT_FALSE(buf.resize_inplace(101)); - EXPECT_EQUAL(oldPtr, buf.get()); - EXPECT_EQUAL(100ul, buf.size()); + verifyExtension(buf, 100, 100); +} + +TEST("mmap alloc cannot be extended from zero") { + Alloc buf = Alloc::allocMMap(0); + verifyExtension(buf, 0, 0); } TEST("auto alloced heap alloc can not be extended") { Alloc buf = Alloc::alloc(100); - void * oldPtr = buf.get(); - EXPECT_EQUAL(100ul, buf.size()); - EXPECT_FALSE(buf.resize_inplace(101)); - EXPECT_EQUAL(oldPtr, buf.get()); - EXPECT_EQUAL(100ul, buf.size()); + verifyExtension(buf, 100, 100); } TEST("auto alloced heap alloc can not be extended, even if resize will be mmapped") { @@ -166,15 +172,6 @@ TEST("auto alloced heap alloc can not be extended, even if resize will be mmappe EXPECT_EQUAL(100ul, buf.size()); } -void verifyExtension(Alloc & buf, size_t currSZ, size_t newSZ) { - bool expectSuccess = (currSZ != newSZ); - void * oldPtr = buf.get(); - EXPECT_EQUAL(currSZ, buf.size()); - EXPECT_EQUAL(expectSuccess, buf.resize_inplace(currSZ+1)); - EXPECT_EQUAL(oldPtr, buf.get()); - EXPECT_EQUAL(newSZ, buf.size()); -} - void ensureRoomForExtension(const Alloc & buf, Alloc & reserved) { // Normally mmapping starts at the top and grows down in address space. // Then there is no room to extend the last mapping. @@ -253,6 +250,11 @@ TEST("heap alloc can not be shrinked") { EXPECT_EQUAL(101ul, buf.size()); } +TEST("heap alloc cannot be shrunk to zero") { + Alloc buf = Alloc::allocHeap(101); + EXPECT_FALSE(buf.resize_inplace(0)); +} + TEST("mmap alloc can be shrinked") { Alloc buf = Alloc::allocMMap(4097); void * oldPtr = buf.get(); @@ -262,6 +264,11 @@ TEST("mmap alloc can be shrinked") { EXPECT_EQUAL(4_Ki, buf.size()); } +TEST("mmap alloc cannot be shrunk to zero") { + Alloc buf = Alloc::allocMMap(4097); + EXPECT_FALSE(buf.resize_inplace(0)); +} + TEST("auto alloced heap alloc can not be shrinked") { Alloc buf = Alloc::alloc(101); void * oldPtr = buf.get(); @@ -271,6 +278,11 @@ TEST("auto alloced heap alloc can not be shrinked") { EXPECT_EQUAL(101ul, buf.size()); } +TEST("auto alloced heap alloc cannot be shrunk to zero") { + Alloc buf = Alloc::alloc(101); + EXPECT_FALSE(buf.resize_inplace(0)); +} + TEST("auto alloced mmap alloc can be shrinked") { static constexpr size_t SZ = MemoryAllocator::HUGEPAGE_SIZE; Alloc buf = Alloc::alloc(SZ + 1); @@ -281,6 +293,11 @@ TEST("auto alloced mmap alloc can be shrinked") { EXPECT_EQUAL(SZ, buf.size()); } +TEST("auto alloced mmap alloc cannot be shrunk to zero") { + Alloc buf = Alloc::alloc(MemoryAllocator::HUGEPAGE_SIZE + 1); + EXPECT_FALSE(buf.resize_inplace(0)); +} + TEST("auto alloced mmap alloc can not be shrinked below HUGEPAGE_SIZE/2 + 1 ") { static constexpr size_t SZ = MemoryAllocator::HUGEPAGE_SIZE; Alloc buf = Alloc::alloc(SZ + 1); diff --git a/vespalib/src/tests/array/array_test.cpp b/vespalib/src/tests/array/array_test.cpp index 719623c9401..511c5ae11f1 100644 --- a/vespalib/src/tests/array/array_test.cpp +++ b/vespalib/src/tests/array/array_test.cpp @@ -2,7 +2,9 @@ #include <vespa/vespalib/stllike/string.h> #include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/test/memory_allocator_observer.h> #include <vespa/vespalib/util/array.hpp> +#include <vespa/vespalib/util/round_up_to_page_size.h> #include <vespa/vespalib/util/size_literals.h> #include <deque> #include <atomic> @@ -27,6 +29,11 @@ std::ostream & operator << (std::ostream & os, const Array<T> & a) } +using alloc::Alloc; +using alloc::MemoryAllocator; +using MyMemoryAllocator = vespalib::alloc::test::MemoryAllocatorObserver; +using AllocStats = MyMemoryAllocator::Stats; + class Clever { public: Clever() : _counter(&_global) { (*_counter)++; } @@ -324,31 +331,77 @@ TEST("test move assignment") struct UnreserveFixture { Array<int> arr; - UnreserveFixture() : arr(1025, 7, alloc::Alloc::allocMMap(0)) + UnreserveFixture() : arr(page_ints() + 1, 7, alloc::Alloc::allocMMap(0)) { - EXPECT_EQUAL(1025u, arr.size()); - EXPECT_EQUAL(2048u, arr.capacity()); + EXPECT_EQUAL(page_ints() + 1, arr.size()); + EXPECT_EQUAL(2 * page_ints(), arr.capacity()); + } + + static size_t page_ints() { + return round_up_to_page_size(1) / sizeof(int); } }; TEST_F("require that try_unreserve() fails if wanted capacity >= current capacity", UnreserveFixture) { - EXPECT_FALSE(f.arr.try_unreserve(2048)); + EXPECT_FALSE(f.arr.try_unreserve(2 * UnreserveFixture::page_ints())); } TEST_F("require that try_unreserve() fails if wanted capacity < current size", UnreserveFixture) { - EXPECT_FALSE(f.arr.try_unreserve(1024)); + EXPECT_FALSE(f.arr.try_unreserve(UnreserveFixture::page_ints())); } TEST_F("require that try_unreserve() succeedes if mmap can be shrinked", UnreserveFixture) { int *oldPtr = &f.arr[0]; f.arr.resize(512); - EXPECT_TRUE(f.arr.try_unreserve(1023)); - EXPECT_EQUAL(1_Ki, f.arr.capacity()); + EXPECT_TRUE(f.arr.try_unreserve(UnreserveFixture::page_ints() - 1)); + EXPECT_EQUAL(UnreserveFixture::page_ints(), f.arr.capacity()); int *newPtr = &f.arr[0]; EXPECT_EQUAL(oldPtr, newPtr); } +struct Fixture { + AllocStats stats; + std::unique_ptr<MemoryAllocator> allocator; + Alloc initial_alloc; + Array<int> arr; + + Fixture(); + ~Fixture(); +}; + +Fixture::Fixture() + : stats(), + allocator(std::make_unique<MyMemoryAllocator>(stats)), + initial_alloc(Alloc::alloc_with_allocator(allocator.get())), + arr(initial_alloc) +{ +} + +Fixture::~Fixture() = default; + +TEST_F("require that memory allocator can be set", Fixture) +{ + f.arr.resize(1); + EXPECT_EQUAL(AllocStats(1, 0), f.stats); +} + +TEST_F("require that memory allocator is preserved across reset", Fixture) +{ + f.arr.resize(1); + f.arr.reset(); + f.arr.resize(1); + EXPECT_EQUAL(AllocStats(2, 1), f.stats); +} + +TEST_F("require that created array uses same memory allocator", Fixture) +{ + auto arr2 = f.arr.create(); + EXPECT_EQUAL(AllocStats(0, 0), f.stats); + arr2.resize(1); + EXPECT_EQUAL(AllocStats(1, 0), f.stats); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp index 98a7bd780a7..83f49d6c73b 100644 --- a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp +++ b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp @@ -2,11 +2,16 @@ #include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/benchmark_timer.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/test/thread_meets.h> #include <vespa/vespalib/testkit/test_kit.h> +#include <sys/resource.h> #include <thread> using namespace vespalib; +using namespace vespalib::test; +using vespalib::make_string_short::fmt; bool verbose = false; size_t loop_cnt = 10; @@ -16,6 +21,16 @@ using Sampler = vespalib::cpu_usage::ThreadSampler; //----------------------------------------------------------------------------- +class EndTime { +private: + steady_time _end_time; +public: + EndTime(duration test_time) : _end_time(steady_clock::now() + test_time) {} + bool operator()() const { return (steady_clock::now() >= _end_time); } +}; + +//----------------------------------------------------------------------------- + void be_busy(duration d) { if (d > 0ms) { volatile int tmp = 123; @@ -45,22 +60,30 @@ void verify_sampling(size_t thread_id, size_t num_threads, std::vector<Sampler*> TEST_BARRIER(); // #1 auto t0 = steady_clock::now(); std::vector<duration> pre_usage = sample(samplers); + auto pre_total = cpu_usage::total_cpu_usage(); TEST_BARRIER(); // #2 TEST_BARRIER(); // #3 auto t1 = steady_clock::now(); std::vector<duration> post_usage = sample(samplers); + auto post_total = cpu_usage::total_cpu_usage(); TEST_BARRIER(); // #4 double wall = to_s(t1 - t0); - std::vector<double> load(4, 0.0); + std::vector<double> util(4, 0.0); for (size_t i = 0; i < 4; ++i) { - load[i] = to_s(post_usage[i] - pre_usage[i]) / wall; + util[i] = to_s(post_usage[i] - pre_usage[i]) / wall; } - EXPECT_GREATER(load[3], load[0]); - fprintf(stderr, "loads: { %.2f, %.2f, %.2f, %.2f }\n", load[0], load[1], load[2], load[3]); + double total_util = to_s(post_total - pre_total) / wall; + EXPECT_GREATER(util[3], util[0]); + // NB: cannot expect total_util to be greater than util[3] + // here due to mock utils being 'as expected' while valgrind + // will cut all utils in about half. + EXPECT_GREATER(total_util, util[0]); + fprintf(stderr, "utils: { %.3f, %.3f, %.3f, %.3f }\n", util[0], util[1], util[2], util[3]); + fprintf(stderr, "total util: %.3f\n", total_util); } else { int idx = (thread_id - 1); - double target_load = double(thread_id - 1) / (num_threads - 2); - auto sampler = cpu_usage::create_thread_sampler(force_mock, target_load); + double target_util = double(thread_id - 1) / (num_threads - 2); + auto sampler = cpu_usage::create_thread_sampler(force_mock, target_util); samplers[idx] = sampler.get(); TEST_BARRIER(); // #1 TEST_BARRIER(); // #2 @@ -74,7 +97,7 @@ void verify_sampling(size_t thread_id, size_t num_threads, std::vector<Sampler*> //----------------------------------------------------------------------------- -TEST_MT_F("require that dummy thread-based CPU usage sampling with known expected load works", 5, std::vector<Sampler*>(4, nullptr)) { +TEST_MT_F("require that dummy thread-based CPU usage sampling with known expected util works", 5, std::vector<Sampler*>(4, nullptr)) { TEST_DO(verify_sampling(thread_id, num_threads, f1, true)); } @@ -89,6 +112,353 @@ TEST("measure thread CPU clock overhead") { fprintf(stderr, "approx overhead per sample (thread CPU clock): %f us\n", min_time_us); } +TEST("measure total cpu usage overhead") { + duration d; + double min_time_us = BenchmarkTimer::benchmark([&d]() noexcept { d = cpu_usage::total_cpu_usage(); }, budget) * 1000000.0; + fprintf(stderr, "approx overhead per RUsage sample: %f us\n", min_time_us); +} + +//----------------------------------------------------------------------------- + +void verify_category(CpuUsage::Category cat, size_t idx, const vespalib::string &name) { + switch (cat) { // make sure we known all categories + case CpuUsage::Category::SETUP: + case CpuUsage::Category::READ: + case CpuUsage::Category::WRITE: + case CpuUsage::Category::COMPACT: + case CpuUsage::Category::OTHER: + EXPECT_EQUAL(CpuUsage::index_of(cat), idx); + EXPECT_EQUAL(CpuUsage::name_of(cat), name); + } +} + +TEST("require that CPU categories are as expected") { + TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u, "setup")); + TEST_DO(verify_category(CpuUsage::Category::READ, 1u, "read")); + TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u, "write")); + TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u, "compact")); + TEST_DO(verify_category(CpuUsage::Category::OTHER, 4u, "other")); + EXPECT_EQUAL(CpuUsage::num_categories, 5u); +} + +TEST("require that empty sample is zero") { + CpuUsage::Sample sample; + EXPECT_EQUAL(sample.size(), CpuUsage::num_categories); + for (uint32_t i = 0; i < sample.size(); ++i) { + EXPECT_EQUAL(sample[i].count(), 0); + } +} + +TEST("require that cpu samples can be manipulated and inspected") { + CpuUsage::Sample a; + CpuUsage::Sample b; + const CpuUsage::Sample &c = a; + a[CpuUsage::Category::SETUP] = 1ms; + a[CpuUsage::Category::READ] = 2ms; + a[CpuUsage::Category::WRITE] = 3ms; + a[CpuUsage::Category::COMPACT] = 4ms; + a[CpuUsage::Category::OTHER] = 5ms; + for (uint32_t i = 0; i < b.size(); ++i) { + b[i] = 10ms * (i + 1); + } + a.merge(b); + for (uint32_t i = 0; i < c.size(); ++i) { + EXPECT_EQUAL(c[i], 11ms * (i + 1)); + } + EXPECT_EQUAL(c[CpuUsage::Category::SETUP], 11ms); + EXPECT_EQUAL(c[CpuUsage::Category::READ], 22ms); + EXPECT_EQUAL(c[CpuUsage::Category::WRITE], 33ms); + EXPECT_EQUAL(c[CpuUsage::Category::COMPACT], 44ms); + EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 55ms); +} + +//----------------------------------------------------------------------------- + +struct CpuUsage::Test { + struct BlockingTracker : ThreadTracker { + std::atomic<size_t> called; + ThreadMeets::Nop sync_entry; + ThreadMeets::Swap<Sample> swap_sample; + BlockingTracker() + : called(0), sync_entry(2), swap_sample() {} + Sample sample() noexcept override { + if (called++) { + return Sample(); + } + sync_entry(); + return swap_sample(Sample()); + } + }; + struct SimpleTracker : ThreadTracker { + Sample my_sample; + std::atomic<size_t> called; + SimpleTracker(Sample sample) noexcept + : my_sample(sample), called(0) {} + Sample sample() noexcept override { + ++called; + return my_sample; + } + }; + struct Fixture { + CpuUsage my_usage; + std::shared_ptr<BlockingTracker> blocking; + std::vector<std::shared_ptr<SimpleTracker>> simple_list; + Fixture() : my_usage() {} + void add_blocking() { + ASSERT_TRUE(!blocking); + blocking = std::make_unique<BlockingTracker>(); + my_usage.add_thread(blocking); + } + void add_simple(Sample sample) { + auto simple = std::make_shared<SimpleTracker>(sample); + simple_list.push_back(simple); + my_usage.add_thread(simple); + } + void add_remove_simple(Sample sample) { + auto simple = std::make_shared<SimpleTracker>(sample); + my_usage.add_thread(simple); + my_usage.remove_thread(simple); + } + size_t count_threads() { + Guard guard(my_usage._lock); + return my_usage._threads.size(); + } + bool is_sampling() { + Guard guard(my_usage._lock); + return my_usage._sampling; + } + size_t count_conflicts() { + Guard guard(my_usage._lock); + if (!my_usage._conflict) { + return 0; + } + return my_usage._conflict->waiters; + } + size_t count_simple_samples() { + size_t result = 0; + for (const auto &simple: simple_list) { + result += simple->called; + } + return result; + } + TimedSample sample() { return my_usage.sample_or_wait(); } + ~Fixture() { + if (blocking) { + my_usage.remove_thread(std::move(blocking)); + } + for (auto &simple: simple_list) { + my_usage.remove_thread(std::move(simple)); + } + ASSERT_EQUAL(count_threads(), 0u); + } + }; + struct TrackerImpl { + ThreadTrackerImpl impl; + TrackerImpl(cpu_usage::ThreadSampler::UP sampler) + : impl(std::move(sampler)) {} + CpuUsage::Sample sample() { return impl.sample(); } + CpuUsage::Category set_category(CpuUsage::Category cat) { return impl.set_category(cat); } + }; +}; + +TEST_F("require that CpuUsage sample calls sample on thread trackers", CpuUsage::Test::Fixture()) { + CpuUsage::Sample sample; + sample[CpuUsage::Category::READ] = 10ms; + f1.add_simple(sample); + f1.add_simple(sample); + f1.add_simple(sample); + EXPECT_EQUAL(f1.count_threads(), 3u); + auto result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms)); + EXPECT_EQUAL(f1.count_simple_samples(), 3u); + result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(60ms)); + EXPECT_EQUAL(f1.count_simple_samples(), 6u); +} + +TEST_F("require that threads added and removed between CpuUsage sample calls are tracked", CpuUsage::Test::Fixture()) { + CpuUsage::Sample sample; + sample[CpuUsage::Category::READ] = 10ms; + auto result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(0ms)); + f1.add_remove_simple(sample); + f1.add_remove_simple(sample); + f1.add_remove_simple(sample); + EXPECT_EQUAL(f1.count_threads(), 0u); + result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms)); + result = f1.sample(); + EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms)); +} + +TEST_MT_FF("require that sample conflicts are resolved correctly", 5, CpuUsage::Test::Fixture(), std::vector<CpuUsage::TimedSample>(num_threads - 1)) { + if (thread_id == 0) { + CpuUsage::Sample s1; + s1[CpuUsage::Category::SETUP] = 10ms; + CpuUsage::Sample s2; + s2[CpuUsage::Category::READ] = 20ms; + CpuUsage::Sample s3; + s3[CpuUsage::Category::WRITE] = 30ms; + CpuUsage::Sample s4; + s4[CpuUsage::Category::COMPACT] = 40ms; + f1.add_blocking(); + f1.add_simple(s1); // should be sampled + EXPECT_TRUE(!f1.is_sampling()); + EXPECT_EQUAL(f1.count_conflicts(), 0u); + TEST_BARRIER(); // #1 + f1.blocking->sync_entry(); + EXPECT_TRUE(f1.is_sampling()); + while (f1.count_conflicts() < (num_threads - 2)) { + // wait for appropriate number of conflicts + std::this_thread::sleep_for(1ms); + } + f1.add_simple(s2); // should NOT be sampled (pending add) + f1.add_remove_simple(s3); // should be sampled (pending remove); + EXPECT_EQUAL(f1.count_threads(), 2u); + EXPECT_TRUE(f1.is_sampling()); + EXPECT_EQUAL(f1.count_conflicts(), (num_threads - 2)); + f1.blocking->swap_sample(s4); + TEST_BARRIER(); // #2 + EXPECT_TRUE(!f1.is_sampling()); + EXPECT_EQUAL(f1.count_conflicts(), 0u); + EXPECT_EQUAL(f1.count_threads(), 3u); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::SETUP], duration(10ms)); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::READ], duration(0ms)); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::WRITE], duration(30ms)); + EXPECT_EQUAL(f2[0].second[CpuUsage::Category::COMPACT], duration(40ms)); + for (size_t i = 1; i < (num_threads - 1); ++i) { + EXPECT_EQUAL(f2[i].first, f2[0].first); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::SETUP], f2[0].second[CpuUsage::Category::SETUP]); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::READ], f2[0].second[CpuUsage::Category::READ]); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::WRITE], f2[0].second[CpuUsage::Category::WRITE]); + EXPECT_EQUAL(f2[i].second[CpuUsage::Category::COMPACT], f2[0].second[CpuUsage::Category::COMPACT]); + } + } else { + TEST_BARRIER(); // #1 + f2[thread_id - 1] = f1.sample(); + TEST_BARRIER(); // #2 + } +} + +//----------------------------------------------------------------------------- + +struct DummySampler : public cpu_usage::ThreadSampler { + duration &ref; + DummySampler(duration &ref_in) : ref(ref_in) {} + duration sample() const noexcept override { return ref; } +}; + +TEST("require that thread tracker implementation can track cpu use") { + duration t = duration::zero(); + CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t)); + t += 10ms; + tracker.set_category(CpuUsage::Category::SETUP); + t += 15ms; + tracker.set_category(CpuUsage::Category::READ); + t += 10ms; + auto sample = tracker.sample(); + EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(15ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(10ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(0ms)); + t += 15ms; + tracker.set_category(CpuUsage::Category::WRITE); + t += 10ms; + sample = tracker.sample(); + EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(0ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(15ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(10ms)); +} + +TEST("require that thread tracker implementation reports previous CPU category") { + duration t = duration::zero(); + CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t)); + EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::OTHER), + CpuUsage::index_of(tracker.set_category(CpuUsage::Category::SETUP))); + EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::SETUP), + CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ))); + EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::READ), + CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ))); +} + +TEST("require that thread tracker implementation does not track OTHER cpu use") { + duration t = duration::zero(); + CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t)); + t += 10ms; + tracker.set_category(CpuUsage::Category::OTHER); + t += 15ms; + tracker.set_category(CpuUsage::Category::READ); + tracker.set_category(CpuUsage::Category::OTHER); + t += 15ms; + auto sample = tracker.sample(); + EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(0ms)); + EXPECT_EQUAL(sample[CpuUsage::Category::OTHER], duration(0ms)); +} + +//----------------------------------------------------------------------------- + +void do_sample_cpu_usage(const EndTime &end_time) { + auto my_usage = CpuUsage::use(CpuUsage::Category::SETUP); + CpuUtil cpu(8ms); + while (!end_time()) { + std::this_thread::sleep_for(verbose ? 1s : 10ms); + auto util = cpu.get_util(); + vespalib::string body; + for (size_t i = 0; i < util.size(); ++i) { + if (!body.empty()) { + body.append(", "); + } + body.append(fmt("%s: %.3f", CpuUsage::name_of(CpuUsage::Category(i)).c_str(), util[i])); + } + fprintf(stderr, "CPU: %s\n", body.c_str()); + } +} + +void do_full_work(CpuUsage::Category cat, const EndTime &end_time) { + auto my_usage = CpuUsage::use(cat); + while (!end_time()) { + be_busy(4ms); + } +} + +void do_some_work(CpuUsage::Category cat, const EndTime &end_time) { + auto my_usage = CpuUsage::use(cat); + while (!end_time()) { + be_busy(4ms); + std::this_thread::sleep_for(4ms); + } +} + +void do_nested_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndTime &end_time) { + auto my_usage1 = CpuUsage::use(cat1); + while (!end_time()) { + be_busy(4ms); + auto my_usage2 = CpuUsage::use(cat2); + be_busy(4ms); + } +} + +void do_external_work(CpuUsage::Category cat, const EndTime &end_time) { + auto my_usage1 = CpuUsage::use(CpuUsage::Category::SETUP); + while (!end_time()) { + std::thread thread([cat](){ + auto my_usage2 = CpuUsage::use(cat); + be_busy(4ms); + }); + thread.join(); + } +} + +TEST_MT_F("use top-level API to sample CPU usage", 5, EndTime(verbose ? 10s : 100ms)) { + switch (thread_id) { + case 0: return do_sample_cpu_usage(f1); + case 1: return do_full_work(CpuUsage::Category::WRITE, f1); + case 2: return do_some_work(CpuUsage::Category::READ, f1); + case 3: return do_nested_work(CpuUsage::Category::OTHER, CpuUsage::Category::READ, f1); + case 4: return do_external_work(CpuUsage::Category::COMPACT, f1); + default: TEST_FATAL("missing thread id case"); + } +} + //----------------------------------------------------------------------------- int main(int argc, char **argv) { diff --git a/vespalib/src/tests/datastore/array_store/array_store_test.cpp b/vespalib/src/tests/datastore/array_store/array_store_test.cpp index c58e357a9a1..2672ed70f6d 100644 --- a/vespalib/src/tests/datastore/array_store/array_store_test.cpp +++ b/vespalib/src/tests/datastore/array_store/array_store_test.cpp @@ -7,6 +7,7 @@ #include <vespa/vespalib/datastore/compaction_strategy.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/test/memory_allocator_observer.h> #include <vespa/vespalib/test/insertion_operators.h> #include <vespa/vespalib/util/memory_allocator.h> #include <vespa/vespalib/util/size_literals.h> @@ -19,6 +20,9 @@ using vespalib::ArrayRef; using generation_t = vespalib::GenerationHandler::generation_t; using MemStats = vespalib::datastore::test::MemStats; using BufferStats = vespalib::datastore::test::BufferStats; +using vespalib::alloc::MemoryAllocator; +using vespalib::alloc::test::MemoryAllocatorObserver; +using AllocStats = MemoryAllocatorObserver::Stats; namespace { @@ -40,18 +44,20 @@ struct Fixture using value_type = EntryT; using ReferenceStore = vespalib::hash_map<EntryRef, EntryVector>; + AllocStats stats; ArrayStoreType store; ReferenceStore refStore; generation_t generation; Fixture(uint32_t maxSmallArraySize, bool enable_free_lists = true) : store(ArrayStoreConfig(maxSmallArraySize, ArrayStoreConfig::AllocSpec(16, RefT::offsetSize(), 8_Ki, - ALLOC_GROW_FACTOR)).enable_free_lists(enable_free_lists)), + ALLOC_GROW_FACTOR)).enable_free_lists(enable_free_lists), + std::make_unique<MemoryAllocatorObserver>(stats)), refStore(), generation(1) {} Fixture(const ArrayStoreConfig &storeCfg) - : store(storeCfg), + : store(storeCfg, std::make_unique<MemoryAllocatorObserver>(stats)), refStore(), generation(1) {} @@ -162,10 +168,10 @@ TEST("require that we test with trivial and non-trivial types") TEST_F("control static sizes", NumberFixture(3)) { #ifdef _LIBCPP_VERSION - EXPECT_EQUAL(424u, sizeof(f.store)); + EXPECT_EQUAL(440u, sizeof(f.store)); EXPECT_EQUAL(296u, sizeof(NumberFixture::ArrayStoreType::DataStoreType)); #else - EXPECT_EQUAL(456u, sizeof(f.store)); + EXPECT_EQUAL(472u, sizeof(f.store)); EXPECT_EQUAL(328u, sizeof(NumberFixture::ArrayStoreType::DataStoreType)); #endif EXPECT_EQUAL(96u, sizeof(NumberFixture::ArrayStoreType::SmallArrayType)); @@ -447,4 +453,9 @@ TEST_F("require that offset in EntryRefT is within bounds when allocating memory f.assertStoreContent(); } +TEST_F("require that provided memory allocator is used", NumberFixture(3)) +{ + EXPECT_EQUAL(AllocStats(4, 0), f.stats); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/tests/datastore/datastore/.gitignore b/vespalib/src/tests/datastore/datastore/.gitignore new file mode 100644 index 00000000000..d033739fd78 --- /dev/null +++ b/vespalib/src/tests/datastore/datastore/.gitignore @@ -0,0 +1 @@ +/core.* diff --git a/vespalib/src/tests/datastore/fixed_size_hash_map/fixed_size_hash_map_test.cpp b/vespalib/src/tests/datastore/fixed_size_hash_map/fixed_size_hash_map_test.cpp index 417b92af9ac..599cb209e6c 100644 --- a/vespalib/src/tests/datastore/fixed_size_hash_map/fixed_size_hash_map_test.cpp +++ b/vespalib/src/tests/datastore/fixed_size_hash_map/fixed_size_hash_map_test.cpp @@ -68,7 +68,7 @@ struct DataStoreFixedSizeHashTest : public ::testing::Test DataStoreFixedSizeHashTest::DataStoreFixedSizeHashTest() : _generation_handler(), _generation_holder(), - _allocator(), + _allocator({}), _store(_allocator.get_data_store()), _comp(std::make_unique<MyCompare>(_store)), _hash_map(), diff --git a/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp b/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp index 796e19a97d1..13f9ae251b6 100644 --- a/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp +++ b/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp @@ -132,7 +132,7 @@ struct DataStoreShardedHashTest : public ::testing::Test DataStoreShardedHashTest::DataStoreShardedHashTest() : _generationHandler(), - _allocator(), + _allocator({}), _store(_allocator.get_data_store()), _hash_map(std::make_unique<MyCompare>(_store)), _writer(1, 128_Ki), diff --git a/vespalib/src/tests/datastore/unique_store/unique_store_test.cpp b/vespalib/src/tests/datastore/unique_store/unique_store_test.cpp index 917c91f2dff..7f279689985 100644 --- a/vespalib/src/tests/datastore/unique_store/unique_store_test.cpp +++ b/vespalib/src/tests/datastore/unique_store/unique_store_test.cpp @@ -9,6 +9,7 @@ #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/test/datastore/buffer_stats.h> #include <vespa/vespalib/test/insertion_operators.h> +#include <vespa/vespalib/test/memory_allocator_observer.h> #include <vespa/vespalib/util/traits.h> #include <vector> @@ -21,6 +22,9 @@ using namespace vespalib::datastore; using vespalib::ArrayRef; using generation_t = vespalib::GenerationHandler::generation_t; using vespalib::datastore::test::BufferStats; +using vespalib::alloc::MemoryAllocator; +using vespalib::alloc::test::MemoryAllocatorObserver; +using AllocStats = MemoryAllocatorObserver::Stats; template <typename UniqueStoreT> struct TestBaseValues { @@ -39,6 +43,7 @@ struct TestBase : public ::testing::Test { using ReferenceStoreValueType = std::conditional_t<std::is_same_v<ValueType, const char *>, std::string, ValueType>; using ReferenceStore = std::map<EntryRef, std::pair<ReferenceStoreValueType,uint32_t>>; + AllocStats stats; UniqueStoreType store; ReferenceStore refStore; generation_t generation; @@ -148,7 +153,8 @@ struct TestBase : public ::testing::Test { template <typename UniqueStoreTypeAndDictionaryType> TestBase<UniqueStoreTypeAndDictionaryType>::TestBase() - : store(), + : stats(), + store(std::make_unique<MemoryAllocatorObserver>(stats)), refStore(), generation(1) { @@ -424,6 +430,15 @@ TYPED_TEST(TestBase, store_can_be_enumerated) EXPECT_EQ(2u, enumValue2); } +TYPED_TEST(TestBase, provided_memory_allocator_is_used) +{ + if constexpr (std::is_same_v<const char *, typename TestFixture::ValueType>) { + EXPECT_EQ(AllocStats(18, 0), this->stats); + } else { + EXPECT_EQ(AllocStats(1, 0), this->stats); + } +} + #pragma GCC diagnostic pop TEST_F(DoubleTest, nan_is_handled) diff --git a/vespalib/src/tests/datastore/unique_store_string_allocator/unique_store_string_allocator_test.cpp b/vespalib/src/tests/datastore/unique_store_string_allocator/unique_store_string_allocator_test.cpp index 158b85f6bf5..21330c22166 100644 --- a/vespalib/src/tests/datastore/unique_store_string_allocator/unique_store_string_allocator_test.cpp +++ b/vespalib/src/tests/datastore/unique_store_string_allocator/unique_store_string_allocator_test.cpp @@ -4,6 +4,7 @@ #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/test/datastore/buffer_stats.h> #include <vespa/vespalib/test/insertion_operators.h> +#include <vespa/vespalib/test/memory_allocator_observer.h> #include <vespa/vespalib/util/traits.h> #include <vector> @@ -11,6 +12,10 @@ using namespace vespalib::datastore; using vespalib::MemoryUsage; using generation_t = vespalib::GenerationHandler::generation_t; using BufferStats = vespalib::datastore::test::BufferStats; +using vespalib::alloc::MemoryAllocator; +using vespalib::alloc::test::MemoryAllocatorObserver; +using AllocStats = MemoryAllocatorObserver::Stats; + namespace { @@ -24,10 +29,12 @@ template <typename RefT = EntryRefT<22>> struct TestBase : public ::testing::Test { using EntryRefType = RefT; + AllocStats stats; UniqueStoreStringAllocator<EntryRefType> allocator; generation_t generation; TestBase() - : allocator(), + : stats(), + allocator(std::make_unique<MemoryAllocatorObserver>(stats)), generation(1) {} void assert_add(const char *input) { @@ -170,6 +177,11 @@ TEST_F(StringTest, free_list_is_never_used_for_move) assert_buffer_state(ref2, BufferStats().used(4).hold(0).dead(2).extra_used(2002)); } +TEST_F(StringTest, provided_memory_allocator_is_used) +{ + EXPECT_EQ(AllocStats(18, 0), stats); +} + TEST_F(SmallOffsetStringTest, new_underlying_buffer_is_allocated_when_current_is_full) { uint32_t first_buffer_id = get_buffer_id(add(small.c_str())); @@ -184,6 +196,7 @@ TEST_F(SmallOffsetStringTest, new_underlying_buffer_is_allocated_when_current_is uint32_t buffer_id = get_buffer_id(add(small.c_str())); EXPECT_EQ(second_buffer_id, buffer_id); } + EXPECT_LT(18, stats.alloc_cnt); } GTEST_MAIN_RUN_ALL_TESTS() diff --git a/vespalib/src/tests/nice/CMakeLists.txt b/vespalib/src/tests/nice/CMakeLists.txt new file mode 100644 index 00000000000..a97ec52c22c --- /dev/null +++ b/vespalib/src/tests/nice/CMakeLists.txt @@ -0,0 +1,10 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_nice_test_app TEST + SOURCES + nice_test.cpp + DEPENDS + vespalib +) +if(NOT CMAKE_HOST_SYSTEM_NAME STREQUAL "Darwin") + vespa_add_test(NAME vespalib_nice_test_app COMMAND vespalib_nice_test_app) +endif() diff --git a/vespalib/src/tests/nice/nice_test.cpp b/vespalib/src/tests/nice/nice_test.cpp new file mode 100644 index 00000000000..ec1d25be3bd --- /dev/null +++ b/vespalib/src/tests/nice/nice_test.cpp @@ -0,0 +1,98 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/util/nice.h> +#include <vespa/vespalib/test/thread_meets.h> +#include <vespa/vespalib/testkit/test_kit.h> +#include <unistd.h> +#include <functional> +#include <thread> + +using vespalib::Runnable; +using vespalib::be_nice; +using vespalib::test::ThreadMeets; + +double how_nice(int now, int target) { + int max = 19; + int wanted_param = (target - now); + int num_zones = ((max + 1) - now); + // make sure we are in the middle of the wanted nice zone + double result = (0.5 + wanted_param) / num_zones; + fprintf(stderr, " ... using how_nice=%g to get from %d to %d in nice value\n", result, now, target); + return result; +} + +struct RunFun : Runnable { + std::function<void()> my_fun; + RunFun(std::function<void()> fun_in) : my_fun(fun_in) {} + void run() override { my_fun(); } +}; + +int my_init_fun(Runnable &target) { + target.run(); + return 1; +} + +std::thread run_with_init(std::function<void()> my_fun, Runnable::init_fun_t init_fun = my_init_fun) { + return std::thread([init_fun, my_fun] + { + RunFun run_fun(my_fun); + init_fun(run_fun); + }); +} + +TEST("require that initial nice value is 0") { + EXPECT_EQUAL(nice(0), 0); +} + +TEST("require that nice value is tracked per thread") { + ThreadMeets::Nop barrier(5); + std::vector<std::thread> threads; + for (int i = 0; i < 5; ++i) { + threads.push_back(run_with_init([my_barrier = &barrier, i] + { + nice(i); + (*my_barrier)(); + EXPECT_EQUAL(nice(0), i); + })); + } + for (auto &thread: threads) { + thread.join(); + } +} + +void verify_max_nice_value() { + int now = nice(0); + now = nice(19 - now); + EXPECT_EQUAL(now, 19); + now = nice(1); + EXPECT_EQUAL(now, 19); +} + +TEST("require that max nice value is 19") { + auto thread = run_with_init([]{ verify_max_nice_value(); }); + thread.join(); +} + +TEST("require that nice value can be set with init function") { + for (int i = 0; i <= 19; ++i) { + auto thread = run_with_init([i]() + { + EXPECT_EQUAL(nice(0), i); + }, be_nice(my_init_fun, how_nice(0, i))); + thread.join(); + } +} + +TEST("require that niceness can be nested and will act on a limited nice value range") { + auto thread1 = run_with_init([]{ EXPECT_EQUAL(nice(0), 7); }, + be_nice(be_nice(my_init_fun, how_nice(3, 7)), how_nice(0, 3))); + auto thread2 = run_with_init([]{ EXPECT_EQUAL(nice(0), 15); }, + be_nice(be_nice(my_init_fun, how_nice(10, 15)), how_nice(0, 10))); + auto thread3 = run_with_init([]{ EXPECT_EQUAL(nice(0), 19); }, + be_nice(be_nice(my_init_fun, how_nice(10, 19)), how_nice(0, 10))); + thread1.join(); + thread2.join(); + thread3.join(); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt b/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt new file mode 100644 index 00000000000..6e977cdb59f --- /dev/null +++ b/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_shared_operation_throttler_test_app TEST + SOURCES + shared_operation_throttler_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_shared_operation_throttler_test_app COMMAND vespalib_shared_operation_throttler_test_app) diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp new file mode 100644 index 00000000000..d9b6ae7f908 --- /dev/null +++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp @@ -0,0 +1,212 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/util/shared_operation_throttler.h> +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/util/barrier.h> +#include <thread> + +namespace vespalib { + +using ThrottleToken = SharedOperationThrottler::Token; + +struct DynamicThrottleFixture { + std::unique_ptr<SharedOperationThrottler> _throttler; + + DynamicThrottleFixture() { + SharedOperationThrottler::DynamicThrottleParams params; + params.window_size_increment = 1; + params.min_window_size = 1; + _throttler = SharedOperationThrottler::make_dynamic_throttler(params); + } +}; + +TEST("unlimited throttler does not throttle") { + // We technically can't test that the unlimited throttler _never_ throttles, but at + // least check that it doesn't throttle _twice_, and then induce from this ;) + auto throttler = SharedOperationThrottler::make_unlimited_throttler(); + auto token1 = throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); + auto token2 = throttler->blocking_acquire_one(); + EXPECT_TRUE(token2.valid()); + // Window size should be zero (i.e. unlimited) for unlimited throttler + EXPECT_EQUAL(throttler->current_window_size(), 0u); +} + +TEST_F("dynamic throttler respects initial window size", DynamicThrottleFixture()) { + auto token1 = f1._throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); + auto token2 = f1._throttler->try_acquire_one(); + EXPECT_FALSE(token2.valid()); + + EXPECT_EQUAL(f1._throttler->current_window_size(), 1u); +} + +TEST_F("blocking acquire returns immediately if slot available", DynamicThrottleFixture()) { + auto token = f1._throttler->blocking_acquire_one(); + EXPECT_TRUE(token.valid()); + token.reset(); + token = f1._throttler->blocking_acquire_one(600s); // Should never block. + EXPECT_TRUE(token.valid()); +} + +TEST_F("blocking call woken up if throttle slot available", DynamicThrottleFixture()) { + vespalib::Barrier barrier(2); + std::thread t([&] { + auto token = f1._throttler->try_acquire_one(); + assert(token.valid()); + barrier.await(); + while (f1._throttler->waiting_threads() != 1) { + std::this_thread::sleep_for(100us); + } + // Implicit token release at thread scope exit + }); + barrier.await(); + auto token = f1._throttler->blocking_acquire_one(); + EXPECT_TRUE(token.valid()); + t.join(); +} + +TEST_F("time-bounded blocking acquire waits for timeout", DynamicThrottleFixture()) { + auto window_filling_token = f1._throttler->try_acquire_one(); + auto before = std::chrono::steady_clock::now(); + // Will block for at least 1ms. Since no window slot will be available by that time, + // an invalid token should be returned. + auto token = f1._throttler->blocking_acquire_one(1ms); + auto after = std::chrono::steady_clock::now(); + EXPECT_TRUE((after - before) >= 1ms); + EXPECT_FALSE(token.valid()); +} + +TEST("default constructed token is invalid") { + ThrottleToken token; + EXPECT_FALSE(token.valid()); + token.reset(); // no-op + EXPECT_FALSE(token.valid()); +} + +TEST_F("token destruction frees up throttle window slot", DynamicThrottleFixture()) { + { + auto token = f1._throttler->try_acquire_one(); + EXPECT_TRUE(token.valid()); + } + auto token = f1._throttler->try_acquire_one(); + EXPECT_TRUE(token.valid()); +} + +TEST_F("token can be moved and reset", DynamicThrottleFixture()) { + auto token1 = f1._throttler->try_acquire_one(); + auto token2 = std::move(token1); // move ctor + EXPECT_TRUE(token2.valid()); + EXPECT_FALSE(token1.valid()); + ThrottleToken token3; + token3 = std::move(token2); // move assignment op + EXPECT_TRUE(token3.valid()); + EXPECT_FALSE(token2.valid()); + + // Trying to fetch new token should not succeed due to active token and win size of 1 + token1 = f1._throttler->try_acquire_one(); + EXPECT_FALSE(token1.valid()); + // Resetting the token should free up the slot in the window + token3.reset(); + token1 = f1._throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); +} + +// Note on test semantics: these tests are adapted from a subset of the MessageBus +// throttling tests. Some tests have been simplified due to no longer having access +// to the low-level DynamicThrottlePolicy API. + +struct WindowFixture { + uint64_t _milli_time; + std::unique_ptr<SharedOperationThrottler> _throttler; + + WindowFixture(uint32_t window_size_increment = 5, + uint32_t min_window_size = 20, + uint32_t max_window_size = INT_MAX) + : _milli_time(0), + _throttler() + { + SharedOperationThrottler::DynamicThrottleParams params; + params.resize_rate = 1; + params.window_size_increment = window_size_increment; + params.min_window_size = min_window_size; + params.max_window_size = max_window_size; + params.window_size_decrement_factor = 2; + params.window_size_backoff = 0.9; + _throttler = SharedOperationThrottler::make_dynamic_throttler(params, [&]() noexcept { + return steady_time(std::chrono::milliseconds(_milli_time)); + }); + } + + std::vector<SharedOperationThrottler::Token> fill_entire_throttle_window() { + std::vector<SharedOperationThrottler::Token> tokens; + while (true) { + auto token = _throttler->try_acquire_one(); + if (!token.valid()) { + break; + } + tokens.emplace_back(std::move(token)); + } + return tokens; + } + + uint32_t attempt_converge_on_stable_window_size(uint32_t max_pending) { + for (uint32_t i = 0; i < 999; ++i) { + auto tokens = fill_entire_throttle_window(); + uint32_t num_pending = static_cast<uint32_t>(tokens.size()); + + uint64_t trip_time = (num_pending < max_pending) ? 1000 : 1000 + (num_pending - max_pending) * 1000; + _milli_time += trip_time; + // Throttle window slots implicitly freed up as tokens are destructed. + } + uint32_t ret = _throttler->current_window_size(); + fprintf(stderr, "attempt_converge_on_stable_window_size() = %u\n", ret); + return ret; + } +}; + +TEST_F("window size changes dynamically based on throughput", WindowFixture()) { + uint32_t window_size = f1.attempt_converge_on_stable_window_size(100); + ASSERT_TRUE(window_size >= 90 && window_size <= 105); + + window_size = f1.attempt_converge_on_stable_window_size(200); + ASSERT_TRUE(window_size >= 180 && window_size <= 205); + + window_size = f1.attempt_converge_on_stable_window_size(50); + ASSERT_TRUE(window_size >= 45 && window_size <= 55); + + window_size = f1.attempt_converge_on_stable_window_size(500); + ASSERT_TRUE(window_size >= 450 && window_size <= 505); + + window_size = f1.attempt_converge_on_stable_window_size(100); + ASSERT_TRUE(window_size >= 90 && window_size <= 115); +} + +TEST_F("window size is reset after idle time period", WindowFixture(5, 1)) { + double window_size = f1.attempt_converge_on_stable_window_size(100); + ASSERT_TRUE(window_size >= 90 && window_size <= 110); + + f1._milli_time += 30001; // Not yet past 60s idle time + auto tokens = f1.fill_entire_throttle_window(); + ASSERT_TRUE(tokens.size() >= 90 && tokens.size() <= 110); + tokens.clear(); + + f1._milli_time += 60001; // Idle time passed + tokens = f1.fill_entire_throttle_window(); + EXPECT_EQUAL(tokens.size(), 1u); // Reduced to minimum window size +} + +TEST_F("minimum window size is respected", WindowFixture(5, 150, INT_MAX)) { + double window_size = f1.attempt_converge_on_stable_window_size(200); + ASSERT_TRUE(window_size >= 150 && window_size <= 210); +} + +TEST_F("maximum window size is respected", WindowFixture(5, 1, 50)) { + double window_size = f1.attempt_converge_on_stable_window_size(100); + ASSERT_TRUE(window_size >= 40 && window_size <= 50); +} + +} + +TEST_MAIN() { + TEST_RUN_ALL(); +} diff --git a/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp b/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp index 81c8271f755..5b267c3b9e9 100644 --- a/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp +++ b/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp @@ -101,8 +101,8 @@ std::unique_ptr<Handles> copy_strong_handles(const Handles &handles) { return result; } -std::unique_ptr<std::vector<string_id>> make_weak_handles(const Handles &handles) { - return std::make_unique<std::vector<string_id>>(handles.view()); +std::unique_ptr<StringIdVector> make_weak_handles(const Handles &handles) { + return std::make_unique<StringIdVector>(handles.view()); } //----------------------------------------------------------------------------- @@ -202,7 +202,7 @@ struct Fixture { std::vector<vespalib::string> get_direct_result; std::unique_ptr<Handles> strong; std::unique_ptr<Handles> strong_copy; - std::unique_ptr<std::vector<string_id>> weak; + std::unique_ptr<StringIdVector> weak; auto copy_strings_task = [&](){ copy_strings_result = copy_strings(work); }; auto copy_and_hash_task = [&](){ copy_and_hash_result = copy_and_hash(work); }; auto local_enum_task = [&](){ local_enum_result = local_enum(work); }; diff --git a/vespalib/src/tests/spin_lock/spin_lock_test.cpp b/vespalib/src/tests/spin_lock/spin_lock_test.cpp index 54ad354f584..78e35a3e8d1 100644 --- a/vespalib/src/tests/spin_lock/spin_lock_test.cpp +++ b/vespalib/src/tests/spin_lock/spin_lock_test.cpp @@ -1,12 +1,14 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/util/spin_lock.h> +#include <vespa/vespalib/util/atomic.h> #include <vespa/vespalib/util/benchmark_timer.h> #include <vespa/vespalib/util/time.h> #include <vespa/vespalib/testkit/test_kit.h> #include <array> using namespace vespalib; +using namespace vespalib::atomic; bool verbose = false; double budget = 0.25; @@ -25,15 +27,15 @@ struct MyState { void update() { std::array<size_t,SZ> tmp; for (size_t i = 0; i < SZ; ++i) { - tmp[i] = state[i]; + store_ref_relaxed(tmp[i], load_ref_relaxed(state[i])); } for (size_t i = 0; i < SZ; ++i) { - state[i] = tmp[i] + 1; + store_ref_relaxed(state[i], load_ref_relaxed(tmp[i]) + 1); } } bool check(size_t expect) const { - for (size_t value: state) { - if (value != expect) { + for (const auto& value: state) { + if (load_ref_relaxed(value) != expect) { return false; } } diff --git a/vespalib/src/tests/stllike/asciistream_test.cpp b/vespalib/src/tests/stllike/asciistream_test.cpp index be0bc1cb694..3042595e18c 100644 --- a/vespalib/src/tests/stllike/asciistream_test.cpp +++ b/vespalib/src/tests/stllike/asciistream_test.cpp @@ -409,27 +409,14 @@ AsciistreamTest::testWriteThenRead() void AsciistreamTest::testGetLine() { - asciistream is(""); - EXPECT_TRUE(is.getlines().empty()); - is = asciistream("line 1"); - std::vector<string> v = is.getlines(); - EXPECT_EQUAL(1u, v.size()); - EXPECT_EQUAL("line 1", v[0]); - is = asciistream("line 1\nline 2"); - v = is.getlines(); - EXPECT_EQUAL(2u, v.size()); - EXPECT_EQUAL("line 1", v[0]); - EXPECT_EQUAL("line 2", v[1]); - is = asciistream("line 1\nline 2\n\n"); - v = is.getlines(); - EXPECT_EQUAL(3u, v.size()); - EXPECT_EQUAL("line 1", v[0]); - EXPECT_EQUAL("line 2", v[1]); - EXPECT_EQUAL("", v[2]); - is = asciistream("line 1"); + asciistream is = asciistream("line 1\nline 2\nline 3"); string s; getline(is, s); EXPECT_EQUAL("line 1", s); + getline(is, s); + EXPECT_EQUAL("line 2", s); + getline(is, s); + EXPECT_EQUAL("line 3", s); } #define VERIFY_DOUBLE_SERIALIZATION(value, expected, format, precision) { \ diff --git a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp index cf84ab03a25..14802a60ff1 100644 --- a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp +++ b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp @@ -1,11 +1,17 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/test/memory_allocator_observer.h> #include <vespa/vespalib/util/rcuvector.h> #include <vespa/vespalib/util/size_literals.h> using namespace vespalib; +using vespalib::alloc::Alloc; +using vespalib::alloc::MemoryAllocator; +using MyMemoryAllocator = vespalib::alloc::test::MemoryAllocatorObserver; +using AllocStats = MyMemoryAllocator::Stats; + bool assertUsage(const MemoryUsage & exp, const MemoryUsage & act) { @@ -288,4 +294,70 @@ TEST("test small expand") g.trimHoldLists(2); } +struct Fixture { + using generation_t = GenerationHandler::generation_t; + + AllocStats stats; + std::unique_ptr<MemoryAllocator> allocator; + Alloc initial_alloc; + GenerationHolder g; + RcuVectorBase<int> arr; + + Fixture(); + ~Fixture(); + void transfer_and_trim(generation_t transfer_gen, generation_t trim_gen) + { + g.transferHoldLists(transfer_gen); + g.trimHoldLists(trim_gen); + } +}; + +Fixture::Fixture() + : stats(), + allocator(std::make_unique<MyMemoryAllocator>(stats)), + initial_alloc(Alloc::alloc_with_allocator(allocator.get())), + g(), + arr(g, initial_alloc) +{ + arr.reserve(100); +} + +Fixture::~Fixture() = default; + +TEST_F("require that memory allocator can be set", Fixture) +{ + EXPECT_EQUAL(AllocStats(2, 0), f.stats); + f.transfer_and_trim(1, 2); + EXPECT_EQUAL(AllocStats(2, 1), f.stats); +} + +TEST_F("require that memory allocator is preserved across reset", Fixture) +{ + f.arr.reset(); + f.arr.reserve(100); + EXPECT_EQUAL(AllocStats(4, 1), f.stats); + f.transfer_and_trim(1, 2); + EXPECT_EQUAL(AllocStats(4, 3), f.stats); +} + +TEST_F("require that created replacement vector uses same memory allocator", Fixture) +{ + auto arr2 = f.arr.create_replacement_vector(); + EXPECT_EQUAL(AllocStats(2, 0), f.stats); + arr2.reserve(100); + EXPECT_EQUAL(AllocStats(3, 0), f.stats); + f.transfer_and_trim(1, 2); + EXPECT_EQUAL(AllocStats(3, 1), f.stats); +} + +TEST_F("require that ensure_size and shrink use same memory allocator", Fixture) +{ + f.arr.ensure_size(2000); + EXPECT_EQUAL(AllocStats(3, 0), f.stats); + f.arr.shrink(1000); + EXPECT_EQUAL(AllocStats(4, 0), f.stats); + f.transfer_and_trim(1, 2); + EXPECT_EQUAL(AllocStats(4, 3), f.stats); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/tests/wakeup/wakeup_bench.cpp b/vespalib/src/tests/wakeup/wakeup_bench.cpp index 1d9817508d3..c39b8899159 100644 --- a/vespalib/src/tests/wakeup/wakeup_bench.cpp +++ b/vespalib/src/tests/wakeup/wakeup_bench.cpp @@ -5,6 +5,7 @@ #include <condition_variable> #include <thread> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/cpu_usage.h> #ifdef __linux__ #include <linux/futex.h> @@ -121,18 +122,33 @@ struct UsePipe : State { set_wakeup(); char token = 'T'; [[maybe_unused]] ssize_t res = write(pipefd[1], &token, 1); - assert(res == 1); + // assert(res == 1); } void stop() { set_stop(); char token = 'T'; [[maybe_unused]] ssize_t res = write(pipefd[1], &token, 1); - assert(res == 1); + // assert(res == 1); } void wait() { char token_trash[128]; [[maybe_unused]] ssize_t res = read(pipefd[0], token_trash, sizeof(token_trash)); - assert(res == 1); + // assert(res == 1); + } +}; + +struct UseAtomic : State { + void wakeup() { + set_wakeup(); + value.notify_one(); + } + void stop() { + set_stop(); + value.notify_one(); + } + void wait() { + value.wait(0); + // assert(!is_ready()); } }; @@ -162,9 +178,11 @@ struct Wakeup : T { using T::should_stop; using T::set_ready; using T::wait; + cpu_usage::ThreadSampler::UP cpu; std::thread thread; Wakeup() : thread([this]{ run(); }) {} void run() { + cpu = cpu_usage::create_thread_sampler(); while (!should_stop()) { set_ready(); wait(); @@ -211,6 +229,15 @@ void wait_until_ready(const T &list) { } template <typename T> +duration sample_cpu(T &list) { + duration result = duration::zero(); + for (auto *item: list) { + result += item->cpu->sample(); + } + return result; +} + +template <typename T> auto perform_wakeups(T &list, size_t target) __attribute__((noinline)); template <typename T> auto perform_wakeups(T &list, size_t target) { @@ -239,11 +266,17 @@ void benchmark() { perform_wakeups(list, WAKE_CNT / 64); } auto t1 = steady_clock::now(); + auto cpu0 = sample_cpu(list); auto res = perform_wakeups(list, WAKE_CNT); auto t2 = steady_clock::now(); + auto cpu1 = sample_cpu(list); wait_until_ready(list); destroy_list(list); - fprintf(stderr, "wakeups per second: %zu (skipped: %zu)\n", size_t(res.first / to_s(t2 - t1)), res.second); + double run_time = to_s(t2 - t1); + double cpu_time = to_s(cpu1 - cpu0); + double cpu_load = (cpu_time / (N * run_time)); + fprintf(stderr, "wakeups per second: %zu (skipped: %zu, cpu load: %.3f)\n", + size_t(res.first / run_time), res.second, cpu_load); } TEST(WakeupBench, using_spin) { benchmark<Wakeup<UseSpin>>(); } @@ -251,6 +284,7 @@ TEST(WakeupBench, using_spin_yield) { benchmark<Wakeup<UseSpinYield>>(); } TEST(WakeupBench, using_cond) { benchmark<Wakeup<UseCond>>(); } TEST(WakeupBench, using_cond_nolock) { benchmark<Wakeup<UseCondNolock>>(); } TEST(WakeupBench, using_pipe) { benchmark<Wakeup<UsePipe>>(); } +TEST(WakeupBench, using_atomic) { benchmark<Wakeup<UseAtomic>>(); } #ifdef __linux__ TEST(WakeupBench, using_futex) { benchmark<Wakeup<UseFutex>>(); } diff --git a/vespalib/src/vespa/vespalib/data/slime/array_value.h b/vespalib/src/vespa/vespalib/data/slime/array_value.h index b1b257e8476..216d116498c 100644 --- a/vespalib/src/vespa/vespalib/data/slime/array_value.h +++ b/vespalib/src/vespa/vespalib/data/slime/array_value.h @@ -5,6 +5,7 @@ #include "value.h" #include "nix_value.h" #include "value_factory.h" +#include <vespa/vespalib/stllike/allocator.h> #include <vector> namespace vespalib::slime { @@ -18,7 +19,7 @@ class ArrayValue final : public Value private: SymbolTable &_symbolTable; Stash &_stash; - std::vector<Value*> _values; + std::vector<Value*, vespalib::allocator_large<Value*>> _values; protected: Cursor &addLeaf(const ValueFactory &input) override { diff --git a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt index d628843279d..a2f7a4d4ff4 100644 --- a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt @@ -12,8 +12,11 @@ vespa_add_library(vespalib_vespalib_datastore OBJECT entryref.cpp entry_ref_filter.cpp fixed_size_hash_map.cpp + large_array_buffer_type.cpp sharded_hash_map.cpp + small_array_buffer_type.cpp unique_store.cpp + unique_store_buffer_type.cpp unique_store_string_allocator.cpp DEPENDS ) diff --git a/vespalib/src/vespa/vespalib/datastore/array_store.cpp b/vespalib/src/vespa/vespalib/datastore/array_store.cpp index 0e03b36d2f9..b03f21402a5 100644 --- a/vespalib/src/vespa/vespalib/datastore/array_store.cpp +++ b/vespalib/src/vespa/vespalib/datastore/array_store.cpp @@ -5,10 +5,4 @@ namespace vespalib::datastore { -template class BufferType<vespalib::Array<uint8_t>>; -template class BufferType<vespalib::Array<uint32_t>>; -template class BufferType<vespalib::Array<int32_t>>; -template class BufferType<vespalib::Array<std::string>>; -template class BufferType<vespalib::Array<AtomicEntryRef>>; - } diff --git a/vespalib/src/vespa/vespalib/datastore/array_store.h b/vespalib/src/vespa/vespalib/datastore/array_store.h index d9b62c310b5..ed3af451b04 100644 --- a/vespalib/src/vespa/vespalib/datastore/array_store.h +++ b/vespalib/src/vespa/vespalib/datastore/array_store.h @@ -9,6 +9,8 @@ #include "entryref.h" #include "atomic_entry_ref.h" #include "i_compaction_context.h" +#include "large_array_buffer_type.h" +#include "small_array_buffer_type.h" #include <vespa/vespalib/util/array.h> namespace vespalib::datastore { @@ -32,27 +34,15 @@ public: using SmallArrayType = BufferType<EntryT>; using LargeArray = vespalib::Array<EntryT>; using AllocSpec = ArrayStoreConfig::AllocSpec; - private: - class LargeArrayType : public BufferType<LargeArray> { - private: - using ParentType = BufferType<LargeArray>; - using ParentType::_emptyEntry; - using CleanContext = typename ParentType::CleanContext; - public: - LargeArrayType(const AllocSpec &spec); - void cleanHold(void *buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx) override; - }; - - uint32_t _largeArrayTypeId; uint32_t _maxSmallArraySize; DataStoreType _store; - std::vector<SmallArrayType> _smallArrayTypes; - LargeArrayType _largeArrayType; + std::vector<SmallArrayBufferType<EntryT>> _smallArrayTypes; + LargeArrayBufferType<EntryT> _largeArrayType; using generation_t = vespalib::GenerationHandler::generation_t; - void initArrayTypes(const ArrayStoreConfig &cfg); + void initArrayTypes(const ArrayStoreConfig &cfg, std::shared_ptr<alloc::MemoryAllocator> memory_allocator); // 1-to-1 mapping between type ids and sizes for small arrays is enforced during initialization. uint32_t getTypeId(size_t arraySize) const { return arraySize; } size_t getArraySize(uint32_t typeId) const { return typeId; } @@ -68,7 +58,7 @@ private: } public: - ArrayStore(const ArrayStoreConfig &cfg); + ArrayStore(const ArrayStoreConfig &cfg, std::shared_ptr<alloc::MemoryAllocator> memory_allocator); ~ArrayStore(); EntryRef add(const ConstArrayRef &array); ConstArrayRef get(EntryRef ref) const { diff --git a/vespalib/src/vespa/vespalib/datastore/array_store.hpp b/vespalib/src/vespa/vespalib/datastore/array_store.hpp index bbbd52c354d..00c1615b173 100644 --- a/vespalib/src/vespa/vespalib/datastore/array_store.hpp +++ b/vespalib/src/vespa/vespalib/datastore/array_store.hpp @@ -6,40 +6,23 @@ #include "compaction_spec.h" #include "entry_ref_filter.h" #include "datastore.hpp" +#include "large_array_buffer_type.hpp" +#include "small_array_buffer_type.hpp" #include <atomic> #include <algorithm> namespace vespalib::datastore { template <typename EntryT, typename RefT> -ArrayStore<EntryT, RefT>::LargeArrayType::LargeArrayType(const AllocSpec &spec) - : BufferType<LargeArray>(1, spec.minArraysInBuffer, spec.maxArraysInBuffer, spec.numArraysForNewBuffer, spec.allocGrowFactor) -{ -} - -template <typename EntryT, typename RefT> -void -ArrayStore<EntryT, RefT>::LargeArrayType::cleanHold(void *buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx) -{ - LargeArray *elem = static_cast<LargeArray *>(buffer) + offset; - for (size_t i = 0; i < numElems; ++i) { - cleanCtx.extraBytesCleaned(sizeof(EntryT) * elem->size()); - *elem = _emptyEntry; - ++elem; - } -} - -template <typename EntryT, typename RefT> void -ArrayStore<EntryT, RefT>::initArrayTypes(const ArrayStoreConfig &cfg) +ArrayStore<EntryT, RefT>::initArrayTypes(const ArrayStoreConfig &cfg, std::shared_ptr<alloc::MemoryAllocator> memory_allocator) { _largeArrayTypeId = _store.addType(&_largeArrayType); assert(_largeArrayTypeId == 0); _smallArrayTypes.reserve(_maxSmallArraySize); for (uint32_t arraySize = 1; arraySize <= _maxSmallArraySize; ++arraySize) { const AllocSpec &spec = cfg.specForSize(arraySize); - _smallArrayTypes.emplace_back(arraySize, spec.minArraysInBuffer, spec.maxArraysInBuffer, - spec.numArraysForNewBuffer, spec.allocGrowFactor); + _smallArrayTypes.emplace_back(arraySize, spec, memory_allocator); } for (auto & type : _smallArrayTypes) { uint32_t typeId = _store.addType(&type); @@ -48,14 +31,14 @@ ArrayStore<EntryT, RefT>::initArrayTypes(const ArrayStoreConfig &cfg) } template <typename EntryT, typename RefT> -ArrayStore<EntryT, RefT>::ArrayStore(const ArrayStoreConfig &cfg) +ArrayStore<EntryT, RefT>::ArrayStore(const ArrayStoreConfig &cfg, std::shared_ptr<alloc::MemoryAllocator> memory_allocator) : _largeArrayTypeId(0), _maxSmallArraySize(cfg.maxSmallArraySize()), _store(), _smallArrayTypes(), - _largeArrayType(cfg.specForSize(0)) + _largeArrayType(cfg.specForSize(0), memory_allocator) { - initArrayTypes(cfg); + initArrayTypes(cfg, std::move(memory_allocator)); _store.init_primary_buffers(); if (cfg.enable_free_lists()) { _store.enableFreeLists(); diff --git a/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.cpp b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.cpp new file mode 100644 index 00000000000..f4ccb27abad --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.cpp @@ -0,0 +1,20 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "large_array_buffer_type.hpp" +#include "buffer_type.hpp" + +namespace vespalib::datastore { + +template class BufferType<Array<uint8_t>>; +template class BufferType<Array<uint32_t>>; +template class BufferType<Array<int32_t>>; +template class BufferType<Array<std::string>>; +template class BufferType<Array<AtomicEntryRef>>; + +template class LargeArrayBufferType<uint8_t>; +template class LargeArrayBufferType<uint32_t>; +template class LargeArrayBufferType<int32_t>; +template class LargeArrayBufferType<std::string>; +template class LargeArrayBufferType<AtomicEntryRef>; + +} diff --git a/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.h b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.h new file mode 100644 index 00000000000..50d15d4a27c --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.h @@ -0,0 +1,39 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "array_store_config.h" +#include "buffer_type.h" +#include <vespa/vespalib/util/array.h> +#include <memory> + +namespace vespalib::alloc { class MemoryAllocator; } + +namespace vespalib::datastore { + +/* + * Class representing buffer type for large arrays in ArrayStore + */ +template <typename EntryT> +class LargeArrayBufferType : public BufferType<Array<EntryT>> +{ + using AllocSpec = ArrayStoreConfig::AllocSpec; + using ArrayType = Array<EntryT>; + using ParentType = BufferType<ArrayType>; + using ParentType::_emptyEntry; + using CleanContext = typename ParentType::CleanContext; + std::shared_ptr<alloc::MemoryAllocator> _memory_allocator; +public: + LargeArrayBufferType(const AllocSpec& spec, std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept; + ~LargeArrayBufferType() override; + void cleanHold(void* buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx) override; + const vespalib::alloc::MemoryAllocator* get_memory_allocator() const override; +}; + +extern template class LargeArrayBufferType<uint8_t>; +extern template class LargeArrayBufferType<uint32_t>; +extern template class LargeArrayBufferType<int32_t>; +extern template class LargeArrayBufferType<std::string>; +extern template class LargeArrayBufferType<AtomicEntryRef>; + +} diff --git a/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.hpp b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.hpp new file mode 100644 index 00000000000..aeeef8166c6 --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.hpp @@ -0,0 +1,39 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "large_array_buffer_type.h" +#include <vespa/vespalib/util/array.hpp> + +namespace vespalib::datastore { + +template <typename EntryT> +LargeArrayBufferType<EntryT>::LargeArrayBufferType(const AllocSpec& spec, std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept + : BufferType<Array<EntryT>>(1u, spec.minArraysInBuffer, spec.maxArraysInBuffer, spec.numArraysForNewBuffer, spec.allocGrowFactor), + _memory_allocator(std::move(memory_allocator)) +{ +} + +template <typename EntryT> +LargeArrayBufferType<EntryT>::~LargeArrayBufferType() = default; + +template <typename EntryT> +void +LargeArrayBufferType<EntryT>::cleanHold(void* buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx) +{ + ArrayType* elem = static_cast<ArrayType*>(buffer) + offset; + for (size_t i = 0; i < numElems; ++i) { + cleanCtx.extraBytesCleaned(sizeof(EntryT) * elem->size()); + *elem = _emptyEntry; + ++elem; + } +} + +template <typename EntryT> +const vespalib::alloc::MemoryAllocator* +LargeArrayBufferType<EntryT>::get_memory_allocator() const +{ + return _memory_allocator.get(); +} + +} diff --git a/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.cpp b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.cpp new file mode 100644 index 00000000000..06a4c6007a9 --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.cpp @@ -0,0 +1,14 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "small_array_buffer_type.hpp" +#include "buffer_type.hpp" + +namespace vespalib::datastore { + +template class SmallArrayBufferType<uint8_t>; +template class SmallArrayBufferType<uint32_t>; +template class SmallArrayBufferType<int32_t>; +template class SmallArrayBufferType<std::string>; +template class SmallArrayBufferType<AtomicEntryRef>; + +} diff --git a/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.h b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.h new file mode 100644 index 00000000000..4e4568c3d16 --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.h @@ -0,0 +1,37 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "array_store_config.h" +#include "buffer_type.h" +#include <memory> + +namespace vespalib::alloc { class MemoryAllocator; } + +namespace vespalib::datastore { + +/* + * Class representing buffer type for small arrays in ArrayStore + */ +template <typename EntryT> +class SmallArrayBufferType : public BufferType<EntryT> +{ + using AllocSpec = ArrayStoreConfig::AllocSpec; + std::shared_ptr<alloc::MemoryAllocator> _memory_allocator; +public: + SmallArrayBufferType(const SmallArrayBufferType&) = delete; + SmallArrayBufferType& operator=(const SmallArrayBufferType&) = delete; + SmallArrayBufferType(SmallArrayBufferType&&) noexcept = default; + SmallArrayBufferType& operator=(SmallArrayBufferType&&) noexcept = default; + SmallArrayBufferType(uint32_t array_size, const AllocSpec& spec, std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept; + ~SmallArrayBufferType() override; + const vespalib::alloc::MemoryAllocator* get_memory_allocator() const override; +}; + +extern template class SmallArrayBufferType<uint8_t>; +extern template class SmallArrayBufferType<uint32_t>; +extern template class SmallArrayBufferType<int32_t>; +extern template class SmallArrayBufferType<std::string>; +extern template class SmallArrayBufferType<AtomicEntryRef>; + +} diff --git a/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.hpp b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.hpp new file mode 100644 index 00000000000..414804417eb --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.hpp @@ -0,0 +1,26 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "small_array_buffer_type.h" + +namespace vespalib::datastore { + +template <typename EntryT> +SmallArrayBufferType<EntryT>::SmallArrayBufferType(uint32_t array_size, const AllocSpec& spec, std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept + : BufferType<EntryT>(array_size, spec.minArraysInBuffer, spec.maxArraysInBuffer, spec.numArraysForNewBuffer, spec.allocGrowFactor), + _memory_allocator(std::move(memory_allocator)) +{ +} + +template <typename EntryT> +SmallArrayBufferType<EntryT>::~SmallArrayBufferType() = default; + +template <typename EntryT> +const vespalib::alloc::MemoryAllocator* +SmallArrayBufferType<EntryT>::get_memory_allocator() const +{ + return _memory_allocator.get(); +} + +} diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store.cpp b/vespalib/src/vespa/vespalib/datastore/unique_store.cpp index 0d740c30c84..dcd9b38fab8 100644 --- a/vespalib/src/vespa/vespalib/datastore/unique_store.cpp +++ b/vespalib/src/vespa/vespalib/datastore/unique_store.cpp @@ -5,14 +5,6 @@ namespace vespalib::datastore { -template class BufferType<UniqueStoreEntry<int8_t>>; -template class BufferType<UniqueStoreEntry<int16_t>>; -template class BufferType<UniqueStoreEntry<int32_t>>; -template class BufferType<UniqueStoreEntry<int64_t>>; -template class BufferType<UniqueStoreEntry<uint32_t>>; -template class BufferType<UniqueStoreEntry<float>>; -template class BufferType<UniqueStoreEntry<double>>; - using namespace btree; VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_INTERNALNODE(EntryRef, NoAggregated, uniquestore::DefaultDictionaryTraits::INTERNAL_SLOTS); diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store.h b/vespalib/src/vespa/vespalib/datastore/unique_store.h index aea98f406e8..81034ab4210 100644 --- a/vespalib/src/vespa/vespalib/datastore/unique_store.h +++ b/vespalib/src/vespa/vespalib/datastore/unique_store.h @@ -13,6 +13,8 @@ #include "unique_store_comparator.h" #include "unique_store_entry.h" +namespace vespalib::alloc { class MemoryAllocator; } + namespace vespalib::datastore { template <typename Allocator> @@ -47,8 +49,8 @@ private: using generation_t = vespalib::GenerationHandler::generation_t; public: - UniqueStore(); - UniqueStore(std::unique_ptr<IUniqueStoreDictionary> dict); + UniqueStore(std::shared_ptr<alloc::MemoryAllocator> memory_allocator); + UniqueStore(std::unique_ptr<IUniqueStoreDictionary> dict, std::shared_ptr<alloc::MemoryAllocator> memory_allocator); ~UniqueStore(); void set_dictionary(std::unique_ptr<IUniqueStoreDictionary> dict); UniqueStoreAddResult add(EntryConstRefType value); diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp index b73b714a6bc..b1a7db56545 100644 --- a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp +++ b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp @@ -27,14 +27,14 @@ using DefaultUniqueStoreDictionary = UniqueStoreDictionary<DefaultDictionary>; } template <typename EntryT, typename RefT, typename Compare, typename Allocator> -UniqueStore<EntryT, RefT, Compare, Allocator>::UniqueStore() - : UniqueStore<EntryT, RefT, Compare, Allocator>(std::make_unique<uniquestore::DefaultUniqueStoreDictionary>(std::unique_ptr<EntryComparator>())) +UniqueStore<EntryT, RefT, Compare, Allocator>::UniqueStore(std::shared_ptr<alloc::MemoryAllocator> memory_allocator) + : UniqueStore<EntryT, RefT, Compare, Allocator>(std::make_unique<uniquestore::DefaultUniqueStoreDictionary>(std::unique_ptr<EntryComparator>()), std::move(memory_allocator)) { } template <typename EntryT, typename RefT, typename Compare, typename Allocator> -UniqueStore<EntryT, RefT, Compare, Allocator>::UniqueStore(std::unique_ptr<IUniqueStoreDictionary> dict) - : _allocator(), +UniqueStore<EntryT, RefT, Compare, Allocator>::UniqueStore(std::unique_ptr<IUniqueStoreDictionary> dict, std::shared_ptr<alloc::MemoryAllocator> memory_allocator) + : _allocator(std::move(memory_allocator)), _store(_allocator.get_data_store()), _dict(std::move(dict)) { diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.h b/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.h index 0648f466bae..025165ee0e0 100644 --- a/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.h +++ b/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.h @@ -5,9 +5,12 @@ #include "datastore.h" #include "entryref.h" #include "unique_store_add_result.h" +#include "unique_store_buffer_type.h" #include "unique_store_entry.h" #include "i_compactable.h" +namespace vespalib::alloc { class MemoryAllocator; } + namespace vespalib::datastore { /** @@ -23,13 +26,12 @@ public: using EntryConstRefType = const EntryType &; using WrappedEntryType = UniqueStoreEntry<EntryType>; using RefType = RefT; - using UniqueStoreBufferType = BufferType<WrappedEntryType>; private: DataStoreType _store; - UniqueStoreBufferType _typeHandler; + UniqueStoreBufferType<WrappedEntryType> _typeHandler; public: - UniqueStoreAllocator(); + UniqueStoreAllocator(std::shared_ptr<alloc::MemoryAllocator> memory_allocator); ~UniqueStoreAllocator() override; EntryRef allocate(const EntryType& value); void hold(EntryRef ref); diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.hpp index eb634503c9d..04a229d4ffa 100644 --- a/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.hpp +++ b/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.hpp @@ -3,6 +3,7 @@ #pragma once #include "unique_store_allocator.h" +#include "unique_store_buffer_type.hpp" #include "unique_store_value_filter.h" #include "datastore.hpp" #include <vespa/vespalib/util/size_literals.h> @@ -13,10 +14,10 @@ constexpr size_t NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER = 1_Ki; constexpr float ALLOC_GROW_FACTOR = 0.2; template <typename EntryT, typename RefT> -UniqueStoreAllocator<EntryT, RefT>::UniqueStoreAllocator() +UniqueStoreAllocator<EntryT, RefT>::UniqueStoreAllocator(std::shared_ptr<alloc::MemoryAllocator> memory_allocator) : ICompactable(), _store(), - _typeHandler(1, 2u, RefT::offsetSize(), NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR) + _typeHandler(2u, RefT::offsetSize(), NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR, std::move(memory_allocator)) { auto typeId = _store.addType(&_typeHandler); assert(typeId == 0u); diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.cpp b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.cpp new file mode 100644 index 00000000000..3441a9435a1 --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.cpp @@ -0,0 +1,25 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "unique_store_buffer_type.hpp" +#include "unique_store_entry.h" + +namespace vespalib::datastore { + +template class BufferType<UniqueStoreEntry<int8_t>>; +template class BufferType<UniqueStoreEntry<int16_t>>; +template class BufferType<UniqueStoreEntry<int32_t>>; +template class BufferType<UniqueStoreEntry<int64_t>>; +template class BufferType<UniqueStoreEntry<uint32_t>>; +template class BufferType<UniqueStoreEntry<float>>; +template class BufferType<UniqueStoreEntry<double>>; + +template class UniqueStoreBufferType<UniqueStoreEntry<int8_t>>; +template class UniqueStoreBufferType<UniqueStoreEntry<int16_t>>; +template class UniqueStoreBufferType<UniqueStoreEntry<int32_t>>; +template class UniqueStoreBufferType<UniqueStoreEntry<int64_t>>; +template class UniqueStoreBufferType<UniqueStoreEntry<uint32_t>>; +template class UniqueStoreBufferType<UniqueStoreEntry<float>>; +template class UniqueStoreBufferType<UniqueStoreEntry<double>>; + +}; + diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.h b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.h new file mode 100644 index 00000000000..2aba785dd91 --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.h @@ -0,0 +1,27 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "buffer_type.h" +#include <memory> + +namespace vespalib::alloc { class MemoryAllocator; } + +namespace vespalib::datastore { + +/* + * Class representing buffer type for a normal unique store allocator. + */ +template <typename WrappedEntry> +class UniqueStoreBufferType : public BufferType<WrappedEntry> +{ + std::shared_ptr<alloc::MemoryAllocator> _memory_allocator; +public: + UniqueStoreBufferType(uint32_t min_arrays, uint32_t max_arrays, + uint32_t num_arrays_for_new_buffer, float alloc_grow_factor, + std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept; + ~UniqueStoreBufferType() override; + const vespalib::alloc::MemoryAllocator* get_memory_allocator() const override; +}; + +} diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.hpp new file mode 100644 index 00000000000..c99033106ee --- /dev/null +++ b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.hpp @@ -0,0 +1,29 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "unique_store_buffer_type.h" +#include "buffer_type.hpp" + +namespace vespalib::datastore { + +template <typename WrappedEntry> +UniqueStoreBufferType<WrappedEntry>::UniqueStoreBufferType(uint32_t min_arrays, uint32_t max_arrays, + uint32_t num_arrays_for_new_buffer, float alloc_grow_factor, + std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept + : BufferType<WrappedEntry>(1u, min_arrays, max_arrays, num_arrays_for_new_buffer, alloc_grow_factor), + _memory_allocator(std::move(memory_allocator)) +{ +} + +template <typename WrappedEntry> +UniqueStoreBufferType<WrappedEntry>::~UniqueStoreBufferType() = default; + +template <typename WrappedEntry> +const vespalib::alloc::MemoryAllocator* +UniqueStoreBufferType<WrappedEntry>::get_memory_allocator() const +{ + return _memory_allocator.get(); +} + +} diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.cpp b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.cpp index 664f1e4432e..9c639067615 100644 --- a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.cpp +++ b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.cpp @@ -33,8 +33,9 @@ get_type_id(size_t string_len) } -UniqueStoreSmallStringBufferType::UniqueStoreSmallStringBufferType(uint32_t array_size, uint32_t max_arrays) - : BufferType<char>(array_size, 2u, max_arrays, NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR) +UniqueStoreSmallStringBufferType::UniqueStoreSmallStringBufferType(uint32_t array_size, uint32_t max_arrays, std::shared_ptr<vespalib::alloc::MemoryAllocator> memory_allocator) + : BufferType<char>(array_size, 2u, max_arrays, NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR), + _memory_allocator(std::move(memory_allocator)) { } @@ -68,8 +69,15 @@ UniqueStoreSmallStringBufferType::cleanHold(void *buffer, size_t offset, ElemCou assert(e == e_end); } -UniqueStoreExternalStringBufferType::UniqueStoreExternalStringBufferType(uint32_t array_size, uint32_t max_arrays) - : BufferType<UniqueStoreEntry<std::string>>(array_size, 2u, max_arrays, NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR) +const vespalib::alloc::MemoryAllocator* +UniqueStoreSmallStringBufferType::get_memory_allocator() const +{ + return _memory_allocator.get(); +} + +UniqueStoreExternalStringBufferType::UniqueStoreExternalStringBufferType(uint32_t array_size, uint32_t max_arrays, std::shared_ptr<vespalib::alloc::MemoryAllocator> memory_allocator) + : BufferType<UniqueStoreEntry<std::string>>(array_size, 2u, max_arrays, NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR), + _memory_allocator(std::move(memory_allocator)) { } @@ -86,6 +94,12 @@ UniqueStoreExternalStringBufferType::cleanHold(void *buffer, size_t offset, Elem } } +const vespalib::alloc::MemoryAllocator* +UniqueStoreExternalStringBufferType::get_memory_allocator() const +{ + return _memory_allocator.get(); +} + template class UniqueStoreStringAllocator<EntryRefT<22>>; template class BufferType<UniqueStoreEntry<std::string>>; diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.h b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.h index 7b4c578f248..be5fa8f6c1e 100644 --- a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.h +++ b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.h @@ -10,6 +10,8 @@ #include <cassert> #include <string> +namespace vespalib::alloc { class MemoryAllocator; } + namespace vespalib::datastore { namespace string_allocator { @@ -56,22 +58,26 @@ public: * bytes */ class UniqueStoreSmallStringBufferType : public BufferType<char> { + std::shared_ptr<vespalib::alloc::MemoryAllocator> _memory_allocator; public: - UniqueStoreSmallStringBufferType(uint32_t array_size, uint32_t max_arrays); + UniqueStoreSmallStringBufferType(uint32_t array_size, uint32_t max_arrays, std::shared_ptr<vespalib::alloc::MemoryAllocator> memory_allocator); ~UniqueStoreSmallStringBufferType() override; void destroyElements(void *, ElemCount) override; void fallbackCopy(void *newBuffer, const void *oldBuffer, ElemCount numElems) override; void cleanHold(void *buffer, size_t offset, ElemCount numElems, CleanContext) override; + const vespalib::alloc::MemoryAllocator* get_memory_allocator() const override; }; /* * Buffer type for external strings in unique store. */ class UniqueStoreExternalStringBufferType : public BufferType<UniqueStoreEntry<std::string>> { + std::shared_ptr<vespalib::alloc::MemoryAllocator> _memory_allocator; public: - UniqueStoreExternalStringBufferType(uint32_t array_size, uint32_t max_arrays); + UniqueStoreExternalStringBufferType(uint32_t array_size, uint32_t max_arrays, std::shared_ptr<vespalib::alloc::MemoryAllocator> memory_allocator); ~UniqueStoreExternalStringBufferType() override; void cleanHold(void *buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx) override; + const vespalib::alloc::MemoryAllocator* get_memory_allocator() const override; }; /** @@ -101,7 +107,7 @@ private: static uint32_t get_type_id(const char *value); public: - UniqueStoreStringAllocator(); + UniqueStoreStringAllocator(std::shared_ptr<alloc::MemoryAllocator> memory_allocator); ~UniqueStoreStringAllocator() override; EntryRef allocate(const char *value); void hold(EntryRef ref); diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.hpp index 90c44200fbb..71ea16bcde2 100644 --- a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.hpp +++ b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.hpp @@ -8,14 +8,14 @@ namespace vespalib::datastore { template <typename RefT> -UniqueStoreStringAllocator<RefT>::UniqueStoreStringAllocator() +UniqueStoreStringAllocator<RefT>::UniqueStoreStringAllocator(std::shared_ptr<alloc::MemoryAllocator> memory_allocator) : ICompactable(), _store(), _type_handlers() { - _type_handlers.emplace_back(std::make_unique<UniqueStoreExternalStringBufferType>(1, RefT::offsetSize())); + _type_handlers.emplace_back(std::make_unique<UniqueStoreExternalStringBufferType>(1, RefT::offsetSize(), memory_allocator)); for (auto size : string_allocator::array_sizes) { - _type_handlers.emplace_back(std::make_unique<UniqueStoreSmallStringBufferType>(size, RefT::offsetSize())); + _type_handlers.emplace_back(std::make_unique<UniqueStoreSmallStringBufferType>(size, RefT::offsetSize(), memory_allocator)); } uint32_t exp_type_id = 0; for (auto &type_handler : _type_handlers) { diff --git a/vespalib/src/vespa/vespalib/stllike/asciistream.cpp b/vespalib/src/vespa/vespalib/stllike/asciistream.cpp index bedd0f119c6..eb127c7051a 100644 --- a/vespalib/src/vespa/vespalib/stllike/asciistream.cpp +++ b/vespalib/src/vespa/vespalib/stllike/asciistream.cpp @@ -3,15 +3,14 @@ #include "asciistream.h" #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/util/memory.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/alloc.h> #include <vespa/vespalib/locale/c.h> #include <vespa/fastos/file.h> #include <limits> -#include <stdexcept> #include <cassert> -#include <cmath> #include <charconv> +#include <vector> #include <vespa/log/log.h> LOG_SETUP(".vespalib.stllike.asciistream"); @@ -19,20 +18,23 @@ LOG_SETUP(".vespalib.stllike.asciistream"); namespace vespalib { namespace { - std::vector<string> getPrecisions(const char type) { - std::vector<string> result(VESPALIB_ASCIISTREAM_MAX_PRECISION + 1); - for (uint32_t i=0; i<result.size(); ++i) { - char buf[8]; - int count = snprintf(buf, sizeof(buf), "%%.%u%c", i, type); - assert(size_t(count) < sizeof(buf)); // Assert no truncation. - (void) count; - result[i] = buf; - } - return result; + +std::vector<string> +getPrecisions(const char type) { + std::vector<string> result(VESPALIB_ASCIISTREAM_MAX_PRECISION + 1); + for (uint32_t i=0; i<result.size(); ++i) { + char buf[8]; + int count = snprintf(buf, sizeof(buf), "%%.%u%c", i, type); + assert(size_t(count) < sizeof(buf)); // Assert no truncation. + (void) count; + result[i] = buf; } - std::vector<string> fixedPrecisions = getPrecisions('f'); - std::vector<string> scientificPrecisions = getPrecisions('e'); - std::vector<string> autoPrecisions = getPrecisions('g'); + return result; +} +std::vector<string> fixedPrecisions = getPrecisions('f'); +std::vector<string> scientificPrecisions = getPrecisions('e'); +std::vector<string> autoPrecisions = getPrecisions('g'); + } asciistream & @@ -93,7 +95,8 @@ asciistream::asciistream(const asciistream & rhs) : { } -asciistream & asciistream::operator = (const asciistream & rhs) +asciistream & +asciistream::operator = (const asciistream & rhs) { if (this != &rhs) { asciistream newStream(rhs); @@ -108,7 +111,8 @@ asciistream::asciistream(asciistream && rhs) noexcept swap(rhs); } -asciistream & asciistream::operator = (asciistream && rhs) noexcept +asciistream & +asciistream::operator = (asciistream && rhs) noexcept { if (this != &rhs) { swap(rhs); @@ -116,7 +120,8 @@ asciistream & asciistream::operator = (asciistream && rhs) noexcept return *this; } -void asciistream::swap(asciistream & rhs) noexcept +void +asciistream::swap(asciistream & rhs) noexcept { std::swap(_rPos, rhs._rPos); // If read-only, _wbuf is empty and _rbuf is set @@ -150,7 +155,8 @@ void throwUnderflow(size_t pos) __attribute__((noinline)); template <typename T> T strToInt(T & v, const char *begin, const char *end) __attribute__((noinline)); -void throwInputError(int e, const char * t, const char * buf) +void +throwInputError(int e, const char * t, const char * buf) { if (e == 0) { throw IllegalArgumentException("Failed decoding a " + string(t) + " from '" + string(buf) + "'.", VESPA_STRLOC); @@ -163,7 +169,8 @@ void throwInputError(int e, const char * t, const char * buf) } } -void throwInputError(std::errc e, const char * t, const char * buf) { +void +throwInputError(std::errc e, const char * t, const char * buf) { if (e == std::errc::invalid_argument) { throw IllegalArgumentException("Illegal " + string(t) + " value '" + string(buf) + "'.", VESPA_STRLOC); } else if (e == std::errc::result_out_of_range) { @@ -173,12 +180,14 @@ void throwInputError(std::errc e, const char * t, const char * buf) { } } -void throwUnderflow(size_t pos) +void +throwUnderflow(size_t pos) { throw IllegalArgumentException(make_string("buffer underflow at pos %ld.", pos), VESPA_STRLOC); } -int getValue(double & val, const char *buf) +int +getValue(double & val, const char *buf) { char *ebuf; errno = 0; @@ -189,7 +198,8 @@ int getValue(double & val, const char *buf) return ebuf - buf; } -int getValue(float & val, const char *buf) +int +getValue(float & val, const char *buf) { char *ebuf; errno = 0; @@ -201,7 +211,8 @@ int getValue(float & val, const char *buf) } template <typename T> -T strToInt(T & v, const char *begin, const char *end) +T +strToInt(T & v, const char *begin, const char *end) { const char * curr = begin; for (;(curr < end) && std::isspace(*curr); curr++); @@ -226,7 +237,8 @@ T strToInt(T & v, const char *begin, const char *end) } -asciistream & asciistream::operator >> (bool & v) +asciistream & +asciistream::operator >> (bool & v) { for (;(_rPos < length()) && std::isspace(_rbuf[_rPos]); _rPos++); if (_rPos < length()) { @@ -237,7 +249,8 @@ asciistream & asciistream::operator >> (bool & v) return *this; } -asciistream & asciistream::operator >> (char & v) +asciistream & +asciistream::operator >> (char & v) { for (;(_rPos < length()) && std::isspace(_rbuf[_rPos]); _rPos++); if (_rPos < length()) { @@ -248,7 +261,8 @@ asciistream & asciistream::operator >> (char & v) return *this; } -asciistream & asciistream::operator >> (signed char & v) +asciistream & +asciistream::operator >> (signed char & v) { for (;(_rPos < length()) && std::isspace(_rbuf[_rPos]); _rPos++); if (_rPos < length()) { @@ -259,7 +273,8 @@ asciistream & asciistream::operator >> (signed char & v) return *this; } -asciistream & asciistream::operator >> (unsigned char & v) +asciistream & +asciistream::operator >> (unsigned char & v) { for (;(_rPos < length()) && std::isspace(_rbuf[_rPos]); _rPos++); if (_rPos < length()) { @@ -270,55 +285,64 @@ asciistream & asciistream::operator >> (unsigned char & v) return *this; } -asciistream & asciistream::operator >> (unsigned short & v) +asciistream & +asciistream::operator >> (unsigned short & v) { _rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]); return *this; } -asciistream & asciistream::operator >> (unsigned int & v) +asciistream & +asciistream::operator >> (unsigned int & v) { _rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]); return *this; } -asciistream & asciistream::operator >> (unsigned long & v) +asciistream & +asciistream::operator >> (unsigned long & v) { _rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]); return *this; } -asciistream & asciistream::operator >> (unsigned long long & v) +asciistream & +asciistream::operator >> (unsigned long long & v) { _rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]); return *this; } -asciistream & asciistream::operator >> (short & v) +asciistream & +asciistream::operator >> (short & v) { _rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]); return *this; } -asciistream & asciistream::operator >> (int & v) +asciistream & +asciistream::operator >> (int & v) { _rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]); return *this; } -asciistream & asciistream::operator >> (long & v) +asciistream & +asciistream::operator >> (long & v) { _rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]); return *this; } -asciistream & asciistream::operator >> (long long & v) +asciistream & +asciistream::operator >> (long long & v) { _rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]); return *this; } -asciistream & asciistream::operator >> (double & v) +asciistream & +asciistream::operator >> (double & v) { double l(0); _rPos += getValue(l, &_rbuf[_rPos]); @@ -326,7 +350,8 @@ asciistream & asciistream::operator >> (double & v) return *this; } -asciistream & asciistream::operator >> (float & v) +asciistream & +asciistream::operator >> (float & v) { float l(0); _rPos += getValue(l, &_rbuf[_rPos]); @@ -334,7 +359,8 @@ asciistream & asciistream::operator >> (float & v) return *this; } -void asciistream::eatWhite() +void +asciistream::eatWhite() { for (;(_rPos < length()) && isspace(_rbuf[_rPos]); _rPos++); } @@ -344,7 +370,8 @@ void asciistream::eatNonWhite() for (;(_rPos < length()) && !isspace(_rbuf[_rPos]); _rPos++); } -asciistream & asciistream::operator >> (std::string & v) +asciistream & +asciistream::operator >> (std::string & v) { eatWhite(); size_t start(_rPos); @@ -353,7 +380,8 @@ asciistream & asciistream::operator >> (std::string & v) return *this; } -asciistream & asciistream::operator >> (string & v) +asciistream & +asciistream::operator >> (string & v) { eatWhite(); size_t start(_rPos); @@ -366,7 +394,8 @@ asciistream & asciistream::operator >> (string & v) namespace { const char * _C_char = "0123456789abcdefg"; -char * prependInt(char * tmp, Base base) +char * +prependInt(char * tmp, Base base) { if (base == bin) { tmp[1] = 'b'; @@ -376,7 +405,8 @@ char * prependInt(char * tmp, Base base) return tmp + 2; } -char * prependSign(bool sign, char * tmp) +char * +prependSign(bool sign, char * tmp) { if (sign) { tmp[0] = '-'; @@ -389,7 +419,8 @@ template <uint8_t base> uint8_t printInt(unsigned long long r, char * tmp, uint8_t i) __attribute__((noinline)); template <uint8_t base> -uint8_t printInt(unsigned long long r, char * tmp, uint8_t i) +uint8_t +printInt(unsigned long long r, char * tmp, uint8_t i) { for(; r; i--, r/=base) { uint8_t d = r%base; @@ -401,7 +432,8 @@ uint8_t printInt(unsigned long long r, char * tmp, uint8_t i) } -asciistream & asciistream::operator << (long long v) +asciistream & +asciistream::operator << (long long v) { char tmp[72]; uint8_t i(sizeof(tmp)); @@ -432,14 +464,16 @@ asciistream & asciistream::operator << (long long v) return *this; } -void asciistream::doReallyFill(size_t currWidth) +void +asciistream::doReallyFill(size_t currWidth) { for (; _width > currWidth; currWidth++) { write(&_fill, 1); } } -asciistream & asciistream::operator << (unsigned long long v) +asciistream & +asciistream::operator << (unsigned long long v) { char tmp[72]; uint8_t i(sizeof(tmp)); @@ -477,13 +511,15 @@ struct BaseStateSaver { }; } -asciistream& asciistream::operator<<(const void* p) +asciistream & +asciistream::operator<<(const void* p) { BaseStateSaver saver(*this, _base); return *this << "0x" << hex << reinterpret_cast<uint64_t>(p); } -asciistream & asciistream::operator << (float v) +asciistream & +asciistream::operator << (float v) { if (_floatSpec == fixed) { printFixed(v); @@ -493,7 +529,8 @@ asciistream & asciistream::operator << (float v) return *this; } -asciistream & asciistream::operator << (double v) +asciistream & +asciistream::operator << (double v) { if (_floatSpec == fixed) { printFixed(v); @@ -516,20 +553,20 @@ void asciistream::printFixed(T v) } namespace { - bool hasDotOrIsScientific(const char* string, size_t len) { - for (size_t i=0; i<len; ++i) { - switch (string[i]) { - case '.': - case ',': - case 'e': - case 'E': - return true; - default: - break; - } +bool hasDotOrIsScientific(const char* string, size_t len) { + for (size_t i=0; i<len; ++i) { + switch (string[i]) { + case '.': + case ',': + case 'e': + case 'E': + return true; + default: + break; } - return false; } + return false; +} } template <typename T> @@ -548,7 +585,8 @@ void asciistream::printScientific(T v) } } -void asciistream::write(const void * buf, size_t len) +void +asciistream::write(const void * buf, size_t len) { if (_rPos > 0 && _rPos == length()) { clear(); @@ -564,16 +602,8 @@ void asciistream::write(const void * buf, size_t len) _rbuf = _wbuf; } -std::vector<string> asciistream::getlines(char delim) -{ - std::vector<string> lines; - while (!eof()) { - lines.push_back(getline(delim)); - } - return lines; -} - -string asciistream::getline(char delim) +string +asciistream::getline(char delim) { string line; const size_t start(_rPos); @@ -588,7 +618,8 @@ string asciistream::getline(char delim) return line; } -asciistream asciistream::createFromFile(stringref fileName) +asciistream +asciistream::createFromFile(stringref fileName) { FastOS_File file(vespalib::string(fileName).c_str()); asciistream is; @@ -597,32 +628,35 @@ asciistream asciistream::createFromFile(stringref fileName) if (sz < 0) { throw IoException("Failed getting size of file " + fileName + " : Error=" + file.getLastErrorString(), IoException::UNSPECIFIED, VESPA_STRLOC); } - MallocPtr buf(sz); - ssize_t actual = file.Read(buf, sz); + alloc::Alloc buf = alloc::Alloc::alloc(sz); + ssize_t actual = file.Read(buf.get(), sz); if (actual != sz) { asciistream e; e << "Failed reading " << sz << " bytes from file " << fileName; throw IoException(e.str() + " : Error=" + file.getLastErrorString(), IoException::UNSPECIFIED, VESPA_STRLOC); } - is << stringref(buf.c_str(), buf.size()); + is << stringref(static_cast<const char *>(buf.get()), sz); } return is; } -asciistream asciistream::createFromDevice(stringref fileName) +asciistream +asciistream::createFromDevice(stringref fileName) { FastOS_File file(vespalib::string(fileName).c_str()); asciistream is; if (file.OpenReadOnly()) { - MallocPtr buf(64_Ki); - for (ssize_t actual = file.Read(buf, buf.size()); actual > 0; actual = file.Read(buf, buf.size())) { - is << stringref(buf.c_str(), actual); + constexpr size_t SZ = 64_Ki; + auto buf = std::make_unique<char []>(SZ); + for (ssize_t actual = file.Read(buf.get(), SZ); actual > 0; actual = file.Read(buf.get(), SZ)) { + is << stringref(buf.get(), actual); } } return is; } -ssize_t getline(asciistream & is, string & line, char delim) +ssize_t +getline(asciistream & is, string & line, char delim) { line = is.getline(delim); return line.size(); diff --git a/vespalib/src/vespa/vespalib/stllike/asciistream.h b/vespalib/src/vespa/vespalib/stllike/asciistream.h index 6a4b4378634..b333be6cec2 100644 --- a/vespalib/src/vespa/vespalib/stllike/asciistream.h +++ b/vespalib/src/vespa/vespalib/stllike/asciistream.h @@ -2,7 +2,6 @@ #pragma once #include <vespa/vespalib/stllike/string.h> -#include <vector> namespace vespalib { @@ -153,7 +152,6 @@ public: static asciistream createFromFile(stringref fileName); static asciistream createFromDevice(stringref fileName); string getline(char delim='\n'); - std::vector<string> getlines(char delim='\n'); char getFill() const noexcept { return _fill; } size_t getWidth() const noexcept { return static_cast<size_t>(_width); } // match input type of setw Base getBase() const noexcept { return _base; } diff --git a/vespalib/src/vespa/vespalib/test/CMakeLists.txt b/vespalib/src/vespa/vespalib/test/CMakeLists.txt index d658b28f94b..a60eb15a4d4 100644 --- a/vespalib/src/vespa/vespalib/test/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/test/CMakeLists.txt @@ -4,6 +4,7 @@ vespa_add_library(vespalib_vespalib_test OBJECT make_tls_options_for_testing.cpp memory_allocator_observer.cpp peer_policy_utils.cpp + thread_meets.cpp time_tracer.cpp DEPENDS ) diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.cpp b/vespalib/src/vespa/vespalib/test/thread_meets.cpp new file mode 100644 index 00000000000..9d23e0eab28 --- /dev/null +++ b/vespalib/src/vespa/vespalib/test/thread_meets.cpp @@ -0,0 +1,12 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "thread_meets.h" + +namespace vespalib::test { + +void +ThreadMeets::Nop::mingle() +{ +} + +} diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.h b/vespalib/src/vespa/vespalib/test/thread_meets.h new file mode 100644 index 00000000000..62ca7779935 --- /dev/null +++ b/vespalib/src/vespa/vespalib/test/thread_meets.h @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/rendezvous.h> + +namespace vespalib::test { + +/** + * Generally useful rendezvous implementations. + **/ +struct ThreadMeets { + // can be used as a simple thread barrier + struct Nop : vespalib::Rendezvous<bool,bool> { + Nop(size_t N) : vespalib::Rendezvous<bool,bool>(N) {} + void operator()() { rendezvous(false); } + void mingle() override; + }; + // swap values between 2 threads + template <typename T> + struct Swap : vespalib::Rendezvous<T,T> { + using vespalib::Rendezvous<T,T>::in; + using vespalib::Rendezvous<T,T>::out; + using vespalib::Rendezvous<T,T>::rendezvous; + Swap() : vespalib::Rendezvous<T,T>(2) {} + T operator()(T input) { return rendezvous(input); } + void mingle() override { + out(1) = in(0); + out(0) = in(1); + } + }; +}; + +} diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index 32f679d22d7..752bf60c688 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -42,6 +42,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT mmap_file_allocator.cpp mmap_file_allocator_factory.cpp monitored_refcount.cpp + nice.cpp printable.cpp priority_queue.cpp random.cpp @@ -56,6 +57,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT runnable_pair.cpp sequence.cpp sha1.cpp + shared_operation_throttler.cpp shared_string_repo.cpp sig_catch.cpp signalhandler.cpp diff --git a/vespalib/src/vespa/vespalib/util/alloc.cpp b/vespalib/src/vespa/vespalib/util/alloc.cpp index 69f2eedcecb..bf5f0200600 100644 --- a/vespalib/src/vespa/vespalib/util/alloc.cpp +++ b/vespalib/src/vespa/vespalib/util/alloc.cpp @@ -381,6 +381,9 @@ MMapAllocator::sresize_inplace(PtrAndSize current, size_t newSize) { size_t MMapAllocator::extend_inplace(PtrAndSize current, size_t newSize) { + if (current.second == 0u) { + return 0u; + } PtrAndSize got = MMapAllocator::salloc(newSize - current.second, static_cast<char *>(current.first)+current.second); if ((static_cast<const char *>(current.first) + current.second) == static_cast<const char *>(got.first)) { return current.second + got.second; @@ -477,6 +480,9 @@ Alloc::allocHeap(size_t sz) bool Alloc::resize_inplace(size_t newSize) { + if (newSize == 0u) { + return size() == 0u; + } size_t extendedSize = _allocator->resize_inplace(_alloc, newSize); if (extendedSize >= newSize) { _alloc.second = extendedSize; diff --git a/vespalib/src/vespa/vespalib/util/alloc.h b/vespalib/src/vespa/vespalib/util/alloc.h index d25fb6f6a7c..4066894b4e3 100644 --- a/vespalib/src/vespa/vespalib/util/alloc.h +++ b/vespalib/src/vespa/vespalib/util/alloc.h @@ -62,6 +62,13 @@ public: std::swap(_alloc, rhs._alloc); std::swap(_allocator, rhs._allocator); } + void reset() { + if (_alloc.first != nullptr) { + _allocator->free(_alloc); + _alloc.first = nullptr; + _alloc.second = 0u; + } + } Alloc create(size_t sz) const noexcept { return (sz == 0) ? Alloc(_allocator) : Alloc(_allocator, sz); } diff --git a/vespalib/src/vespa/vespalib/util/array.h b/vespalib/src/vespa/vespalib/util/array.h index 30d87cd98f6..cb5d5c7cc63 100644 --- a/vespalib/src/vespa/vespalib/util/array.h +++ b/vespalib/src/vespa/vespalib/util/array.h @@ -135,6 +135,7 @@ public: std::destroy(array(0), array(_sz)); _sz = 0; } + void reset(); bool empty() const { return _sz == 0; } T & operator [] (size_t i) { return *array(i); } const T & operator [] (size_t i) const { return *array(i); } @@ -145,6 +146,7 @@ public: rhs._sz = 0; return std::move(rhs._array); } + Array<T> create() const; private: T * array(size_t i) { return static_cast<T *>(_array.get()) + i; } const T * array(size_t i) const { return static_cast<const T *>(_array.get()) + i; } diff --git a/vespalib/src/vespa/vespalib/util/array.hpp b/vespalib/src/vespa/vespalib/util/array.hpp index e9070f5759c..72178f0391b 100644 --- a/vespalib/src/vespa/vespalib/util/array.hpp +++ b/vespalib/src/vespa/vespalib/util/array.hpp @@ -205,5 +205,20 @@ void Array<T>::cleanup() Alloc().swap(_array); } +template <typename T> +void Array<T>::reset() +{ + std::destroy(array(0), array(_sz)); + _sz = 0; + _array.reset(); +} + +template <typename T> +Array<T> +Array<T>::create() const +{ + return Array<T>(_array); // Use same memory allocator +} + } diff --git a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp index 73e70e7fd89..8f3dd8ab006 100644 --- a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp +++ b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp @@ -2,11 +2,11 @@ #pragma once -#include <stdint.h> -#include <stdlib.h> +#include "traits.h" +#include <cstdint> +#include <cstdlib> #include <cassert> #include <algorithm> -#include "traits.h" namespace vespalib { diff --git a/vespalib/src/vespa/vespalib/util/atomic.h b/vespalib/src/vespa/vespalib/util/atomic.h new file mode 100644 index 00000000000..8bf258ffa50 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/atomic.h @@ -0,0 +1,151 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <atomic> +#include <type_traits> +#include <version> + +/** + * Utility functions for single value atomic memory accesses. + * + * store/load_ref_* functions can be used to provide well-defined atomic + * memory access to memory locations that aren't explicitly wrapped in std::atomic + * objects. In this case, all potentially racing loads/stores _must_ be through + * atomic utility functions (or atomic_ref). + * + * Non-ref store/load_* functions are just syntactic sugar to make code using + * atomics more readable, but additionally adds sanity checks that all atomics + * are always lock-free. + */ + +namespace vespalib::atomic { + +// +// std::atomic_ref<T> helpers +// + +namespace detail { +template <typename T> struct is_std_atomic : std::false_type {}; +template <typename T> struct is_std_atomic<std::atomic<T>> : std::true_type {}; +template <typename T> inline constexpr bool is_std_atomic_v = is_std_atomic<T>::value; +} + +// TODO can generalize atomic_ref code once no special casing is needed + +template <typename T1, typename T2> +constexpr void store_ref_relaxed(T1& lhs, T2&& v) noexcept { + static_assert(!detail::is_std_atomic_v<T1>, "atomic ref function invoked with a std::atomic, probably not intended"); +#if __cpp_lib_atomic_ref + static_assert(std::atomic_ref<T1>::is_always_lock_free); + std::atomic_ref<T1>(lhs).store(std::forward<T2>(v), std::memory_order_relaxed); +#else + // TODO replace with compiler intrinsic + lhs = std::forward<T2>(v); +#endif +} + +template <typename T1, typename T2> +constexpr void store_ref_release(T1& lhs, T2&& v) noexcept { + static_assert(!detail::is_std_atomic_v<T1>, "atomic ref function invoked with a std::atomic, probably not intended"); +#if __cpp_lib_atomic_ref + static_assert(std::atomic_ref<T1>::is_always_lock_free); + std::atomic_ref<T1>(lhs).store(std::forward<T2>(v), std::memory_order_release); +#else + // TODO replace with compiler intrinsic + lhs = std::forward<T2>(v); + std::atomic_thread_fence(std::memory_order_release); +#endif +} + +template <typename T1, typename T2> +constexpr void store_ref_seq_cst(T1& lhs, T2&& v) noexcept { + static_assert(!detail::is_std_atomic_v<T1>, "atomic ref function invoked with a std::atomic, probably not intended"); +#if __cpp_lib_atomic_ref + static_assert(std::atomic_ref<T1>::is_always_lock_free); + std::atomic_ref<T1>(lhs).store(std::forward<T2>(v), std::memory_order_seq_cst); +#else + // TODO replace with compiler intrinsic + lhs = std::forward<T2>(v); + std::atomic_thread_fence(std::memory_order_seq_cst); +#endif +} + +template <typename T> +[[nodiscard]] constexpr T load_ref_relaxed(const T& a) noexcept { + static_assert(!detail::is_std_atomic_v<T>, "atomic ref function invoked with a std::atomic, probably not intended"); +#if __cpp_lib_atomic_ref + static_assert(std::atomic_ref<const T>::is_always_lock_free); + return std::atomic_ref<const T>(a).load(std::memory_order_relaxed); +#else + // TODO replace with compiler intrinsic + return a; +#endif +} + +template <typename T> +[[nodiscard]] constexpr T load_ref_acquire(const T& a) noexcept { + static_assert(!detail::is_std_atomic_v<T>, "atomic ref function invoked with a std::atomic, probably not intended"); +#if __cpp_lib_atomic_ref + static_assert(std::atomic_ref<const T>::is_always_lock_free); + return std::atomic_ref<const T>(a).load(std::memory_order_acquire); +#else + // TODO replace with compiler intrinsic + std::atomic_thread_fence(std::memory_order_acquire); + return a; +#endif +} + +template <typename T> +[[nodiscard]] constexpr T load_ref_seq_cst(const T& a) noexcept { + static_assert(!detail::is_std_atomic_v<T>, "atomic ref function invoked with a std::atomic, probably not intended"); +#if __cpp_lib_atomic_ref + static_assert(std::atomic_ref<const T>::is_always_lock_free); + return std::atomic_ref<const T>(a).load(std::memory_order_seq_cst); +#else + // TODO replace with compiler intrinsic + std::atomic_thread_fence(std::memory_order_seq_cst); + return a; +#endif +} + +// +// std::atomic<T> helpers +// + +template <typename T1, typename T2> +constexpr void store_relaxed(std::atomic<T1>& lhs, T2&& v) noexcept { + static_assert(std::atomic<T1>::is_always_lock_free); + lhs.store(std::forward<T2>(v), std::memory_order_relaxed); +} + +template <typename T1, typename T2> +constexpr void store_release(std::atomic<T1>& lhs, T2&& v) noexcept { + static_assert(std::atomic<T1>::is_always_lock_free); + lhs.store(std::forward<T2>(v), std::memory_order_release); +} + +template <typename T1, typename T2> +constexpr void store_seq_cst(std::atomic<T1>& lhs, T2&& v) noexcept { + static_assert(std::atomic<T1>::is_always_lock_free); + lhs.store(std::forward<T2>(v), std::memory_order_seq_cst); +} + +template <typename T> +[[nodiscard]] constexpr T load_relaxed(const std::atomic<T>& a) noexcept { + static_assert(std::atomic<T>::is_always_lock_free); + return a.load(std::memory_order_relaxed); +} + +template <typename T> +[[nodiscard]] constexpr T load_acquire(const std::atomic<T>& a) noexcept { + static_assert(std::atomic<T>::is_always_lock_free); + return a.load(std::memory_order_acquire); +} + +template <typename T> +[[nodiscard]] constexpr T load_seq_cst(const std::atomic<T>& a) noexcept { + static_assert(std::atomic<T>::is_always_lock_free); + return a.load(std::memory_order_seq_cst); +} + +} // vespalib::atomic diff --git a/vespalib/src/vespa/vespalib/util/child_process.cpp b/vespalib/src/vespa/vespalib/util/child_process.cpp index 7193a445f9a..694bed60f1b 100644 --- a/vespalib/src/vespa/vespalib/util/child_process.cpp +++ b/vespalib/src/vespa/vespalib/util/child_process.cpp @@ -67,7 +67,9 @@ ChildProcess::Reader::OnReceiveData(const void *data, size_t length) return; } if (buf == nullptr) { // EOF - _gotEOF = true; + if (--_num_streams == 0) { + _gotEOF = true; + } } else { _queue.push(std::string(buf, length)); } @@ -107,11 +109,12 @@ ChildProcess::Reader::updateEOF() } -ChildProcess::Reader::Reader() +ChildProcess::Reader::Reader(int num_streams) : _lock(), _cond(), _queue(), _data(), + _num_streams(num_streams), _gotEOF(false), _waitCnt(0), _readEOF(false) @@ -203,7 +206,7 @@ ChildProcess::checkProc() ChildProcess::ChildProcess(const char *cmd) - : _reader(), + : _reader(1), _proc(cmd, true, &_reader), _running(false), _failed(false), @@ -213,6 +216,17 @@ ChildProcess::ChildProcess(const char *cmd) _failed = !_running; } +ChildProcess::ChildProcess(const char *cmd, capture_stderr_tag) + : _reader(2), + _proc(cmd, true, &_reader, &_reader), + _running(false), + _failed(false), + _exitCode(-918273645) +{ + _running = _proc.CreateWithShell(); + _failed = !_running; +} + ChildProcess::~ChildProcess() = default; diff --git a/vespalib/src/vespa/vespalib/util/child_process.h b/vespalib/src/vespa/vespalib/util/child_process.h index 646a2c7c6c9..877c56a8cb1 100644 --- a/vespalib/src/vespa/vespalib/util/child_process.h +++ b/vespalib/src/vespa/vespalib/util/child_process.h @@ -28,6 +28,7 @@ private: std::condition_variable _cond; std::queue<std::string> _queue; std::string _data; + int _num_streams; bool _gotEOF; int _waitCnt; bool _readEOF; @@ -38,7 +39,7 @@ private: void updateEOF(); public: - Reader(); + Reader(int num_streams); ~Reader() override; uint32_t read(char *buf, uint32_t len, int msTimeout); @@ -57,6 +58,7 @@ private: public: ChildProcess(const ChildProcess &) = delete; ChildProcess &operator=(const ChildProcess &) = delete; + struct capture_stderr_tag{}; /** * @brief Run a child process @@ -66,6 +68,15 @@ public: **/ explicit ChildProcess(const char *cmd); + /** + * @brief Run a child process + * + * Starts a process running the given command. stderr is + * redirected into stdout. + * @param cmd A shell command line to run + **/ + explicit ChildProcess(const char *cmd, capture_stderr_tag); + /** @brief destructor doing cleanup if needed */ ~ChildProcess(); diff --git a/vespalib/src/vespa/vespalib/util/count_down_latch.h b/vespalib/src/vespa/vespalib/util/count_down_latch.h index d543d773909..613a60e90c5 100644 --- a/vespalib/src/vespa/vespalib/util/count_down_latch.h +++ b/vespalib/src/vespa/vespalib/util/count_down_latch.h @@ -22,7 +22,7 @@ namespace vespalib { class CountDownLatch { private: - std::mutex _lock; + mutable std::mutex _lock; std::condition_variable _cond; uint32_t _count; @@ -44,7 +44,7 @@ public: * blocked in the await method will be unblocked. **/ void countDown() { - std::lock_guard<std::mutex> guard(_lock); + std::lock_guard guard(_lock); if (_count != 0) { --_count; if (_count == 0) { @@ -59,7 +59,7 @@ public: * reduce the count to 0. **/ void await() { - std::unique_lock<std::mutex> guard(_lock); + std::unique_lock guard(_lock); _cond.wait(guard, [this]() { return (_count == 0); }); } @@ -72,7 +72,7 @@ public: * @return true if the counter reached 0, false if we timed out **/ bool await(vespalib::duration maxwait) { - std::unique_lock<std::mutex> guard(_lock); + std::unique_lock guard(_lock); return _cond.wait_for(guard, maxwait, [this]() { return (_count == 0); }); } @@ -82,7 +82,10 @@ public: * * @return current count **/ - uint32_t getCount() const { return _count; } + [[nodiscard]] uint32_t getCount() const noexcept { + std::lock_guard guard(_lock); + return _count; + } /** * Empty. Needs to be virtual to reduce compiler warnings. diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp index 4eee0a63870..97dbedad66f 100644 --- a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp +++ b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp @@ -3,6 +3,10 @@ #include "cpu_usage.h" #include "require.h" #include <pthread.h> +#include <optional> +#include <cassert> + +#include <sys/resource.h> namespace vespalib { @@ -13,11 +17,11 @@ namespace { class DummyThreadSampler : public ThreadSampler { private: steady_time _start; - double _load; + double _util; public: - DummyThreadSampler(double load) : _start(steady_clock::now()), _load(load) {} - duration sample() const override { - return from_s(to_s(steady_clock::now() - _start) * _load); + DummyThreadSampler(double util) : _start(steady_clock::now()), _util(util) {} + duration sample() const noexcept override { + return from_s(to_s(steady_clock::now() - _start) * _util); } }; @@ -30,9 +34,10 @@ public: LinuxThreadSampler() : _my_clock() { REQUIRE_EQ(pthread_getcpuclockid(pthread_self(), &_my_clock), 0); } - duration sample() const override { + duration sample() const noexcept override { timespec ts; - REQUIRE_EQ(clock_gettime(_my_clock, &ts), 0); + memset(&ts, 0, sizeof(ts)); + clock_gettime(_my_clock, &ts); return from_timespec(ts); } }; @@ -41,16 +46,246 @@ public: } // <unnamed> -ThreadSampler::UP create_thread_sampler(bool force_mock_impl, double expected_load) { +duration total_cpu_usage() noexcept { + timespec ts; + memset(&ts, 0, sizeof(ts)); + clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &ts); + return from_timespec(ts); +} + +ThreadSampler::UP create_thread_sampler(bool force_mock_impl, double expected_util) { if (force_mock_impl) { - return std::make_unique<DummyThreadSampler>(expected_load); + return std::make_unique<DummyThreadSampler>(expected_util); } #ifdef __linux__ return std::make_unique<LinuxThreadSampler>(); #endif - return std::make_unique<DummyThreadSampler>(expected_load); + return std::make_unique<DummyThreadSampler>(expected_util); } } // cpu_usage +CpuUsage::ThreadTrackerImpl::ThreadTrackerImpl(cpu_usage::ThreadSampler::UP sampler) + : _lock(), + _cat(Category::OTHER), + _old_usage(), + _sampler(std::move(sampler)), + _pending() +{ +} + +CpuUsage::Category +CpuUsage::ThreadTrackerImpl::set_category(Category new_cat) noexcept +{ + // only owning thread may change category + if (new_cat == _cat) { + return new_cat; + } + Guard guard(_lock); + duration new_usage = _sampler->sample(); + if (_cat != Category::OTHER) { + _pending[_cat] += (new_usage - _old_usage); + } + _old_usage = new_usage; + auto old_cat = _cat; + _cat = new_cat; + return old_cat; +} + +CpuUsage::Sample +CpuUsage::ThreadTrackerImpl::sample() noexcept +{ + Guard guard(_lock); + if (_cat != Category::OTHER) { + duration new_usage = _sampler->sample(); + _pending[_cat] += (new_usage - _old_usage); + _old_usage = new_usage; + } + Sample sample = _pending; + _pending = Sample(); + return sample; +} + +vespalib::string & +CpuUsage::name_of(Category cat) +{ + static std::array<vespalib::string,num_categories> names = {"setup", "read", "write", "compact", "other"}; + return names[index_of(cat)]; +} + +CpuUsage::Category +CpuUsage::MyUsage::set_cpu_category_for_this_thread(Category cat) noexcept +{ + struct Wrapper { + std::shared_ptr<ThreadTrackerImpl> self; + Wrapper() : self(std::make_shared<ThreadTrackerImpl>(cpu_usage::create_thread_sampler())) { + CpuUsage::self().add_thread(self); + } + ~Wrapper() { + self->set_category(CpuUsage::Category::OTHER); + CpuUsage::self().remove_thread(std::move(self)); + } + }; + thread_local Wrapper wrapper; + return wrapper.self->set_category(cat); +} + +CpuUsage::CpuUsage() + : _lock(), + _usage(), + _threads(), + _sampling(false), + _conflict(), + _pending_add(), + _pending_remove() +{ +} + +CpuUsage & +CpuUsage::self() +{ + static CpuUsage me; + return me; +} + +void +CpuUsage::do_add_thread(const Guard &, ThreadTracker::SP tracker) +{ + assert(!_sampling); + auto *key = tracker.get(); + auto [ignore, was_inserted] = _threads.emplace(key, std::move(tracker)); + assert(was_inserted); +} + +void +CpuUsage::do_remove_thread(const Guard &, ThreadTracker::SP tracker) +{ + assert(!_sampling); + _usage.merge(tracker->sample()); + auto was_removed = _threads.erase(tracker.get()); + assert(was_removed); +} + +void +CpuUsage::add_thread(ThreadTracker::SP tracker) +{ + Guard guard(_lock); + if (_sampling) { + _pending_add.push_back(std::move(tracker)); + } else { + do_add_thread(guard, std::move(tracker)); + } +} + +void +CpuUsage::remove_thread(ThreadTracker::SP tracker) +{ + Guard guard(_lock); + if (_sampling) { + _pending_remove.push_back(std::move(tracker)); + } else { + do_remove_thread(guard, std::move(tracker)); + } +} + +void +CpuUsage::handle_pending(const Guard &guard) +{ + for (auto &thread: _pending_add) { + do_add_thread(guard, std::move(thread)); + } + _pending_add.clear(); + for (auto &thread: _pending_remove) { + do_remove_thread(guard, std::move(thread)); + } + _pending_remove.clear(); +} + +CpuUsage::TimedSample +CpuUsage::do_sample() +{ + assert(_sampling); + Sample my_sample; + std::optional<std::promise<TimedSample>> my_promise; + auto t = steady_clock::now(); + for (const auto &entry: _threads) { + my_sample.merge(entry.first->sample()); + } + { + Guard guard(_lock); + _sampling = false; + handle_pending(guard); + if (_conflict) { + my_promise = std::move(_conflict->sample_promise); + _conflict.reset(); + } + my_sample.merge(_usage); + _usage = my_sample; + } + auto total = cpu_usage::total_cpu_usage(); + for (size_t i = 0; i < index_of(Category::OTHER); ++i) { + total -= my_sample[i]; + } + my_sample[Category::OTHER] = std::max(total, duration::zero()); + TimedSample result{t, my_sample}; + if (my_promise.has_value()) { + my_promise.value().set_value(result); + } + return result; +} + +CpuUsage::TimedSample +CpuUsage::sample_or_wait() +{ + std::shared_future<TimedSample> my_future; + { + Guard guard(_lock); + if (_sampling) { + if (!_conflict) { + _conflict = std::make_unique<SampleConflict>(); + } + my_future = _conflict->future_sample; + _conflict->waiters++; + } else { + _sampling = true; + } + } + if (my_future.valid()) { + return my_future.get(); + } else { + return do_sample(); + } +} + +CpuUsage::TimedSample +CpuUsage::sample() +{ + return self().sample_or_wait(); +} + +Runnable::init_fun_t +CpuUsage::wrap(Runnable::init_fun_t init, Category cat) +{ + return [init,cat](Runnable &target) { + auto my_usage = CpuUsage::use(cat); + return init(target); + }; +} + +Executor::Task::UP +CpuUsage::wrap(Executor::Task::UP task, Category cat) +{ + struct CpuTask : Executor::Task { + UP task; + Category cat; + CpuTask(UP task_in, Category cat_in) + : task(std::move(task_in)), cat(cat_in) {} + void run() override { + auto my_usage = CpuUsage::use(cat); + task->run(); + } + }; + return std::make_unique<CpuTask>(std::move(task), cat); +} + } // namespace diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.h b/vespalib/src/vespa/vespalib/util/cpu_usage.h index 09509a984b5..31309d88f5f 100644 --- a/vespalib/src/vespa/vespalib/util/cpu_usage.h +++ b/vespalib/src/vespa/vespalib/util/cpu_usage.h @@ -1,25 +1,218 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once +#include "runnable.h" +#include "executor.h" +#include "spin_lock.h" #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/stllike/string.h> +#include <array> #include <memory> +#include <future> +#include <vector> +#include <map> namespace vespalib { namespace cpu_usage { /** + * Samples the total CPU usage of this process so far. + **/ +duration total_cpu_usage() noexcept; + +/** * Samples the total CPU usage of the thread that created it. Note * that this must not be used after thread termination. Enables * sampling the CPU usage of a thread from outside the thread. **/ struct ThreadSampler { using UP = std::unique_ptr<ThreadSampler>; - virtual duration sample() const = 0; + virtual duration sample() const noexcept = 0; virtual ~ThreadSampler() {} }; -ThreadSampler::UP create_thread_sampler(bool force_mock_impl = false, double expected_load = 0.16); +ThreadSampler::UP create_thread_sampler(bool force_mock_impl = false, double expected_util = 0.16); } // cpu_usage +/** + * Tracks accumulative cpu usage across threads and work + * categories. Use the 'use' function to signal what kind of CPU you + * are using in the current thread. Use the 'sample' function to get a + * complete view of CPU usage so far. + * + * The 'use' function returns a MyUsage object that needs to be kept + * alive for as long as the current thread should contribute to the + * specified cpu use. Note that MyUsage instances may shadow each + * other, but must be destructed in reverse construction order. + **/ +class CpuUsage +{ +public: + // The kind of work performed by a thread. Used to separate + // different kinds of CPU usage. Note that categories are + // exclusive/non-overlapping; a thread can only perform one kind + // of CPU work at any specific time. + enum class Category { + SETUP = 0, // usage related to system setup (init/(re-)config/etc.) + READ = 1, // usage related to reading data from the system + WRITE = 2, // usage related to writing data to the system + COMPACT = 3, // usage related to internal data re-structuring + OTHER = 4 // all other cpu usage not in the categories above + }; + static vespalib::string &name_of(Category cat); + static constexpr size_t index_of(Category cat) { return static_cast<size_t>(cat); } + static constexpr size_t num_categories = 5; + + template <typename T> + class PerCategory { + private: + std::array<T,num_categories> _array; + public: + PerCategory() : _array() {} + size_t size() const { return _array.size(); } + T &operator[](size_t idx) { return _array[idx]; } + T &operator[](Category cat) { return _array[index_of(cat)]; } + const T &operator[](size_t idx) const { return _array[idx]; } + const T &operator[](Category cat) const { return _array[index_of(cat)]; } + }; + + // A sample contains how much CPU has been spent in each category. + class Sample : public PerCategory<duration> { + public: + void merge(const Sample &rhs) { + for (size_t i = 0; i < size(); ++i) { + (*this)[i] += rhs[i]; + } + } + }; + + // a sample tagged with the time it was taken + using TimedSample = std::pair<steady_time, Sample>; + + // Used by threads to signal what kind of CPU they are currently + // using. The thread will contribute to the declared CPU usage + // category while this object lives. MyUsage instances may shadow + // each other, but must be destructed in reverse construction + // order. The preferred way to use this class is by doing: + // + // auto my_usage = CpuUsage::use(my_cat); + class MyUsage { + private: + Category _old_cat; + static Category set_cpu_category_for_this_thread(Category cat) noexcept; + public: + MyUsage(Category cat) + : _old_cat(set_cpu_category_for_this_thread(cat)) {} + MyUsage(MyUsage &&) = delete; + MyUsage(const MyUsage &) = delete; + MyUsage &operator=(MyUsage &&) = delete; + MyUsage &operator=(const MyUsage &) = delete; + ~MyUsage() { set_cpu_category_for_this_thread(_old_cat); } + }; + + // grant extra access for testing + struct Test; + +private: + using Guard = std::lock_guard<SpinLock>; + + // Used when multiple threads call the 'sample' function at the + // same time. One thread will sample while the others will wait + // for the result. + struct SampleConflict { + std::promise<TimedSample> sample_promise; + std::shared_future<TimedSample> future_sample; + size_t waiters; + SampleConflict() : sample_promise(), + future_sample(sample_promise.get_future()), + waiters(0) {} + }; + + // Interface used to perform destructive sampling of the CPU spent + // in various categories since the last time it was sampled. + struct ThreadTracker { + using SP = std::shared_ptr<ThreadTracker>; + virtual Sample sample() noexcept = 0; + virtual ~ThreadTracker() {} + }; + + class ThreadTrackerImpl : public ThreadTracker { + private: + SpinLock _lock; + Category _cat; + duration _old_usage; + cpu_usage::ThreadSampler::UP _sampler; + Sample _pending; + + public: + ThreadTrackerImpl(cpu_usage::ThreadSampler::UP sampler); + // only called by owning thread + Category set_category(Category new_cat) noexcept; + Sample sample() noexcept override; + }; + + SpinLock _lock; + Sample _usage; + std::map<ThreadTracker*,ThreadTracker::SP> _threads; + bool _sampling; + std::unique_ptr<SampleConflict> _conflict; + std::vector<ThreadTracker::SP> _pending_add; + std::vector<ThreadTracker::SP> _pending_remove; + + CpuUsage(); + CpuUsage(CpuUsage &&) = delete; + CpuUsage(const CpuUsage &) = delete; + CpuUsage &operator=(CpuUsage &&) = delete; + CpuUsage &operator=(const CpuUsage &) = delete; + + static CpuUsage &self(); + + void do_add_thread(const Guard &guard, ThreadTracker::SP tracker); + void do_remove_thread(const Guard &guard, ThreadTracker::SP tracker); + + void add_thread(ThreadTracker::SP tracker); + void remove_thread(ThreadTracker::SP tracker); + + void handle_pending(const Guard &guard); + TimedSample do_sample(); + TimedSample sample_or_wait(); + +public: + static MyUsage use(Category cat) { return MyUsage(cat); } + static TimedSample sample(); + static Runnable::init_fun_t wrap(Runnable::init_fun_t init, Category cat); + static Executor::Task::UP wrap(Executor::Task::UP task, Category cat); +}; + +/** + * Simple class used to track cpu utilization over time. + **/ +class CpuUtil +{ +private: + duration _min_delay; + CpuUsage::TimedSample _old_sample; + CpuUsage::PerCategory<double> _util; + +public: + CpuUtil(duration min_delay = 850ms) + : _min_delay(min_delay), + _old_sample(CpuUsage::sample()), + _util() {} + + CpuUsage::PerCategory<double> get_util() { + if (steady_clock::now() >= (_old_sample.first + _min_delay)) { + auto new_sample = CpuUsage::sample(); + auto dt = to_s(new_sample.first - _old_sample.first); + for (size_t i = 0; i < _util.size(); ++i) { + _util[i] = to_s(new_sample.second[i] - _old_sample.second[i]) / dt; + } + _old_sample = new_sample; + } + return _util; + } +}; + } // namespace diff --git a/vespalib/src/vespa/vespalib/util/nice.cpp b/vespalib/src/vespa/vespalib/util/nice.cpp new file mode 100644 index 00000000000..cefaaa0347b --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/nice.cpp @@ -0,0 +1,32 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "nice.h" + +#include <unistd.h> +#include <algorithm> + +namespace vespalib { + +namespace { + +void set_nice_value(double how_nice) { + if (how_nice > 0.0) { +#ifndef __APPLE__ + int now = nice(0); + int max = 19; + int max_inc = (max - now); + nice(std::min(max_inc, int(how_nice * (max_inc + 1)))); +#endif + } +} + +} + +Runnable::init_fun_t be_nice(Runnable::init_fun_t init, double how_nice) { + return [init,how_nice](Runnable &target) { + set_nice_value(how_nice); + return init(target); + }; +} + +} // namespace diff --git a/vespalib/src/vespa/vespalib/util/nice.h b/vespalib/src/vespa/vespalib/util/nice.h new file mode 100644 index 00000000000..7c3873337eb --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/nice.h @@ -0,0 +1,16 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "runnable.h" + +namespace vespalib { + +// Wraps an init function inside another init function that adjusts +// the niceness of the thread being started. The 'how_nice' parameter +// is a value from 0.0 (not nice at all) to 1.0 (super nice). It will +// be mapped into an actual nice value in a linear fashion based on +// the nice value space that is still available. + +Runnable::init_fun_t be_nice(Runnable::init_fun_t init, double how_nice); + +} diff --git a/vespalib/src/vespa/vespalib/util/rcuvector.h b/vespalib/src/vespa/vespalib/util/rcuvector.h index dd4fa660279..00d050fa8d1 100644 --- a/vespalib/src/vespa/vespalib/util/rcuvector.h +++ b/vespalib/src/vespa/vespalib/util/rcuvector.h @@ -122,6 +122,7 @@ public: void reset(); void shrink(size_t newSize) __attribute__((noinline)); void replaceVector(ArrayType replacement); + ArrayType create_replacement_vector() const { return _data.create(); } }; template <typename T> diff --git a/vespalib/src/vespa/vespalib/util/rcuvector.hpp b/vespalib/src/vespa/vespalib/util/rcuvector.hpp index 3c455149dfd..7c70539da0e 100644 --- a/vespalib/src/vespa/vespalib/util/rcuvector.hpp +++ b/vespalib/src/vespa/vespalib/util/rcuvector.hpp @@ -42,7 +42,7 @@ template <typename T> void RcuVectorBase<T>::reset() { // Assumes no readers at this moment - ArrayType().swap(_data); + _data.reset(); _data.reserve(16); } @@ -52,7 +52,7 @@ RcuVectorBase<T>::~RcuVectorBase() = default; template <typename T> void RcuVectorBase<T>::expand(size_t newCapacity) { - ArrayType tmpData; + auto tmpData = create_replacement_vector(); tmpData.reserve(newCapacity); for (const T & v : _data) { tmpData.push_back_fast(v); @@ -91,7 +91,7 @@ RcuVectorBase<T>::shrink(size_t newSize) return; } if (!_data.try_unreserve(wantedCapacity)) { - ArrayType tmpData; + auto tmpData = create_replacement_vector(); tmpData.reserve(wantedCapacity); tmpData.resize(newSize); for (uint32_t i = 0; i < newSize; ++i) { diff --git a/vespalib/src/vespa/vespalib/util/runnable.cpp b/vespalib/src/vespa/vespalib/util/runnable.cpp index c67c4696d88..3b75efe93f6 100644 --- a/vespalib/src/vespa/vespalib/util/runnable.cpp +++ b/vespalib/src/vespa/vespalib/util/runnable.cpp @@ -4,4 +4,11 @@ namespace vespalib { +int +Runnable::default_init_function(Runnable &target) +{ + target.run(); + return 1; +} + } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/runnable.h b/vespalib/src/vespa/vespalib/util/runnable.h index 43144ebc2cd..bc89a9decac 100644 --- a/vespalib/src/vespa/vespalib/util/runnable.h +++ b/vespalib/src/vespa/vespalib/util/runnable.h @@ -23,6 +23,7 @@ namespace vespalib { struct Runnable { using UP = std::unique_ptr<Runnable>; using init_fun_t = std::function<int(Runnable&)>; + static int default_init_function(Runnable &target); /** * Entry point called by the running thread @@ -36,4 +37,3 @@ struct Runnable { }; } // namespace vespalib - diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp new file mode 100644 index 00000000000..f5a1c117cc8 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp @@ -0,0 +1,434 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "shared_operation_throttler.h" +#include <atomic> +#include <condition_variable> +#include <cassert> +#include <functional> +#include <mutex> + +namespace vespalib { + +namespace { + +class NoLimitsOperationThrottler final : public SharedOperationThrottler { +public: + NoLimitsOperationThrottler() + : SharedOperationThrottler(), + _refs(0u) + { + } + ~NoLimitsOperationThrottler() override { + assert(_refs.load(std::memory_order_acquire) == 0u); + } + Token blocking_acquire_one() noexcept override { + ++_refs; + return Token(this, TokenCtorTag{}); + } + Token blocking_acquire_one(vespalib::duration) noexcept override { + ++_refs; + return Token(this, TokenCtorTag{}); + } + Token try_acquire_one() noexcept override { + ++_refs; + return Token(this, TokenCtorTag{}); + } + uint32_t current_window_size() const noexcept override { return 0; } + uint32_t waiting_threads() const noexcept override { return 0; } + void reconfigure_dynamic_throttling(const DynamicThrottleParams&) noexcept override { /* no-op */ } +private: + void release_one() noexcept override { --_refs; } + std::atomic<uint32_t> _refs; +}; + +/** + * Effectively a 1-1 transplant of the MessageBus DynamicThrottlePolicy, but + * without an underlying StaticThrottlePolicy and with no need for individual + * MessageBus Message/Reply objects. + * + * Please keep the underlying algorithm in sync with the Java implementation, + * as that is considered the source of truth. For descriptions of the various + * parameters, also see the Java code: + * messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java + */ +class DynamicThrottlePolicy { + SharedOperationThrottler::DynamicThrottleParams _active_params; + std::function<steady_time()> _time_provider; + uint32_t _num_sent; + uint32_t _num_ok; + double _resize_rate; + uint64_t _resize_time; + uint64_t _time_of_last_message; + uint64_t _idle_time_period; + double _efficiency_threshold; + double _window_size_increment; + double _window_size; + double _max_window_size; + double _min_window_size; + double _decrement_factor; + double _window_size_backoff; + double _weight; + double _local_max_throughput; +public: + DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params, + std::function<steady_time()> time_provider); + + void set_window_size_increment(double window_size_increment) noexcept; + void set_window_size_backoff(double window_size_backoff) noexcept; + void set_resize_rate(double resize_rate) noexcept; + void set_max_window_size(double max_size) noexcept; + + void set_min_window_size(double min_size) noexcept; + void set_window_size_decrement_factor(double decrement_factor) noexcept; + + void configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept; + + [[nodiscard]] uint32_t current_window_size() const noexcept { + return static_cast<uint32_t>(_window_size); + } + [[nodiscard]] bool has_spare_capacity(uint32_t pending_count) noexcept; + void process_request() noexcept; + void process_response(bool success) noexcept; + +private: + void internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept; + + [[nodiscard]] uint64_t current_time_as_millis() noexcept { + return count_ms(_time_provider().time_since_epoch()); + } +}; + +DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params, + std::function<steady_time()> time_provider) + : _active_params(params), + _time_provider(std::move(time_provider)), + _num_sent(0), + _num_ok(0), + _resize_rate(3.0), + _resize_time(0), + _time_of_last_message(current_time_as_millis()), + _idle_time_period(60000), + _efficiency_threshold(1), + _window_size_increment(20), + _window_size(_window_size_increment), + _max_window_size(INT_MAX), + _min_window_size(_window_size_increment), + _decrement_factor(2.0), + _window_size_backoff(0.9), + _weight(1), + _local_max_throughput(0) +{ + internal_unconditional_configure(_active_params); +} + +void +DynamicThrottlePolicy::set_window_size_increment(double window_size_increment) noexcept +{ + _window_size_increment = window_size_increment; + _window_size = std::max(_window_size, _window_size_increment); +} + +void +DynamicThrottlePolicy::set_window_size_backoff(double window_size_backoff) noexcept +{ + _window_size_backoff = std::max(0.0, std::min(1.0, window_size_backoff)); +} + +void +DynamicThrottlePolicy::set_resize_rate(double resize_rate) noexcept +{ + _resize_rate = std::max(2.0, resize_rate); +} + +void +DynamicThrottlePolicy::set_max_window_size(double max_size) noexcept +{ + _max_window_size = max_size; +} + +void +DynamicThrottlePolicy::set_min_window_size(double min_size) noexcept +{ + _min_window_size = min_size; + _window_size = std::max(_min_window_size, _window_size_increment); +} + +void +DynamicThrottlePolicy::set_window_size_decrement_factor(double decrement_factor) noexcept +{ + _decrement_factor = decrement_factor; +} + +void +DynamicThrottlePolicy::internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept +{ + // We use setters for convenience, since setting one parameter may imply setting others, + // based on it, and there's frequently min/max capping of values. + set_window_size_increment(params.window_size_increment); + set_min_window_size(params.min_window_size); + set_max_window_size(params.max_window_size); + set_resize_rate(params.resize_rate); + set_window_size_decrement_factor(params.window_size_decrement_factor); + set_window_size_backoff(params.window_size_backoff); +} + +void +DynamicThrottlePolicy::configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept +{ + // To avoid any noise where setting parameters on the throttler may implicitly reduce the + // current window size (even though this isn't _currently_ the case), don't invoke any internal + // reconfiguration code unless the parameters have actually changed. + if (params != _active_params) { + internal_unconditional_configure(params); + _active_params = params; + } +} + +bool +DynamicThrottlePolicy::has_spare_capacity(uint32_t pending_count) noexcept +{ + const uint64_t time = current_time_as_millis(); + if ((time - _time_of_last_message) > _idle_time_period) { + _window_size = std::max(_min_window_size, std::min(_window_size, pending_count + _window_size_increment)); + } + _time_of_last_message = time; + const auto window_size_floored = static_cast<uint32_t>(_window_size); + // Use floating point window sizes, so the algorithm sees the difference between 1.1 and 1.9 window size. + const bool carry = _num_sent < ((_window_size * _resize_rate) * (_window_size - window_size_floored)); + return pending_count < (window_size_floored + (carry ? 1 : 0)); +} + +void +DynamicThrottlePolicy::process_request() noexcept +{ + if (++_num_sent < (_window_size * _resize_rate)) { + return; + } + + const uint64_t time = current_time_as_millis(); + const double elapsed = time - _resize_time; + _resize_time = time; + + const double throughput = _num_ok / elapsed; + _num_sent = 0; + _num_ok = 0; + + if (throughput > _local_max_throughput) { + _local_max_throughput = throughput; + _window_size += _weight * _window_size_increment; + } else { + // scale up/down throughput for comparing to window size + double period = 1; + while ((throughput * (period / _window_size)) < 2) { + period *= 10; + } + while ((throughput * (period / _window_size)) > 2) { + period *= 0.1; + } + const double efficiency = throughput * (period / _window_size); + + if (efficiency < _efficiency_threshold) { + _window_size = std::min(_window_size * _window_size_backoff, + _window_size - _decrement_factor * _window_size_increment); + _local_max_throughput = 0; + } else { + _window_size += _weight * _window_size_increment; + } + } + _window_size = std::max(_min_window_size, _window_size); + _window_size = std::min(_max_window_size, _window_size); +} + +void +DynamicThrottlePolicy::process_response(bool success) noexcept +{ + if (success) { + ++_num_ok; + } +} + +class DynamicOperationThrottler final : public SharedOperationThrottler { + mutable std::mutex _mutex; + std::condition_variable _cond; + DynamicThrottlePolicy _throttle_policy; + uint32_t _pending_ops; + uint32_t _waiting_threads; +public: + DynamicOperationThrottler(const DynamicThrottleParams& params, + std::function<steady_time()> time_provider); + ~DynamicOperationThrottler() override; + + Token blocking_acquire_one() noexcept override; + Token blocking_acquire_one(vespalib::duration timeout) noexcept override; + Token try_acquire_one() noexcept override; + uint32_t current_window_size() const noexcept override; + uint32_t waiting_threads() const noexcept override; + void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept override; +private: + void release_one() noexcept override; + // Non-const since actually checking the send window of a dynamic throttler might change + // it if enough time has passed. + [[nodiscard]] bool has_spare_capacity_in_active_window() noexcept; + void add_one_to_active_window_size() noexcept; + void subtract_one_from_active_window_size() noexcept; +}; + +DynamicOperationThrottler::DynamicOperationThrottler(const DynamicThrottleParams& params, + std::function<steady_time()> time_provider) + : _mutex(), + _cond(), + _throttle_policy(params, std::move(time_provider)), + _pending_ops(0), + _waiting_threads(0) +{ +} + +DynamicOperationThrottler::~DynamicOperationThrottler() +{ + assert(_pending_ops == 0u); +} + +bool +DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept +{ + return _throttle_policy.has_spare_capacity(_pending_ops); +} + +void +DynamicOperationThrottler::add_one_to_active_window_size() noexcept +{ + _throttle_policy.process_request(); + ++_pending_ops; +} + +void +DynamicOperationThrottler::subtract_one_from_active_window_size() noexcept +{ + _throttle_policy.process_response(true); // TODO support failure push-back + assert(_pending_ops > 0); + --_pending_ops; +} + +DynamicOperationThrottler::Token +DynamicOperationThrottler::blocking_acquire_one() noexcept +{ + std::unique_lock lock(_mutex); + if (!has_spare_capacity_in_active_window()) { + ++_waiting_threads; + _cond.wait(lock, [&] { + return has_spare_capacity_in_active_window(); + }); + --_waiting_threads; + } + add_one_to_active_window_size(); + return Token(this, TokenCtorTag{}); +} + +DynamicOperationThrottler::Token +DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept +{ + std::unique_lock lock(_mutex); + if (!has_spare_capacity_in_active_window()) { + ++_waiting_threads; + const bool accepted = _cond.wait_for(lock, timeout, [&] { + return has_spare_capacity_in_active_window(); + }); + --_waiting_threads; + if (!accepted) { + return Token(); + } + } + add_one_to_active_window_size(); + return Token(this, TokenCtorTag{}); +} + +DynamicOperationThrottler::Token +DynamicOperationThrottler::try_acquire_one() noexcept +{ + std::unique_lock lock(_mutex); + if (!has_spare_capacity_in_active_window()) { + return Token(); + } + add_one_to_active_window_size(); + return Token(this, TokenCtorTag{}); +} + +void +DynamicOperationThrottler::release_one() noexcept +{ + std::unique_lock lock(_mutex); + subtract_one_from_active_window_size(); + // Only wake up a waiting thread if doing so would possibly result in success. + if ((_waiting_threads > 0) && has_spare_capacity_in_active_window()) { + lock.unlock(); + _cond.notify_one(); + } +} + +uint32_t +DynamicOperationThrottler::current_window_size() const noexcept +{ + std::unique_lock lock(_mutex); + return _throttle_policy.current_window_size(); +} + +uint32_t +DynamicOperationThrottler::waiting_threads() const noexcept +{ + std::unique_lock lock(_mutex); + return _waiting_threads; +} + +void +DynamicOperationThrottler::reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept +{ + std::unique_lock lock(_mutex); + _throttle_policy.configure(params); +} + +} // anonymous namespace + +std::unique_ptr<SharedOperationThrottler> +SharedOperationThrottler::make_unlimited_throttler() +{ + return std::make_unique<NoLimitsOperationThrottler>(); +} + +std::unique_ptr<SharedOperationThrottler> +SharedOperationThrottler::make_dynamic_throttler(const DynamicThrottleParams& params) +{ + return std::make_unique<DynamicOperationThrottler>(params, []() noexcept { return steady_clock::now(); }); +} + +std::unique_ptr<SharedOperationThrottler> +SharedOperationThrottler::make_dynamic_throttler(const DynamicThrottleParams& params, + std::function<steady_time()> time_provider) +{ + return std::make_unique<DynamicOperationThrottler>(params, std::move(time_provider)); +} + +DynamicOperationThrottler::Token::~Token() +{ + if (_throttler) { + _throttler->release_one(); + } +} + +void +DynamicOperationThrottler::Token::reset() noexcept +{ + if (_throttler) { + _throttler->release_one(); + _throttler = nullptr; + } +} + +DynamicOperationThrottler::Token& +DynamicOperationThrottler::Token::operator=(Token&& rhs) noexcept +{ + reset(); + _throttler = rhs._throttler; + rhs._throttler = nullptr; + return *this; +} + +} diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h new file mode 100644 index 00000000000..030935339ed --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h @@ -0,0 +1,100 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "time.h" +#include <functional> +#include <memory> +#include <optional> +#include <limits.h> + +namespace vespalib { + +/** + * Operation throttler that is intended to provide global throttling of + * async operations across multiple threads. A throttler wraps a logical + * max pending window size of in-flight operations. Depending on the + * throttler implementation, the window size may expand and shrink dynamically. + * Exactly how and when this happens is unspecified. + * + * Offers both polling and (timed, non-timed) blocking calls for acquiring + * a throttle token. If the returned token is valid, the caller may proceed + * to invoke the asynchronous operation. + * + * The window slot taken up by a valid throttle token is implicitly freed up + * when the token is destroyed. + * + * All operations on the throttler are thread safe. + */ +class SharedOperationThrottler { +protected: + struct TokenCtorTag {}; // Make available to subclasses for token construction. +public: + class Token { + SharedOperationThrottler* _throttler; + public: + constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {} + constexpr Token() noexcept : _throttler(nullptr) {} + constexpr Token(Token&& rhs) noexcept + : _throttler(rhs._throttler) + { + rhs._throttler = nullptr; + } + Token& operator=(Token&& rhs) noexcept; + ~Token(); + + Token(const Token&) = delete; + Token& operator=(const Token&) = delete; + + [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); } + void reset() noexcept; + }; + + virtual ~SharedOperationThrottler() = default; + + // Acquire a valid throttling token, uninterruptedly blocking until one can be obtained. + [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0; + // Attempt to acquire a valid throttling token, waiting up to `timeout` for one to be + // available. If the timeout is exceeded without any tokens becoming available, an + // invalid token will be returned. + [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0; + // Attempt to acquire a valid throttling token if one is immediately available. + // An invalid token will be returned if none is available. Never blocks (other than + // when contending for the internal throttler mutex). + [[nodiscard]] virtual Token try_acquire_one() noexcept = 0; + + // May return 0, in which case the window size is unlimited. + [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0; + + // Exposed for unit testing only. + [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0; + + struct DynamicThrottleParams { + uint32_t window_size_increment = 20; + uint32_t min_window_size = 20; + uint32_t max_window_size = INT_MAX; // signed max to be 1-1 compatible with Java defaults + double resize_rate = 3.0; + double window_size_decrement_factor = 1.2; + double window_size_backoff = 0.95; + + bool operator==(const DynamicThrottleParams&) const noexcept = default; + bool operator!=(const DynamicThrottleParams&) const noexcept = default; + }; + + // No-op if underlying throttler does not use a dynamic policy, or if the supplied + // parameters are equal to the current configuration. + // FIXME leaky abstraction alert! + virtual void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept = 0; + + // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking) + static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler(); + + // Creates a throttler that uses a DynamicThrottlePolicy under the hood + static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params); + static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params, + std::function<steady_time()> time_provider); +private: + // Exclusively called from a valid Token. Thread safe. + virtual void release_one() noexcept = 0; +}; + +} diff --git a/vespalib/src/vespa/vespalib/util/shared_string_repo.h b/vespalib/src/vespa/vespalib/util/shared_string_repo.h index ec65b942d88..7ba59937c7c 100644 --- a/vespalib/src/vespa/vespalib/util/shared_string_repo.h +++ b/vespalib/src/vespa/vespalib/util/shared_string_repo.h @@ -8,6 +8,7 @@ #include <vespa/vespalib/stllike/string.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/stllike/identity.h> +#include <vespa/vespalib/stllike/allocator.h> #include <vespa/vespalib/stllike/hashtable.hpp> #include <xxhash.h> #include <mutex> @@ -95,6 +96,7 @@ private: return (--_ref_cnt == 0); } }; + using EntryVector = std::vector<Entry, allocator_large<Entry>>; struct Key { uint32_t idx; uint32_t hash; @@ -104,8 +106,8 @@ private: uint32_t operator()(const AltKey &key) const { return key.hash; } }; struct Equal { - const std::vector<Entry> &entries; - Equal(const std::vector<Entry> &entries_in) : entries(entries_in) {} + const EntryVector &entries; + Equal(const EntryVector &entries_in) : entries(entries_in) {} Equal(const Equal &rhs) = default; bool operator()(const Key &a, const Key &b) const { return (a.idx == b.idx); } bool operator()(const Key &a, const AltKey &b) const { return ((a.hash == b.hash) && (entries[a.idx].str() == b.str)); } @@ -113,10 +115,10 @@ private: using HashType = hashtable<Key,Key,Hash,Equal,Identity,hashtable_base::and_modulator>; private: - mutable SpinLock _lock; - std::vector<Entry> _entries; - uint32_t _free; - HashType _hash; + mutable SpinLock _lock; + EntryVector _entries; + uint32_t _free; + HashType _hash; void make_entries(size_t hint); @@ -291,7 +293,7 @@ public: // A collection of string handles with ownership class Handles { private: - std::vector<string_id> _handles; + StringIdVector _handles; public: Handles(); Handles(Handles &&rhs); @@ -309,7 +311,7 @@ public: string_id id = _repo.copy(handle); _handles.push_back(id); } - const std::vector<string_id> &view() const { return _handles; } + const StringIdVector &view() const { return _handles; } }; }; diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp index ab83d4e05fd..99ab298864f 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp @@ -60,9 +60,10 @@ Signal::Signal() noexcept {} Signal::~Signal() = default; -SimpleThreadBundle::Pool::Pool(size_t bundleSize) +SimpleThreadBundle::Pool::Pool(size_t bundleSize, init_fun_t init_fun) : _lock(), _bundleSize(bundleSize), + _init_fun(init_fun), _bundles() { } @@ -86,7 +87,7 @@ SimpleThreadBundle::Pool::obtain() return ret; } } - return std::make_unique<SimpleThreadBundle>(_bundleSize); + return std::make_unique<SimpleThreadBundle>(_bundleSize, _init_fun); } void @@ -99,7 +100,7 @@ SimpleThreadBundle::Pool::release(SimpleThreadBundle::UP bundle) //----------------------------------------------------------------------------- -SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Strategy strategy) +SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Runnable::init_fun_t init_fun, Strategy strategy) : _work(), _signals(), _workers(), @@ -134,7 +135,7 @@ SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Strategy strategy) _hook = std::move(hook); } else { size_t signal_idx = (strategy == USE_BROADCAST) ? 0 : (i - 1); - _workers.push_back(std::make_unique<Worker>(_signals[signal_idx], std::move(hook))); + _workers.push_back(std::make_unique<Worker>(_signals[signal_idx], init_fun, std::move(hook))); } } } @@ -175,19 +176,19 @@ SimpleThreadBundle::run(const std::vector<Runnable*> &targets) latch.await(); } -SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::UP h) - : thread(*this, simple_thread_bundle_executor), - signal(s), - hook(std::move(h)) +SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::init_fun_t init_fun, Runnable::UP h) + : thread(*this, std::move(init_fun)), + signal(s), + hook(std::move(h)) { thread.start(); } + void SimpleThreadBundle::Worker::run() { for (size_t gen = 0; signal.wait(gen) > 0; ) { - hook->run(); -} - + hook->run(); + } } } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h index d9a29ee7bef..b7434d09ac3 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h @@ -86,8 +86,9 @@ struct Signal { class SimpleThreadBundle : public ThreadBundle { public: - typedef fixed_thread_bundle::Work Work; - typedef fixed_thread_bundle::Signal Signal; + using Work = fixed_thread_bundle::Work; + using Signal = fixed_thread_bundle::Signal; + using init_fun_t = Runnable::init_fun_t; typedef std::unique_ptr<SimpleThreadBundle> UP; enum Strategy { USE_SIGNAL_LIST, USE_SIGNAL_TREE, USE_BROADCAST }; @@ -97,10 +98,12 @@ public: private: std::mutex _lock; size_t _bundleSize; + init_fun_t _init_fun; std::vector<SimpleThreadBundle*> _bundles; public: - Pool(size_t bundleSize); + Pool(size_t bundleSize, init_fun_t init_fun); + Pool(size_t bundleSize) : Pool(bundleSize, Runnable::default_init_function) {} ~Pool(); SimpleThreadBundle::UP obtain(); void release(SimpleThreadBundle::UP bundle); @@ -112,7 +115,7 @@ private: Thread thread; Signal &signal; Runnable::UP hook; - Worker(Signal &s, Runnable::UP h); + Worker(Signal &s, init_fun_t init_fun, Runnable::UP h); void run() override; }; @@ -122,7 +125,9 @@ private: Runnable::UP _hook; public: - SimpleThreadBundle(size_t size, Strategy strategy = USE_SIGNAL_LIST); + SimpleThreadBundle(size_t size, init_fun_t init_fun, Strategy strategy = USE_SIGNAL_LIST); + SimpleThreadBundle(size_t size, Strategy strategy = USE_SIGNAL_LIST) + : SimpleThreadBundle(size, Runnable::default_init_function, strategy) {} ~SimpleThreadBundle(); size_t size() const override; void run(const std::vector<Runnable*> &targets) override; diff --git a/vespalib/src/vespa/vespalib/util/spin_lock.h b/vespalib/src/vespa/vespalib/util/spin_lock.h index abc2b89106f..3af7bc0fd55 100644 --- a/vespalib/src/vespa/vespalib/util/spin_lock.h +++ b/vespalib/src/vespa/vespalib/util/spin_lock.h @@ -1,5 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + #include <atomic> #include <thread> diff --git a/vespalib/src/vespa/vespalib/util/string_id.h b/vespalib/src/vespa/vespalib/util/string_id.h index 7a72feee64a..7fec1da0bb8 100644 --- a/vespalib/src/vespa/vespalib/util/string_id.h +++ b/vespalib/src/vespa/vespalib/util/string_id.h @@ -2,7 +2,8 @@ #pragma once -#include <cstdint> +#include <vespa/vespalib/stllike/allocator.h> +#include <vector> namespace vespalib { @@ -38,4 +39,6 @@ public: constexpr bool operator!=(const string_id &rhs) const noexcept { return (_id != rhs._id); } }; +using StringIdVector = std::vector<string_id, vespalib::allocator_large<string_id>>; + } |