summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/CMakeLists.txt2
-rw-r--r--vespalib/src/tests/alloc/alloc_test.cpp55
-rw-r--r--vespalib/src/tests/array/array_test.cpp67
-rw-r--r--vespalib/src/tests/cpu_usage/cpu_usage_test.cpp384
-rw-r--r--vespalib/src/tests/datastore/array_store/array_store_test.cpp19
-rw-r--r--vespalib/src/tests/datastore/datastore/.gitignore1
-rw-r--r--vespalib/src/tests/datastore/fixed_size_hash_map/fixed_size_hash_map_test.cpp2
-rw-r--r--vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp2
-rw-r--r--vespalib/src/tests/datastore/unique_store/unique_store_test.cpp17
-rw-r--r--vespalib/src/tests/datastore/unique_store_string_allocator/unique_store_string_allocator_test.cpp15
-rw-r--r--vespalib/src/tests/nice/CMakeLists.txt10
-rw-r--r--vespalib/src/tests/nice/nice_test.cpp98
-rw-r--r--vespalib/src/tests/shared_operation_throttler/CMakeLists.txt8
-rw-r--r--vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp212
-rw-r--r--vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp6
-rw-r--r--vespalib/src/tests/spin_lock/spin_lock_test.cpp10
-rw-r--r--vespalib/src/tests/stllike/asciistream_test.cpp23
-rw-r--r--vespalib/src/tests/util/rcuvector/rcuvector_test.cpp72
-rw-r--r--vespalib/src/tests/wakeup/wakeup_bench.cpp42
-rw-r--r--vespalib/src/vespa/vespalib/data/slime/array_value.h3
-rw-r--r--vespalib/src/vespa/vespalib/datastore/CMakeLists.txt3
-rw-r--r--vespalib/src/vespa/vespalib/datastore/array_store.cpp6
-rw-r--r--vespalib/src/vespa/vespalib/datastore/array_store.h22
-rw-r--r--vespalib/src/vespa/vespalib/datastore/array_store.hpp31
-rw-r--r--vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.cpp20
-rw-r--r--vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.h39
-rw-r--r--vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.hpp39
-rw-r--r--vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.cpp14
-rw-r--r--vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.h37
-rw-r--r--vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.hpp26
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store.cpp8
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store.h6
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store.hpp8
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store_allocator.h8
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store_allocator.hpp5
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.cpp25
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.h27
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.hpp29
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.cpp22
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.h12
-rw-r--r--vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.hpp6
-rw-r--r--vespalib/src/vespa/vespalib/stllike/asciistream.cpp200
-rw-r--r--vespalib/src/vespa/vespalib/stllike/asciistream.h2
-rw-r--r--vespalib/src/vespa/vespalib/test/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/test/thread_meets.cpp12
-rw-r--r--vespalib/src/vespa/vespalib/test/thread_meets.h34
-rw-r--r--vespalib/src/vespa/vespalib/util/CMakeLists.txt2
-rw-r--r--vespalib/src/vespa/vespalib/util/alloc.cpp6
-rw-r--r--vespalib/src/vespa/vespalib/util/alloc.h7
-rw-r--r--vespalib/src/vespa/vespalib/util/array.h2
-rw-r--r--vespalib/src/vespa/vespalib/util/array.hpp15
-rw-r--r--vespalib/src/vespa/vespalib/util/arrayqueue.hpp6
-rw-r--r--vespalib/src/vespa/vespalib/util/atomic.h151
-rw-r--r--vespalib/src/vespa/vespalib/util/child_process.cpp20
-rw-r--r--vespalib/src/vespa/vespalib/util/child_process.h13
-rw-r--r--vespalib/src/vespa/vespalib/util/count_down_latch.h13
-rw-r--r--vespalib/src/vespa/vespalib/util/cpu_usage.cpp253
-rw-r--r--vespalib/src/vespa/vespalib/util/cpu_usage.h197
-rw-r--r--vespalib/src/vespa/vespalib/util/nice.cpp32
-rw-r--r--vespalib/src/vespa/vespalib/util/nice.h16
-rw-r--r--vespalib/src/vespa/vespalib/util/rcuvector.h1
-rw-r--r--vespalib/src/vespa/vespalib/util/rcuvector.hpp6
-rw-r--r--vespalib/src/vespa/vespalib/util/runnable.cpp7
-rw-r--r--vespalib/src/vespa/vespalib/util/runnable.h2
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp434
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.h100
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_string_repo.h18
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp23
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.h15
-rw-r--r--vespalib/src/vespa/vespalib/util/spin_lock.h2
-rw-r--r--vespalib/src/vespa/vespalib/util/string_id.h5
71 files changed, 2753 insertions, 283 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt
index eb7e1f7d4c0..09cf354d027 100644
--- a/vespalib/CMakeLists.txt
+++ b/vespalib/CMakeLists.txt
@@ -85,6 +85,7 @@ vespa_define_module(
src/tests/net/tls/policy_checking_certificate_verifier
src/tests/net/tls/protocol_snooping
src/tests/net/tls/transport_options
+ src/tests/nice
src/tests/objects/nbostream
src/tests/optimized
src/tests/overload
@@ -101,6 +102,7 @@ vespa_define_module(
src/tests/require
src/tests/runnable_pair
src/tests/sha1
+ src/tests/shared_operation_throttler
src/tests/shared_string_repo
src/tests/sharedptr
src/tests/signalhandler
diff --git a/vespalib/src/tests/alloc/alloc_test.cpp b/vespalib/src/tests/alloc/alloc_test.cpp
index 2b6d3ee4613..da41d75b479 100644
--- a/vespalib/src/tests/alloc/alloc_test.cpp
+++ b/vespalib/src/tests/alloc/alloc_test.cpp
@@ -139,22 +139,28 @@ TEST("rounding of large mmaped buffer") {
EXPECT_EQUAL(MemoryAllocator::HUGEPAGE_SIZE*12ul, buf.size());
}
+void verifyExtension(Alloc& buf, size_t currSZ, size_t newSZ) {
+ bool expectSuccess = (currSZ != newSZ);
+ void* oldPtr = buf.get();
+ EXPECT_EQUAL(currSZ, buf.size());
+ EXPECT_EQUAL(expectSuccess, buf.resize_inplace(currSZ + 1));
+ EXPECT_EQUAL(oldPtr, buf.get());
+ EXPECT_EQUAL(newSZ, buf.size());
+}
+
TEST("heap alloc can not be extended") {
Alloc buf = Alloc::allocHeap(100);
- void * oldPtr = buf.get();
- EXPECT_EQUAL(100ul, buf.size());
- EXPECT_FALSE(buf.resize_inplace(101));
- EXPECT_EQUAL(oldPtr, buf.get());
- EXPECT_EQUAL(100ul, buf.size());
+ verifyExtension(buf, 100, 100);
+}
+
+TEST("mmap alloc cannot be extended from zero") {
+ Alloc buf = Alloc::allocMMap(0);
+ verifyExtension(buf, 0, 0);
}
TEST("auto alloced heap alloc can not be extended") {
Alloc buf = Alloc::alloc(100);
- void * oldPtr = buf.get();
- EXPECT_EQUAL(100ul, buf.size());
- EXPECT_FALSE(buf.resize_inplace(101));
- EXPECT_EQUAL(oldPtr, buf.get());
- EXPECT_EQUAL(100ul, buf.size());
+ verifyExtension(buf, 100, 100);
}
TEST("auto alloced heap alloc can not be extended, even if resize will be mmapped") {
@@ -166,15 +172,6 @@ TEST("auto alloced heap alloc can not be extended, even if resize will be mmappe
EXPECT_EQUAL(100ul, buf.size());
}
-void verifyExtension(Alloc & buf, size_t currSZ, size_t newSZ) {
- bool expectSuccess = (currSZ != newSZ);
- void * oldPtr = buf.get();
- EXPECT_EQUAL(currSZ, buf.size());
- EXPECT_EQUAL(expectSuccess, buf.resize_inplace(currSZ+1));
- EXPECT_EQUAL(oldPtr, buf.get());
- EXPECT_EQUAL(newSZ, buf.size());
-}
-
void ensureRoomForExtension(const Alloc & buf, Alloc & reserved) {
// Normally mmapping starts at the top and grows down in address space.
// Then there is no room to extend the last mapping.
@@ -253,6 +250,11 @@ TEST("heap alloc can not be shrinked") {
EXPECT_EQUAL(101ul, buf.size());
}
+TEST("heap alloc cannot be shrunk to zero") {
+ Alloc buf = Alloc::allocHeap(101);
+ EXPECT_FALSE(buf.resize_inplace(0));
+}
+
TEST("mmap alloc can be shrinked") {
Alloc buf = Alloc::allocMMap(4097);
void * oldPtr = buf.get();
@@ -262,6 +264,11 @@ TEST("mmap alloc can be shrinked") {
EXPECT_EQUAL(4_Ki, buf.size());
}
+TEST("mmap alloc cannot be shrunk to zero") {
+ Alloc buf = Alloc::allocMMap(4097);
+ EXPECT_FALSE(buf.resize_inplace(0));
+}
+
TEST("auto alloced heap alloc can not be shrinked") {
Alloc buf = Alloc::alloc(101);
void * oldPtr = buf.get();
@@ -271,6 +278,11 @@ TEST("auto alloced heap alloc can not be shrinked") {
EXPECT_EQUAL(101ul, buf.size());
}
+TEST("auto alloced heap alloc cannot be shrunk to zero") {
+ Alloc buf = Alloc::alloc(101);
+ EXPECT_FALSE(buf.resize_inplace(0));
+}
+
TEST("auto alloced mmap alloc can be shrinked") {
static constexpr size_t SZ = MemoryAllocator::HUGEPAGE_SIZE;
Alloc buf = Alloc::alloc(SZ + 1);
@@ -281,6 +293,11 @@ TEST("auto alloced mmap alloc can be shrinked") {
EXPECT_EQUAL(SZ, buf.size());
}
+TEST("auto alloced mmap alloc cannot be shrunk to zero") {
+ Alloc buf = Alloc::alloc(MemoryAllocator::HUGEPAGE_SIZE + 1);
+ EXPECT_FALSE(buf.resize_inplace(0));
+}
+
TEST("auto alloced mmap alloc can not be shrinked below HUGEPAGE_SIZE/2 + 1 ") {
static constexpr size_t SZ = MemoryAllocator::HUGEPAGE_SIZE;
Alloc buf = Alloc::alloc(SZ + 1);
diff --git a/vespalib/src/tests/array/array_test.cpp b/vespalib/src/tests/array/array_test.cpp
index 719623c9401..511c5ae11f1 100644
--- a/vespalib/src/tests/array/array_test.cpp
+++ b/vespalib/src/tests/array/array_test.cpp
@@ -2,7 +2,9 @@
#include <vespa/vespalib/stllike/string.h>
#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/vespalib/test/memory_allocator_observer.h>
#include <vespa/vespalib/util/array.hpp>
+#include <vespa/vespalib/util/round_up_to_page_size.h>
#include <vespa/vespalib/util/size_literals.h>
#include <deque>
#include <atomic>
@@ -27,6 +29,11 @@ std::ostream & operator << (std::ostream & os, const Array<T> & a)
}
+using alloc::Alloc;
+using alloc::MemoryAllocator;
+using MyMemoryAllocator = vespalib::alloc::test::MemoryAllocatorObserver;
+using AllocStats = MyMemoryAllocator::Stats;
+
class Clever {
public:
Clever() : _counter(&_global) { (*_counter)++; }
@@ -324,31 +331,77 @@ TEST("test move assignment")
struct UnreserveFixture {
Array<int> arr;
- UnreserveFixture() : arr(1025, 7, alloc::Alloc::allocMMap(0))
+ UnreserveFixture() : arr(page_ints() + 1, 7, alloc::Alloc::allocMMap(0))
{
- EXPECT_EQUAL(1025u, arr.size());
- EXPECT_EQUAL(2048u, arr.capacity());
+ EXPECT_EQUAL(page_ints() + 1, arr.size());
+ EXPECT_EQUAL(2 * page_ints(), arr.capacity());
+ }
+
+ static size_t page_ints() {
+ return round_up_to_page_size(1) / sizeof(int);
}
};
TEST_F("require that try_unreserve() fails if wanted capacity >= current capacity", UnreserveFixture)
{
- EXPECT_FALSE(f.arr.try_unreserve(2048));
+ EXPECT_FALSE(f.arr.try_unreserve(2 * UnreserveFixture::page_ints()));
}
TEST_F("require that try_unreserve() fails if wanted capacity < current size", UnreserveFixture)
{
- EXPECT_FALSE(f.arr.try_unreserve(1024));
+ EXPECT_FALSE(f.arr.try_unreserve(UnreserveFixture::page_ints()));
}
TEST_F("require that try_unreserve() succeedes if mmap can be shrinked", UnreserveFixture)
{
int *oldPtr = &f.arr[0];
f.arr.resize(512);
- EXPECT_TRUE(f.arr.try_unreserve(1023));
- EXPECT_EQUAL(1_Ki, f.arr.capacity());
+ EXPECT_TRUE(f.arr.try_unreserve(UnreserveFixture::page_ints() - 1));
+ EXPECT_EQUAL(UnreserveFixture::page_ints(), f.arr.capacity());
int *newPtr = &f.arr[0];
EXPECT_EQUAL(oldPtr, newPtr);
}
+struct Fixture {
+ AllocStats stats;
+ std::unique_ptr<MemoryAllocator> allocator;
+ Alloc initial_alloc;
+ Array<int> arr;
+
+ Fixture();
+ ~Fixture();
+};
+
+Fixture::Fixture()
+ : stats(),
+ allocator(std::make_unique<MyMemoryAllocator>(stats)),
+ initial_alloc(Alloc::alloc_with_allocator(allocator.get())),
+ arr(initial_alloc)
+{
+}
+
+Fixture::~Fixture() = default;
+
+TEST_F("require that memory allocator can be set", Fixture)
+{
+ f.arr.resize(1);
+ EXPECT_EQUAL(AllocStats(1, 0), f.stats);
+}
+
+TEST_F("require that memory allocator is preserved across reset", Fixture)
+{
+ f.arr.resize(1);
+ f.arr.reset();
+ f.arr.resize(1);
+ EXPECT_EQUAL(AllocStats(2, 1), f.stats);
+}
+
+TEST_F("require that created array uses same memory allocator", Fixture)
+{
+ auto arr2 = f.arr.create();
+ EXPECT_EQUAL(AllocStats(0, 0), f.stats);
+ arr2.resize(1);
+ EXPECT_EQUAL(AllocStats(1, 0), f.stats);
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
index 98a7bd780a7..83f49d6c73b 100644
--- a/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
+++ b/vespalib/src/tests/cpu_usage/cpu_usage_test.cpp
@@ -2,11 +2,16 @@
#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/benchmark_timer.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/test/thread_meets.h>
#include <vespa/vespalib/testkit/test_kit.h>
+#include <sys/resource.h>
#include <thread>
using namespace vespalib;
+using namespace vespalib::test;
+using vespalib::make_string_short::fmt;
bool verbose = false;
size_t loop_cnt = 10;
@@ -16,6 +21,16 @@ using Sampler = vespalib::cpu_usage::ThreadSampler;
//-----------------------------------------------------------------------------
+class EndTime {
+private:
+ steady_time _end_time;
+public:
+ EndTime(duration test_time) : _end_time(steady_clock::now() + test_time) {}
+ bool operator()() const { return (steady_clock::now() >= _end_time); }
+};
+
+//-----------------------------------------------------------------------------
+
void be_busy(duration d) {
if (d > 0ms) {
volatile int tmp = 123;
@@ -45,22 +60,30 @@ void verify_sampling(size_t thread_id, size_t num_threads, std::vector<Sampler*>
TEST_BARRIER(); // #1
auto t0 = steady_clock::now();
std::vector<duration> pre_usage = sample(samplers);
+ auto pre_total = cpu_usage::total_cpu_usage();
TEST_BARRIER(); // #2
TEST_BARRIER(); // #3
auto t1 = steady_clock::now();
std::vector<duration> post_usage = sample(samplers);
+ auto post_total = cpu_usage::total_cpu_usage();
TEST_BARRIER(); // #4
double wall = to_s(t1 - t0);
- std::vector<double> load(4, 0.0);
+ std::vector<double> util(4, 0.0);
for (size_t i = 0; i < 4; ++i) {
- load[i] = to_s(post_usage[i] - pre_usage[i]) / wall;
+ util[i] = to_s(post_usage[i] - pre_usage[i]) / wall;
}
- EXPECT_GREATER(load[3], load[0]);
- fprintf(stderr, "loads: { %.2f, %.2f, %.2f, %.2f }\n", load[0], load[1], load[2], load[3]);
+ double total_util = to_s(post_total - pre_total) / wall;
+ EXPECT_GREATER(util[3], util[0]);
+ // NB: cannot expect total_util to be greater than util[3]
+ // here due to mock utils being 'as expected' while valgrind
+ // will cut all utils in about half.
+ EXPECT_GREATER(total_util, util[0]);
+ fprintf(stderr, "utils: { %.3f, %.3f, %.3f, %.3f }\n", util[0], util[1], util[2], util[3]);
+ fprintf(stderr, "total util: %.3f\n", total_util);
} else {
int idx = (thread_id - 1);
- double target_load = double(thread_id - 1) / (num_threads - 2);
- auto sampler = cpu_usage::create_thread_sampler(force_mock, target_load);
+ double target_util = double(thread_id - 1) / (num_threads - 2);
+ auto sampler = cpu_usage::create_thread_sampler(force_mock, target_util);
samplers[idx] = sampler.get();
TEST_BARRIER(); // #1
TEST_BARRIER(); // #2
@@ -74,7 +97,7 @@ void verify_sampling(size_t thread_id, size_t num_threads, std::vector<Sampler*>
//-----------------------------------------------------------------------------
-TEST_MT_F("require that dummy thread-based CPU usage sampling with known expected load works", 5, std::vector<Sampler*>(4, nullptr)) {
+TEST_MT_F("require that dummy thread-based CPU usage sampling with known expected util works", 5, std::vector<Sampler*>(4, nullptr)) {
TEST_DO(verify_sampling(thread_id, num_threads, f1, true));
}
@@ -89,6 +112,353 @@ TEST("measure thread CPU clock overhead") {
fprintf(stderr, "approx overhead per sample (thread CPU clock): %f us\n", min_time_us);
}
+TEST("measure total cpu usage overhead") {
+ duration d;
+ double min_time_us = BenchmarkTimer::benchmark([&d]() noexcept { d = cpu_usage::total_cpu_usage(); }, budget) * 1000000.0;
+ fprintf(stderr, "approx overhead per RUsage sample: %f us\n", min_time_us);
+}
+
+//-----------------------------------------------------------------------------
+
+void verify_category(CpuUsage::Category cat, size_t idx, const vespalib::string &name) {
+ switch (cat) { // make sure we known all categories
+ case CpuUsage::Category::SETUP:
+ case CpuUsage::Category::READ:
+ case CpuUsage::Category::WRITE:
+ case CpuUsage::Category::COMPACT:
+ case CpuUsage::Category::OTHER:
+ EXPECT_EQUAL(CpuUsage::index_of(cat), idx);
+ EXPECT_EQUAL(CpuUsage::name_of(cat), name);
+ }
+}
+
+TEST("require that CPU categories are as expected") {
+ TEST_DO(verify_category(CpuUsage::Category::SETUP, 0u, "setup"));
+ TEST_DO(verify_category(CpuUsage::Category::READ, 1u, "read"));
+ TEST_DO(verify_category(CpuUsage::Category::WRITE, 2u, "write"));
+ TEST_DO(verify_category(CpuUsage::Category::COMPACT, 3u, "compact"));
+ TEST_DO(verify_category(CpuUsage::Category::OTHER, 4u, "other"));
+ EXPECT_EQUAL(CpuUsage::num_categories, 5u);
+}
+
+TEST("require that empty sample is zero") {
+ CpuUsage::Sample sample;
+ EXPECT_EQUAL(sample.size(), CpuUsage::num_categories);
+ for (uint32_t i = 0; i < sample.size(); ++i) {
+ EXPECT_EQUAL(sample[i].count(), 0);
+ }
+}
+
+TEST("require that cpu samples can be manipulated and inspected") {
+ CpuUsage::Sample a;
+ CpuUsage::Sample b;
+ const CpuUsage::Sample &c = a;
+ a[CpuUsage::Category::SETUP] = 1ms;
+ a[CpuUsage::Category::READ] = 2ms;
+ a[CpuUsage::Category::WRITE] = 3ms;
+ a[CpuUsage::Category::COMPACT] = 4ms;
+ a[CpuUsage::Category::OTHER] = 5ms;
+ for (uint32_t i = 0; i < b.size(); ++i) {
+ b[i] = 10ms * (i + 1);
+ }
+ a.merge(b);
+ for (uint32_t i = 0; i < c.size(); ++i) {
+ EXPECT_EQUAL(c[i], 11ms * (i + 1));
+ }
+ EXPECT_EQUAL(c[CpuUsage::Category::SETUP], 11ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::READ], 22ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::WRITE], 33ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::COMPACT], 44ms);
+ EXPECT_EQUAL(c[CpuUsage::Category::OTHER], 55ms);
+}
+
+//-----------------------------------------------------------------------------
+
+struct CpuUsage::Test {
+ struct BlockingTracker : ThreadTracker {
+ std::atomic<size_t> called;
+ ThreadMeets::Nop sync_entry;
+ ThreadMeets::Swap<Sample> swap_sample;
+ BlockingTracker()
+ : called(0), sync_entry(2), swap_sample() {}
+ Sample sample() noexcept override {
+ if (called++) {
+ return Sample();
+ }
+ sync_entry();
+ return swap_sample(Sample());
+ }
+ };
+ struct SimpleTracker : ThreadTracker {
+ Sample my_sample;
+ std::atomic<size_t> called;
+ SimpleTracker(Sample sample) noexcept
+ : my_sample(sample), called(0) {}
+ Sample sample() noexcept override {
+ ++called;
+ return my_sample;
+ }
+ };
+ struct Fixture {
+ CpuUsage my_usage;
+ std::shared_ptr<BlockingTracker> blocking;
+ std::vector<std::shared_ptr<SimpleTracker>> simple_list;
+ Fixture() : my_usage() {}
+ void add_blocking() {
+ ASSERT_TRUE(!blocking);
+ blocking = std::make_unique<BlockingTracker>();
+ my_usage.add_thread(blocking);
+ }
+ void add_simple(Sample sample) {
+ auto simple = std::make_shared<SimpleTracker>(sample);
+ simple_list.push_back(simple);
+ my_usage.add_thread(simple);
+ }
+ void add_remove_simple(Sample sample) {
+ auto simple = std::make_shared<SimpleTracker>(sample);
+ my_usage.add_thread(simple);
+ my_usage.remove_thread(simple);
+ }
+ size_t count_threads() {
+ Guard guard(my_usage._lock);
+ return my_usage._threads.size();
+ }
+ bool is_sampling() {
+ Guard guard(my_usage._lock);
+ return my_usage._sampling;
+ }
+ size_t count_conflicts() {
+ Guard guard(my_usage._lock);
+ if (!my_usage._conflict) {
+ return 0;
+ }
+ return my_usage._conflict->waiters;
+ }
+ size_t count_simple_samples() {
+ size_t result = 0;
+ for (const auto &simple: simple_list) {
+ result += simple->called;
+ }
+ return result;
+ }
+ TimedSample sample() { return my_usage.sample_or_wait(); }
+ ~Fixture() {
+ if (blocking) {
+ my_usage.remove_thread(std::move(blocking));
+ }
+ for (auto &simple: simple_list) {
+ my_usage.remove_thread(std::move(simple));
+ }
+ ASSERT_EQUAL(count_threads(), 0u);
+ }
+ };
+ struct TrackerImpl {
+ ThreadTrackerImpl impl;
+ TrackerImpl(cpu_usage::ThreadSampler::UP sampler)
+ : impl(std::move(sampler)) {}
+ CpuUsage::Sample sample() { return impl.sample(); }
+ CpuUsage::Category set_category(CpuUsage::Category cat) { return impl.set_category(cat); }
+ };
+};
+
+TEST_F("require that CpuUsage sample calls sample on thread trackers", CpuUsage::Test::Fixture()) {
+ CpuUsage::Sample sample;
+ sample[CpuUsage::Category::READ] = 10ms;
+ f1.add_simple(sample);
+ f1.add_simple(sample);
+ f1.add_simple(sample);
+ EXPECT_EQUAL(f1.count_threads(), 3u);
+ auto result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms));
+ EXPECT_EQUAL(f1.count_simple_samples(), 3u);
+ result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(60ms));
+ EXPECT_EQUAL(f1.count_simple_samples(), 6u);
+}
+
+TEST_F("require that threads added and removed between CpuUsage sample calls are tracked", CpuUsage::Test::Fixture()) {
+ CpuUsage::Sample sample;
+ sample[CpuUsage::Category::READ] = 10ms;
+ auto result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(0ms));
+ f1.add_remove_simple(sample);
+ f1.add_remove_simple(sample);
+ f1.add_remove_simple(sample);
+ EXPECT_EQUAL(f1.count_threads(), 0u);
+ result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms));
+ result = f1.sample();
+ EXPECT_EQUAL(result.second[CpuUsage::Category::READ], duration(30ms));
+}
+
+TEST_MT_FF("require that sample conflicts are resolved correctly", 5, CpuUsage::Test::Fixture(), std::vector<CpuUsage::TimedSample>(num_threads - 1)) {
+ if (thread_id == 0) {
+ CpuUsage::Sample s1;
+ s1[CpuUsage::Category::SETUP] = 10ms;
+ CpuUsage::Sample s2;
+ s2[CpuUsage::Category::READ] = 20ms;
+ CpuUsage::Sample s3;
+ s3[CpuUsage::Category::WRITE] = 30ms;
+ CpuUsage::Sample s4;
+ s4[CpuUsage::Category::COMPACT] = 40ms;
+ f1.add_blocking();
+ f1.add_simple(s1); // should be sampled
+ EXPECT_TRUE(!f1.is_sampling());
+ EXPECT_EQUAL(f1.count_conflicts(), 0u);
+ TEST_BARRIER(); // #1
+ f1.blocking->sync_entry();
+ EXPECT_TRUE(f1.is_sampling());
+ while (f1.count_conflicts() < (num_threads - 2)) {
+ // wait for appropriate number of conflicts
+ std::this_thread::sleep_for(1ms);
+ }
+ f1.add_simple(s2); // should NOT be sampled (pending add)
+ f1.add_remove_simple(s3); // should be sampled (pending remove);
+ EXPECT_EQUAL(f1.count_threads(), 2u);
+ EXPECT_TRUE(f1.is_sampling());
+ EXPECT_EQUAL(f1.count_conflicts(), (num_threads - 2));
+ f1.blocking->swap_sample(s4);
+ TEST_BARRIER(); // #2
+ EXPECT_TRUE(!f1.is_sampling());
+ EXPECT_EQUAL(f1.count_conflicts(), 0u);
+ EXPECT_EQUAL(f1.count_threads(), 3u);
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::SETUP], duration(10ms));
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::READ], duration(0ms));
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::WRITE], duration(30ms));
+ EXPECT_EQUAL(f2[0].second[CpuUsage::Category::COMPACT], duration(40ms));
+ for (size_t i = 1; i < (num_threads - 1); ++i) {
+ EXPECT_EQUAL(f2[i].first, f2[0].first);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::SETUP], f2[0].second[CpuUsage::Category::SETUP]);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::READ], f2[0].second[CpuUsage::Category::READ]);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::WRITE], f2[0].second[CpuUsage::Category::WRITE]);
+ EXPECT_EQUAL(f2[i].second[CpuUsage::Category::COMPACT], f2[0].second[CpuUsage::Category::COMPACT]);
+ }
+ } else {
+ TEST_BARRIER(); // #1
+ f2[thread_id - 1] = f1.sample();
+ TEST_BARRIER(); // #2
+ }
+}
+
+//-----------------------------------------------------------------------------
+
+struct DummySampler : public cpu_usage::ThreadSampler {
+ duration &ref;
+ DummySampler(duration &ref_in) : ref(ref_in) {}
+ duration sample() const noexcept override { return ref; }
+};
+
+TEST("require that thread tracker implementation can track cpu use") {
+ duration t = duration::zero();
+ CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t));
+ t += 10ms;
+ tracker.set_category(CpuUsage::Category::SETUP);
+ t += 15ms;
+ tracker.set_category(CpuUsage::Category::READ);
+ t += 10ms;
+ auto sample = tracker.sample();
+ EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(15ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(10ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(0ms));
+ t += 15ms;
+ tracker.set_category(CpuUsage::Category::WRITE);
+ t += 10ms;
+ sample = tracker.sample();
+ EXPECT_EQUAL(sample[CpuUsage::Category::SETUP], duration(0ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(15ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::WRITE], duration(10ms));
+}
+
+TEST("require that thread tracker implementation reports previous CPU category") {
+ duration t = duration::zero();
+ CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t));
+ EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::OTHER),
+ CpuUsage::index_of(tracker.set_category(CpuUsage::Category::SETUP)));
+ EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::SETUP),
+ CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ)));
+ EXPECT_EQUAL(CpuUsage::index_of(CpuUsage::Category::READ),
+ CpuUsage::index_of(tracker.set_category(CpuUsage::Category::READ)));
+}
+
+TEST("require that thread tracker implementation does not track OTHER cpu use") {
+ duration t = duration::zero();
+ CpuUsage::Test::TrackerImpl tracker(std::make_unique<DummySampler>(t));
+ t += 10ms;
+ tracker.set_category(CpuUsage::Category::OTHER);
+ t += 15ms;
+ tracker.set_category(CpuUsage::Category::READ);
+ tracker.set_category(CpuUsage::Category::OTHER);
+ t += 15ms;
+ auto sample = tracker.sample();
+ EXPECT_EQUAL(sample[CpuUsage::Category::READ], duration(0ms));
+ EXPECT_EQUAL(sample[CpuUsage::Category::OTHER], duration(0ms));
+}
+
+//-----------------------------------------------------------------------------
+
+void do_sample_cpu_usage(const EndTime &end_time) {
+ auto my_usage = CpuUsage::use(CpuUsage::Category::SETUP);
+ CpuUtil cpu(8ms);
+ while (!end_time()) {
+ std::this_thread::sleep_for(verbose ? 1s : 10ms);
+ auto util = cpu.get_util();
+ vespalib::string body;
+ for (size_t i = 0; i < util.size(); ++i) {
+ if (!body.empty()) {
+ body.append(", ");
+ }
+ body.append(fmt("%s: %.3f", CpuUsage::name_of(CpuUsage::Category(i)).c_str(), util[i]));
+ }
+ fprintf(stderr, "CPU: %s\n", body.c_str());
+ }
+}
+
+void do_full_work(CpuUsage::Category cat, const EndTime &end_time) {
+ auto my_usage = CpuUsage::use(cat);
+ while (!end_time()) {
+ be_busy(4ms);
+ }
+}
+
+void do_some_work(CpuUsage::Category cat, const EndTime &end_time) {
+ auto my_usage = CpuUsage::use(cat);
+ while (!end_time()) {
+ be_busy(4ms);
+ std::this_thread::sleep_for(4ms);
+ }
+}
+
+void do_nested_work(CpuUsage::Category cat1, CpuUsage::Category cat2, const EndTime &end_time) {
+ auto my_usage1 = CpuUsage::use(cat1);
+ while (!end_time()) {
+ be_busy(4ms);
+ auto my_usage2 = CpuUsage::use(cat2);
+ be_busy(4ms);
+ }
+}
+
+void do_external_work(CpuUsage::Category cat, const EndTime &end_time) {
+ auto my_usage1 = CpuUsage::use(CpuUsage::Category::SETUP);
+ while (!end_time()) {
+ std::thread thread([cat](){
+ auto my_usage2 = CpuUsage::use(cat);
+ be_busy(4ms);
+ });
+ thread.join();
+ }
+}
+
+TEST_MT_F("use top-level API to sample CPU usage", 5, EndTime(verbose ? 10s : 100ms)) {
+ switch (thread_id) {
+ case 0: return do_sample_cpu_usage(f1);
+ case 1: return do_full_work(CpuUsage::Category::WRITE, f1);
+ case 2: return do_some_work(CpuUsage::Category::READ, f1);
+ case 3: return do_nested_work(CpuUsage::Category::OTHER, CpuUsage::Category::READ, f1);
+ case 4: return do_external_work(CpuUsage::Category::COMPACT, f1);
+ default: TEST_FATAL("missing thread id case");
+ }
+}
+
//-----------------------------------------------------------------------------
int main(int argc, char **argv) {
diff --git a/vespalib/src/tests/datastore/array_store/array_store_test.cpp b/vespalib/src/tests/datastore/array_store/array_store_test.cpp
index c58e357a9a1..2672ed70f6d 100644
--- a/vespalib/src/tests/datastore/array_store/array_store_test.cpp
+++ b/vespalib/src/tests/datastore/array_store/array_store_test.cpp
@@ -7,6 +7,7 @@
#include <vespa/vespalib/datastore/compaction_strategy.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/vespalib/test/memory_allocator_observer.h>
#include <vespa/vespalib/test/insertion_operators.h>
#include <vespa/vespalib/util/memory_allocator.h>
#include <vespa/vespalib/util/size_literals.h>
@@ -19,6 +20,9 @@ using vespalib::ArrayRef;
using generation_t = vespalib::GenerationHandler::generation_t;
using MemStats = vespalib::datastore::test::MemStats;
using BufferStats = vespalib::datastore::test::BufferStats;
+using vespalib::alloc::MemoryAllocator;
+using vespalib::alloc::test::MemoryAllocatorObserver;
+using AllocStats = MemoryAllocatorObserver::Stats;
namespace {
@@ -40,18 +44,20 @@ struct Fixture
using value_type = EntryT;
using ReferenceStore = vespalib::hash_map<EntryRef, EntryVector>;
+ AllocStats stats;
ArrayStoreType store;
ReferenceStore refStore;
generation_t generation;
Fixture(uint32_t maxSmallArraySize, bool enable_free_lists = true)
: store(ArrayStoreConfig(maxSmallArraySize,
ArrayStoreConfig::AllocSpec(16, RefT::offsetSize(), 8_Ki,
- ALLOC_GROW_FACTOR)).enable_free_lists(enable_free_lists)),
+ ALLOC_GROW_FACTOR)).enable_free_lists(enable_free_lists),
+ std::make_unique<MemoryAllocatorObserver>(stats)),
refStore(),
generation(1)
{}
Fixture(const ArrayStoreConfig &storeCfg)
- : store(storeCfg),
+ : store(storeCfg, std::make_unique<MemoryAllocatorObserver>(stats)),
refStore(),
generation(1)
{}
@@ -162,10 +168,10 @@ TEST("require that we test with trivial and non-trivial types")
TEST_F("control static sizes", NumberFixture(3)) {
#ifdef _LIBCPP_VERSION
- EXPECT_EQUAL(424u, sizeof(f.store));
+ EXPECT_EQUAL(440u, sizeof(f.store));
EXPECT_EQUAL(296u, sizeof(NumberFixture::ArrayStoreType::DataStoreType));
#else
- EXPECT_EQUAL(456u, sizeof(f.store));
+ EXPECT_EQUAL(472u, sizeof(f.store));
EXPECT_EQUAL(328u, sizeof(NumberFixture::ArrayStoreType::DataStoreType));
#endif
EXPECT_EQUAL(96u, sizeof(NumberFixture::ArrayStoreType::SmallArrayType));
@@ -447,4 +453,9 @@ TEST_F("require that offset in EntryRefT is within bounds when allocating memory
f.assertStoreContent();
}
+TEST_F("require that provided memory allocator is used", NumberFixture(3))
+{
+ EXPECT_EQUAL(AllocStats(4, 0), f.stats);
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/tests/datastore/datastore/.gitignore b/vespalib/src/tests/datastore/datastore/.gitignore
new file mode 100644
index 00000000000..d033739fd78
--- /dev/null
+++ b/vespalib/src/tests/datastore/datastore/.gitignore
@@ -0,0 +1 @@
+/core.*
diff --git a/vespalib/src/tests/datastore/fixed_size_hash_map/fixed_size_hash_map_test.cpp b/vespalib/src/tests/datastore/fixed_size_hash_map/fixed_size_hash_map_test.cpp
index 417b92af9ac..599cb209e6c 100644
--- a/vespalib/src/tests/datastore/fixed_size_hash_map/fixed_size_hash_map_test.cpp
+++ b/vespalib/src/tests/datastore/fixed_size_hash_map/fixed_size_hash_map_test.cpp
@@ -68,7 +68,7 @@ struct DataStoreFixedSizeHashTest : public ::testing::Test
DataStoreFixedSizeHashTest::DataStoreFixedSizeHashTest()
: _generation_handler(),
_generation_holder(),
- _allocator(),
+ _allocator({}),
_store(_allocator.get_data_store()),
_comp(std::make_unique<MyCompare>(_store)),
_hash_map(),
diff --git a/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp b/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
index 796e19a97d1..13f9ae251b6 100644
--- a/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
+++ b/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
@@ -132,7 +132,7 @@ struct DataStoreShardedHashTest : public ::testing::Test
DataStoreShardedHashTest::DataStoreShardedHashTest()
: _generationHandler(),
- _allocator(),
+ _allocator({}),
_store(_allocator.get_data_store()),
_hash_map(std::make_unique<MyCompare>(_store)),
_writer(1, 128_Ki),
diff --git a/vespalib/src/tests/datastore/unique_store/unique_store_test.cpp b/vespalib/src/tests/datastore/unique_store/unique_store_test.cpp
index 917c91f2dff..7f279689985 100644
--- a/vespalib/src/tests/datastore/unique_store/unique_store_test.cpp
+++ b/vespalib/src/tests/datastore/unique_store/unique_store_test.cpp
@@ -9,6 +9,7 @@
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/test/datastore/buffer_stats.h>
#include <vespa/vespalib/test/insertion_operators.h>
+#include <vespa/vespalib/test/memory_allocator_observer.h>
#include <vespa/vespalib/util/traits.h>
#include <vector>
@@ -21,6 +22,9 @@ using namespace vespalib::datastore;
using vespalib::ArrayRef;
using generation_t = vespalib::GenerationHandler::generation_t;
using vespalib::datastore::test::BufferStats;
+using vespalib::alloc::MemoryAllocator;
+using vespalib::alloc::test::MemoryAllocatorObserver;
+using AllocStats = MemoryAllocatorObserver::Stats;
template <typename UniqueStoreT>
struct TestBaseValues {
@@ -39,6 +43,7 @@ struct TestBase : public ::testing::Test {
using ReferenceStoreValueType = std::conditional_t<std::is_same_v<ValueType, const char *>, std::string, ValueType>;
using ReferenceStore = std::map<EntryRef, std::pair<ReferenceStoreValueType,uint32_t>>;
+ AllocStats stats;
UniqueStoreType store;
ReferenceStore refStore;
generation_t generation;
@@ -148,7 +153,8 @@ struct TestBase : public ::testing::Test {
template <typename UniqueStoreTypeAndDictionaryType>
TestBase<UniqueStoreTypeAndDictionaryType>::TestBase()
- : store(),
+ : stats(),
+ store(std::make_unique<MemoryAllocatorObserver>(stats)),
refStore(),
generation(1)
{
@@ -424,6 +430,15 @@ TYPED_TEST(TestBase, store_can_be_enumerated)
EXPECT_EQ(2u, enumValue2);
}
+TYPED_TEST(TestBase, provided_memory_allocator_is_used)
+{
+ if constexpr (std::is_same_v<const char *, typename TestFixture::ValueType>) {
+ EXPECT_EQ(AllocStats(18, 0), this->stats);
+ } else {
+ EXPECT_EQ(AllocStats(1, 0), this->stats);
+ }
+}
+
#pragma GCC diagnostic pop
TEST_F(DoubleTest, nan_is_handled)
diff --git a/vespalib/src/tests/datastore/unique_store_string_allocator/unique_store_string_allocator_test.cpp b/vespalib/src/tests/datastore/unique_store_string_allocator/unique_store_string_allocator_test.cpp
index 158b85f6bf5..21330c22166 100644
--- a/vespalib/src/tests/datastore/unique_store_string_allocator/unique_store_string_allocator_test.cpp
+++ b/vespalib/src/tests/datastore/unique_store_string_allocator/unique_store_string_allocator_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/test/datastore/buffer_stats.h>
#include <vespa/vespalib/test/insertion_operators.h>
+#include <vespa/vespalib/test/memory_allocator_observer.h>
#include <vespa/vespalib/util/traits.h>
#include <vector>
@@ -11,6 +12,10 @@ using namespace vespalib::datastore;
using vespalib::MemoryUsage;
using generation_t = vespalib::GenerationHandler::generation_t;
using BufferStats = vespalib::datastore::test::BufferStats;
+using vespalib::alloc::MemoryAllocator;
+using vespalib::alloc::test::MemoryAllocatorObserver;
+using AllocStats = MemoryAllocatorObserver::Stats;
+
namespace {
@@ -24,10 +29,12 @@ template <typename RefT = EntryRefT<22>>
struct TestBase : public ::testing::Test {
using EntryRefType = RefT;
+ AllocStats stats;
UniqueStoreStringAllocator<EntryRefType> allocator;
generation_t generation;
TestBase()
- : allocator(),
+ : stats(),
+ allocator(std::make_unique<MemoryAllocatorObserver>(stats)),
generation(1)
{}
void assert_add(const char *input) {
@@ -170,6 +177,11 @@ TEST_F(StringTest, free_list_is_never_used_for_move)
assert_buffer_state(ref2, BufferStats().used(4).hold(0).dead(2).extra_used(2002));
}
+TEST_F(StringTest, provided_memory_allocator_is_used)
+{
+ EXPECT_EQ(AllocStats(18, 0), stats);
+}
+
TEST_F(SmallOffsetStringTest, new_underlying_buffer_is_allocated_when_current_is_full)
{
uint32_t first_buffer_id = get_buffer_id(add(small.c_str()));
@@ -184,6 +196,7 @@ TEST_F(SmallOffsetStringTest, new_underlying_buffer_is_allocated_when_current_is
uint32_t buffer_id = get_buffer_id(add(small.c_str()));
EXPECT_EQ(second_buffer_id, buffer_id);
}
+ EXPECT_LT(18, stats.alloc_cnt);
}
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/vespalib/src/tests/nice/CMakeLists.txt b/vespalib/src/tests/nice/CMakeLists.txt
new file mode 100644
index 00000000000..a97ec52c22c
--- /dev/null
+++ b/vespalib/src/tests/nice/CMakeLists.txt
@@ -0,0 +1,10 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_nice_test_app TEST
+ SOURCES
+ nice_test.cpp
+ DEPENDS
+ vespalib
+)
+if(NOT CMAKE_HOST_SYSTEM_NAME STREQUAL "Darwin")
+ vespa_add_test(NAME vespalib_nice_test_app COMMAND vespalib_nice_test_app)
+endif()
diff --git a/vespalib/src/tests/nice/nice_test.cpp b/vespalib/src/tests/nice/nice_test.cpp
new file mode 100644
index 00000000000..ec1d25be3bd
--- /dev/null
+++ b/vespalib/src/tests/nice/nice_test.cpp
@@ -0,0 +1,98 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/util/nice.h>
+#include <vespa/vespalib/test/thread_meets.h>
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <unistd.h>
+#include <functional>
+#include <thread>
+
+using vespalib::Runnable;
+using vespalib::be_nice;
+using vespalib::test::ThreadMeets;
+
+double how_nice(int now, int target) {
+ int max = 19;
+ int wanted_param = (target - now);
+ int num_zones = ((max + 1) - now);
+ // make sure we are in the middle of the wanted nice zone
+ double result = (0.5 + wanted_param) / num_zones;
+ fprintf(stderr, " ... using how_nice=%g to get from %d to %d in nice value\n", result, now, target);
+ return result;
+}
+
+struct RunFun : Runnable {
+ std::function<void()> my_fun;
+ RunFun(std::function<void()> fun_in) : my_fun(fun_in) {}
+ void run() override { my_fun(); }
+};
+
+int my_init_fun(Runnable &target) {
+ target.run();
+ return 1;
+}
+
+std::thread run_with_init(std::function<void()> my_fun, Runnable::init_fun_t init_fun = my_init_fun) {
+ return std::thread([init_fun, my_fun]
+ {
+ RunFun run_fun(my_fun);
+ init_fun(run_fun);
+ });
+}
+
+TEST("require that initial nice value is 0") {
+ EXPECT_EQUAL(nice(0), 0);
+}
+
+TEST("require that nice value is tracked per thread") {
+ ThreadMeets::Nop barrier(5);
+ std::vector<std::thread> threads;
+ for (int i = 0; i < 5; ++i) {
+ threads.push_back(run_with_init([my_barrier = &barrier, i]
+ {
+ nice(i);
+ (*my_barrier)();
+ EXPECT_EQUAL(nice(0), i);
+ }));
+ }
+ for (auto &thread: threads) {
+ thread.join();
+ }
+}
+
+void verify_max_nice_value() {
+ int now = nice(0);
+ now = nice(19 - now);
+ EXPECT_EQUAL(now, 19);
+ now = nice(1);
+ EXPECT_EQUAL(now, 19);
+}
+
+TEST("require that max nice value is 19") {
+ auto thread = run_with_init([]{ verify_max_nice_value(); });
+ thread.join();
+}
+
+TEST("require that nice value can be set with init function") {
+ for (int i = 0; i <= 19; ++i) {
+ auto thread = run_with_init([i]()
+ {
+ EXPECT_EQUAL(nice(0), i);
+ }, be_nice(my_init_fun, how_nice(0, i)));
+ thread.join();
+ }
+}
+
+TEST("require that niceness can be nested and will act on a limited nice value range") {
+ auto thread1 = run_with_init([]{ EXPECT_EQUAL(nice(0), 7); },
+ be_nice(be_nice(my_init_fun, how_nice(3, 7)), how_nice(0, 3)));
+ auto thread2 = run_with_init([]{ EXPECT_EQUAL(nice(0), 15); },
+ be_nice(be_nice(my_init_fun, how_nice(10, 15)), how_nice(0, 10)));
+ auto thread3 = run_with_init([]{ EXPECT_EQUAL(nice(0), 19); },
+ be_nice(be_nice(my_init_fun, how_nice(10, 19)), how_nice(0, 10)));
+ thread1.join();
+ thread2.join();
+ thread3.join();
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt b/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt
new file mode 100644
index 00000000000..6e977cdb59f
--- /dev/null
+++ b/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_shared_operation_throttler_test_app TEST
+ SOURCES
+ shared_operation_throttler_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_shared_operation_throttler_test_app COMMAND vespalib_shared_operation_throttler_test_app)
diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
new file mode 100644
index 00000000000..d9b6ae7f908
--- /dev/null
+++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
@@ -0,0 +1,212 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/vespalib/util/shared_operation_throttler.h>
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/util/barrier.h>
+#include <thread>
+
+namespace vespalib {
+
+using ThrottleToken = SharedOperationThrottler::Token;
+
+struct DynamicThrottleFixture {
+ std::unique_ptr<SharedOperationThrottler> _throttler;
+
+ DynamicThrottleFixture() {
+ SharedOperationThrottler::DynamicThrottleParams params;
+ params.window_size_increment = 1;
+ params.min_window_size = 1;
+ _throttler = SharedOperationThrottler::make_dynamic_throttler(params);
+ }
+};
+
+TEST("unlimited throttler does not throttle") {
+ // We technically can't test that the unlimited throttler _never_ throttles, but at
+ // least check that it doesn't throttle _twice_, and then induce from this ;)
+ auto throttler = SharedOperationThrottler::make_unlimited_throttler();
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token2.valid());
+ // Window size should be zero (i.e. unlimited) for unlimited throttler
+ EXPECT_EQUAL(throttler->current_window_size(), 0u);
+}
+
+TEST_F("dynamic throttler respects initial window size", DynamicThrottleFixture()) {
+ auto token1 = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = f1._throttler->try_acquire_one();
+ EXPECT_FALSE(token2.valid());
+
+ EXPECT_EQUAL(f1._throttler->current_window_size(), 1u);
+}
+
+TEST_F("blocking acquire returns immediately if slot available", DynamicThrottleFixture()) {
+ auto token = f1._throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ token.reset();
+ token = f1._throttler->blocking_acquire_one(600s); // Should never block.
+ EXPECT_TRUE(token.valid());
+}
+
+TEST_F("blocking call woken up if throttle slot available", DynamicThrottleFixture()) {
+ vespalib::Barrier barrier(2);
+ std::thread t([&] {
+ auto token = f1._throttler->try_acquire_one();
+ assert(token.valid());
+ barrier.await();
+ while (f1._throttler->waiting_threads() != 1) {
+ std::this_thread::sleep_for(100us);
+ }
+ // Implicit token release at thread scope exit
+ });
+ barrier.await();
+ auto token = f1._throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ t.join();
+}
+
+TEST_F("time-bounded blocking acquire waits for timeout", DynamicThrottleFixture()) {
+ auto window_filling_token = f1._throttler->try_acquire_one();
+ auto before = std::chrono::steady_clock::now();
+ // Will block for at least 1ms. Since no window slot will be available by that time,
+ // an invalid token should be returned.
+ auto token = f1._throttler->blocking_acquire_one(1ms);
+ auto after = std::chrono::steady_clock::now();
+ EXPECT_TRUE((after - before) >= 1ms);
+ EXPECT_FALSE(token.valid());
+}
+
+TEST("default constructed token is invalid") {
+ ThrottleToken token;
+ EXPECT_FALSE(token.valid());
+ token.reset(); // no-op
+ EXPECT_FALSE(token.valid());
+}
+
+TEST_F("token destruction frees up throttle window slot", DynamicThrottleFixture()) {
+ {
+ auto token = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+ }
+ auto token = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+}
+
+TEST_F("token can be moved and reset", DynamicThrottleFixture()) {
+ auto token1 = f1._throttler->try_acquire_one();
+ auto token2 = std::move(token1); // move ctor
+ EXPECT_TRUE(token2.valid());
+ EXPECT_FALSE(token1.valid());
+ ThrottleToken token3;
+ token3 = std::move(token2); // move assignment op
+ EXPECT_TRUE(token3.valid());
+ EXPECT_FALSE(token2.valid());
+
+ // Trying to fetch new token should not succeed due to active token and win size of 1
+ token1 = f1._throttler->try_acquire_one();
+ EXPECT_FALSE(token1.valid());
+ // Resetting the token should free up the slot in the window
+ token3.reset();
+ token1 = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+}
+
+// Note on test semantics: these tests are adapted from a subset of the MessageBus
+// throttling tests. Some tests have been simplified due to no longer having access
+// to the low-level DynamicThrottlePolicy API.
+
+struct WindowFixture {
+ uint64_t _milli_time;
+ std::unique_ptr<SharedOperationThrottler> _throttler;
+
+ WindowFixture(uint32_t window_size_increment = 5,
+ uint32_t min_window_size = 20,
+ uint32_t max_window_size = INT_MAX)
+ : _milli_time(0),
+ _throttler()
+ {
+ SharedOperationThrottler::DynamicThrottleParams params;
+ params.resize_rate = 1;
+ params.window_size_increment = window_size_increment;
+ params.min_window_size = min_window_size;
+ params.max_window_size = max_window_size;
+ params.window_size_decrement_factor = 2;
+ params.window_size_backoff = 0.9;
+ _throttler = SharedOperationThrottler::make_dynamic_throttler(params, [&]() noexcept {
+ return steady_time(std::chrono::milliseconds(_milli_time));
+ });
+ }
+
+ std::vector<SharedOperationThrottler::Token> fill_entire_throttle_window() {
+ std::vector<SharedOperationThrottler::Token> tokens;
+ while (true) {
+ auto token = _throttler->try_acquire_one();
+ if (!token.valid()) {
+ break;
+ }
+ tokens.emplace_back(std::move(token));
+ }
+ return tokens;
+ }
+
+ uint32_t attempt_converge_on_stable_window_size(uint32_t max_pending) {
+ for (uint32_t i = 0; i < 999; ++i) {
+ auto tokens = fill_entire_throttle_window();
+ uint32_t num_pending = static_cast<uint32_t>(tokens.size());
+
+ uint64_t trip_time = (num_pending < max_pending) ? 1000 : 1000 + (num_pending - max_pending) * 1000;
+ _milli_time += trip_time;
+ // Throttle window slots implicitly freed up as tokens are destructed.
+ }
+ uint32_t ret = _throttler->current_window_size();
+ fprintf(stderr, "attempt_converge_on_stable_window_size() = %u\n", ret);
+ return ret;
+ }
+};
+
+TEST_F("window size changes dynamically based on throughput", WindowFixture()) {
+ uint32_t window_size = f1.attempt_converge_on_stable_window_size(100);
+ ASSERT_TRUE(window_size >= 90 && window_size <= 105);
+
+ window_size = f1.attempt_converge_on_stable_window_size(200);
+ ASSERT_TRUE(window_size >= 180 && window_size <= 205);
+
+ window_size = f1.attempt_converge_on_stable_window_size(50);
+ ASSERT_TRUE(window_size >= 45 && window_size <= 55);
+
+ window_size = f1.attempt_converge_on_stable_window_size(500);
+ ASSERT_TRUE(window_size >= 450 && window_size <= 505);
+
+ window_size = f1.attempt_converge_on_stable_window_size(100);
+ ASSERT_TRUE(window_size >= 90 && window_size <= 115);
+}
+
+TEST_F("window size is reset after idle time period", WindowFixture(5, 1)) {
+ double window_size = f1.attempt_converge_on_stable_window_size(100);
+ ASSERT_TRUE(window_size >= 90 && window_size <= 110);
+
+ f1._milli_time += 30001; // Not yet past 60s idle time
+ auto tokens = f1.fill_entire_throttle_window();
+ ASSERT_TRUE(tokens.size() >= 90 && tokens.size() <= 110);
+ tokens.clear();
+
+ f1._milli_time += 60001; // Idle time passed
+ tokens = f1.fill_entire_throttle_window();
+ EXPECT_EQUAL(tokens.size(), 1u); // Reduced to minimum window size
+}
+
+TEST_F("minimum window size is respected", WindowFixture(5, 150, INT_MAX)) {
+ double window_size = f1.attempt_converge_on_stable_window_size(200);
+ ASSERT_TRUE(window_size >= 150 && window_size <= 210);
+}
+
+TEST_F("maximum window size is respected", WindowFixture(5, 1, 50)) {
+ double window_size = f1.attempt_converge_on_stable_window_size(100);
+ ASSERT_TRUE(window_size >= 40 && window_size <= 50);
+}
+
+}
+
+TEST_MAIN() {
+ TEST_RUN_ALL();
+}
diff --git a/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp b/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp
index 81c8271f755..5b267c3b9e9 100644
--- a/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp
+++ b/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp
@@ -101,8 +101,8 @@ std::unique_ptr<Handles> copy_strong_handles(const Handles &handles) {
return result;
}
-std::unique_ptr<std::vector<string_id>> make_weak_handles(const Handles &handles) {
- return std::make_unique<std::vector<string_id>>(handles.view());
+std::unique_ptr<StringIdVector> make_weak_handles(const Handles &handles) {
+ return std::make_unique<StringIdVector>(handles.view());
}
//-----------------------------------------------------------------------------
@@ -202,7 +202,7 @@ struct Fixture {
std::vector<vespalib::string> get_direct_result;
std::unique_ptr<Handles> strong;
std::unique_ptr<Handles> strong_copy;
- std::unique_ptr<std::vector<string_id>> weak;
+ std::unique_ptr<StringIdVector> weak;
auto copy_strings_task = [&](){ copy_strings_result = copy_strings(work); };
auto copy_and_hash_task = [&](){ copy_and_hash_result = copy_and_hash(work); };
auto local_enum_task = [&](){ local_enum_result = local_enum(work); };
diff --git a/vespalib/src/tests/spin_lock/spin_lock_test.cpp b/vespalib/src/tests/spin_lock/spin_lock_test.cpp
index 54ad354f584..78e35a3e8d1 100644
--- a/vespalib/src/tests/spin_lock/spin_lock_test.cpp
+++ b/vespalib/src/tests/spin_lock/spin_lock_test.cpp
@@ -1,12 +1,14 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/util/spin_lock.h>
+#include <vespa/vespalib/util/atomic.h>
#include <vespa/vespalib/util/benchmark_timer.h>
#include <vespa/vespalib/util/time.h>
#include <vespa/vespalib/testkit/test_kit.h>
#include <array>
using namespace vespalib;
+using namespace vespalib::atomic;
bool verbose = false;
double budget = 0.25;
@@ -25,15 +27,15 @@ struct MyState {
void update() {
std::array<size_t,SZ> tmp;
for (size_t i = 0; i < SZ; ++i) {
- tmp[i] = state[i];
+ store_ref_relaxed(tmp[i], load_ref_relaxed(state[i]));
}
for (size_t i = 0; i < SZ; ++i) {
- state[i] = tmp[i] + 1;
+ store_ref_relaxed(state[i], load_ref_relaxed(tmp[i]) + 1);
}
}
bool check(size_t expect) const {
- for (size_t value: state) {
- if (value != expect) {
+ for (const auto& value: state) {
+ if (load_ref_relaxed(value) != expect) {
return false;
}
}
diff --git a/vespalib/src/tests/stllike/asciistream_test.cpp b/vespalib/src/tests/stllike/asciistream_test.cpp
index be0bc1cb694..3042595e18c 100644
--- a/vespalib/src/tests/stllike/asciistream_test.cpp
+++ b/vespalib/src/tests/stllike/asciistream_test.cpp
@@ -409,27 +409,14 @@ AsciistreamTest::testWriteThenRead()
void
AsciistreamTest::testGetLine()
{
- asciistream is("");
- EXPECT_TRUE(is.getlines().empty());
- is = asciistream("line 1");
- std::vector<string> v = is.getlines();
- EXPECT_EQUAL(1u, v.size());
- EXPECT_EQUAL("line 1", v[0]);
- is = asciistream("line 1\nline 2");
- v = is.getlines();
- EXPECT_EQUAL(2u, v.size());
- EXPECT_EQUAL("line 1", v[0]);
- EXPECT_EQUAL("line 2", v[1]);
- is = asciistream("line 1\nline 2\n\n");
- v = is.getlines();
- EXPECT_EQUAL(3u, v.size());
- EXPECT_EQUAL("line 1", v[0]);
- EXPECT_EQUAL("line 2", v[1]);
- EXPECT_EQUAL("", v[2]);
- is = asciistream("line 1");
+ asciistream is = asciistream("line 1\nline 2\nline 3");
string s;
getline(is, s);
EXPECT_EQUAL("line 1", s);
+ getline(is, s);
+ EXPECT_EQUAL("line 2", s);
+ getline(is, s);
+ EXPECT_EQUAL("line 3", s);
}
#define VERIFY_DOUBLE_SERIALIZATION(value, expected, format, precision) { \
diff --git a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp
index cf84ab03a25..14802a60ff1 100644
--- a/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp
+++ b/vespalib/src/tests/util/rcuvector/rcuvector_test.cpp
@@ -1,11 +1,17 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/vespalib/test/memory_allocator_observer.h>
#include <vespa/vespalib/util/rcuvector.h>
#include <vespa/vespalib/util/size_literals.h>
using namespace vespalib;
+using vespalib::alloc::Alloc;
+using vespalib::alloc::MemoryAllocator;
+using MyMemoryAllocator = vespalib::alloc::test::MemoryAllocatorObserver;
+using AllocStats = MyMemoryAllocator::Stats;
+
bool
assertUsage(const MemoryUsage & exp, const MemoryUsage & act)
{
@@ -288,4 +294,70 @@ TEST("test small expand")
g.trimHoldLists(2);
}
+struct Fixture {
+ using generation_t = GenerationHandler::generation_t;
+
+ AllocStats stats;
+ std::unique_ptr<MemoryAllocator> allocator;
+ Alloc initial_alloc;
+ GenerationHolder g;
+ RcuVectorBase<int> arr;
+
+ Fixture();
+ ~Fixture();
+ void transfer_and_trim(generation_t transfer_gen, generation_t trim_gen)
+ {
+ g.transferHoldLists(transfer_gen);
+ g.trimHoldLists(trim_gen);
+ }
+};
+
+Fixture::Fixture()
+ : stats(),
+ allocator(std::make_unique<MyMemoryAllocator>(stats)),
+ initial_alloc(Alloc::alloc_with_allocator(allocator.get())),
+ g(),
+ arr(g, initial_alloc)
+{
+ arr.reserve(100);
+}
+
+Fixture::~Fixture() = default;
+
+TEST_F("require that memory allocator can be set", Fixture)
+{
+ EXPECT_EQUAL(AllocStats(2, 0), f.stats);
+ f.transfer_and_trim(1, 2);
+ EXPECT_EQUAL(AllocStats(2, 1), f.stats);
+}
+
+TEST_F("require that memory allocator is preserved across reset", Fixture)
+{
+ f.arr.reset();
+ f.arr.reserve(100);
+ EXPECT_EQUAL(AllocStats(4, 1), f.stats);
+ f.transfer_and_trim(1, 2);
+ EXPECT_EQUAL(AllocStats(4, 3), f.stats);
+}
+
+TEST_F("require that created replacement vector uses same memory allocator", Fixture)
+{
+ auto arr2 = f.arr.create_replacement_vector();
+ EXPECT_EQUAL(AllocStats(2, 0), f.stats);
+ arr2.reserve(100);
+ EXPECT_EQUAL(AllocStats(3, 0), f.stats);
+ f.transfer_and_trim(1, 2);
+ EXPECT_EQUAL(AllocStats(3, 1), f.stats);
+}
+
+TEST_F("require that ensure_size and shrink use same memory allocator", Fixture)
+{
+ f.arr.ensure_size(2000);
+ EXPECT_EQUAL(AllocStats(3, 0), f.stats);
+ f.arr.shrink(1000);
+ EXPECT_EQUAL(AllocStats(4, 0), f.stats);
+ f.transfer_and_trim(1, 2);
+ EXPECT_EQUAL(AllocStats(4, 3), f.stats);
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/tests/wakeup/wakeup_bench.cpp b/vespalib/src/tests/wakeup/wakeup_bench.cpp
index 1d9817508d3..c39b8899159 100644
--- a/vespalib/src/tests/wakeup/wakeup_bench.cpp
+++ b/vespalib/src/tests/wakeup/wakeup_bench.cpp
@@ -5,6 +5,7 @@
#include <condition_variable>
#include <thread>
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#ifdef __linux__
#include <linux/futex.h>
@@ -121,18 +122,33 @@ struct UsePipe : State {
set_wakeup();
char token = 'T';
[[maybe_unused]] ssize_t res = write(pipefd[1], &token, 1);
- assert(res == 1);
+ // assert(res == 1);
}
void stop() {
set_stop();
char token = 'T';
[[maybe_unused]] ssize_t res = write(pipefd[1], &token, 1);
- assert(res == 1);
+ // assert(res == 1);
}
void wait() {
char token_trash[128];
[[maybe_unused]] ssize_t res = read(pipefd[0], token_trash, sizeof(token_trash));
- assert(res == 1);
+ // assert(res == 1);
+ }
+};
+
+struct UseAtomic : State {
+ void wakeup() {
+ set_wakeup();
+ value.notify_one();
+ }
+ void stop() {
+ set_stop();
+ value.notify_one();
+ }
+ void wait() {
+ value.wait(0);
+ // assert(!is_ready());
}
};
@@ -162,9 +178,11 @@ struct Wakeup : T {
using T::should_stop;
using T::set_ready;
using T::wait;
+ cpu_usage::ThreadSampler::UP cpu;
std::thread thread;
Wakeup() : thread([this]{ run(); }) {}
void run() {
+ cpu = cpu_usage::create_thread_sampler();
while (!should_stop()) {
set_ready();
wait();
@@ -211,6 +229,15 @@ void wait_until_ready(const T &list) {
}
template <typename T>
+duration sample_cpu(T &list) {
+ duration result = duration::zero();
+ for (auto *item: list) {
+ result += item->cpu->sample();
+ }
+ return result;
+}
+
+template <typename T>
auto perform_wakeups(T &list, size_t target) __attribute__((noinline));
template <typename T>
auto perform_wakeups(T &list, size_t target) {
@@ -239,11 +266,17 @@ void benchmark() {
perform_wakeups(list, WAKE_CNT / 64);
}
auto t1 = steady_clock::now();
+ auto cpu0 = sample_cpu(list);
auto res = perform_wakeups(list, WAKE_CNT);
auto t2 = steady_clock::now();
+ auto cpu1 = sample_cpu(list);
wait_until_ready(list);
destroy_list(list);
- fprintf(stderr, "wakeups per second: %zu (skipped: %zu)\n", size_t(res.first / to_s(t2 - t1)), res.second);
+ double run_time = to_s(t2 - t1);
+ double cpu_time = to_s(cpu1 - cpu0);
+ double cpu_load = (cpu_time / (N * run_time));
+ fprintf(stderr, "wakeups per second: %zu (skipped: %zu, cpu load: %.3f)\n",
+ size_t(res.first / run_time), res.second, cpu_load);
}
TEST(WakeupBench, using_spin) { benchmark<Wakeup<UseSpin>>(); }
@@ -251,6 +284,7 @@ TEST(WakeupBench, using_spin_yield) { benchmark<Wakeup<UseSpinYield>>(); }
TEST(WakeupBench, using_cond) { benchmark<Wakeup<UseCond>>(); }
TEST(WakeupBench, using_cond_nolock) { benchmark<Wakeup<UseCondNolock>>(); }
TEST(WakeupBench, using_pipe) { benchmark<Wakeup<UsePipe>>(); }
+TEST(WakeupBench, using_atomic) { benchmark<Wakeup<UseAtomic>>(); }
#ifdef __linux__
TEST(WakeupBench, using_futex) { benchmark<Wakeup<UseFutex>>(); }
diff --git a/vespalib/src/vespa/vespalib/data/slime/array_value.h b/vespalib/src/vespa/vespalib/data/slime/array_value.h
index b1b257e8476..216d116498c 100644
--- a/vespalib/src/vespa/vespalib/data/slime/array_value.h
+++ b/vespalib/src/vespa/vespalib/data/slime/array_value.h
@@ -5,6 +5,7 @@
#include "value.h"
#include "nix_value.h"
#include "value_factory.h"
+#include <vespa/vespalib/stllike/allocator.h>
#include <vector>
namespace vespalib::slime {
@@ -18,7 +19,7 @@ class ArrayValue final : public Value
private:
SymbolTable &_symbolTable;
Stash &_stash;
- std::vector<Value*> _values;
+ std::vector<Value*, vespalib::allocator_large<Value*>> _values;
protected:
Cursor &addLeaf(const ValueFactory &input) override {
diff --git a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
index d628843279d..a2f7a4d4ff4 100644
--- a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
@@ -12,8 +12,11 @@ vespa_add_library(vespalib_vespalib_datastore OBJECT
entryref.cpp
entry_ref_filter.cpp
fixed_size_hash_map.cpp
+ large_array_buffer_type.cpp
sharded_hash_map.cpp
+ small_array_buffer_type.cpp
unique_store.cpp
+ unique_store_buffer_type.cpp
unique_store_string_allocator.cpp
DEPENDS
)
diff --git a/vespalib/src/vespa/vespalib/datastore/array_store.cpp b/vespalib/src/vespa/vespalib/datastore/array_store.cpp
index 0e03b36d2f9..b03f21402a5 100644
--- a/vespalib/src/vespa/vespalib/datastore/array_store.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/array_store.cpp
@@ -5,10 +5,4 @@
namespace vespalib::datastore {
-template class BufferType<vespalib::Array<uint8_t>>;
-template class BufferType<vespalib::Array<uint32_t>>;
-template class BufferType<vespalib::Array<int32_t>>;
-template class BufferType<vespalib::Array<std::string>>;
-template class BufferType<vespalib::Array<AtomicEntryRef>>;
-
}
diff --git a/vespalib/src/vespa/vespalib/datastore/array_store.h b/vespalib/src/vespa/vespalib/datastore/array_store.h
index d9b62c310b5..ed3af451b04 100644
--- a/vespalib/src/vespa/vespalib/datastore/array_store.h
+++ b/vespalib/src/vespa/vespalib/datastore/array_store.h
@@ -9,6 +9,8 @@
#include "entryref.h"
#include "atomic_entry_ref.h"
#include "i_compaction_context.h"
+#include "large_array_buffer_type.h"
+#include "small_array_buffer_type.h"
#include <vespa/vespalib/util/array.h>
namespace vespalib::datastore {
@@ -32,27 +34,15 @@ public:
using SmallArrayType = BufferType<EntryT>;
using LargeArray = vespalib::Array<EntryT>;
using AllocSpec = ArrayStoreConfig::AllocSpec;
-
private:
- class LargeArrayType : public BufferType<LargeArray> {
- private:
- using ParentType = BufferType<LargeArray>;
- using ParentType::_emptyEntry;
- using CleanContext = typename ParentType::CleanContext;
- public:
- LargeArrayType(const AllocSpec &spec);
- void cleanHold(void *buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx) override;
- };
-
-
uint32_t _largeArrayTypeId;
uint32_t _maxSmallArraySize;
DataStoreType _store;
- std::vector<SmallArrayType> _smallArrayTypes;
- LargeArrayType _largeArrayType;
+ std::vector<SmallArrayBufferType<EntryT>> _smallArrayTypes;
+ LargeArrayBufferType<EntryT> _largeArrayType;
using generation_t = vespalib::GenerationHandler::generation_t;
- void initArrayTypes(const ArrayStoreConfig &cfg);
+ void initArrayTypes(const ArrayStoreConfig &cfg, std::shared_ptr<alloc::MemoryAllocator> memory_allocator);
// 1-to-1 mapping between type ids and sizes for small arrays is enforced during initialization.
uint32_t getTypeId(size_t arraySize) const { return arraySize; }
size_t getArraySize(uint32_t typeId) const { return typeId; }
@@ -68,7 +58,7 @@ private:
}
public:
- ArrayStore(const ArrayStoreConfig &cfg);
+ ArrayStore(const ArrayStoreConfig &cfg, std::shared_ptr<alloc::MemoryAllocator> memory_allocator);
~ArrayStore();
EntryRef add(const ConstArrayRef &array);
ConstArrayRef get(EntryRef ref) const {
diff --git a/vespalib/src/vespa/vespalib/datastore/array_store.hpp b/vespalib/src/vespa/vespalib/datastore/array_store.hpp
index bbbd52c354d..00c1615b173 100644
--- a/vespalib/src/vespa/vespalib/datastore/array_store.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/array_store.hpp
@@ -6,40 +6,23 @@
#include "compaction_spec.h"
#include "entry_ref_filter.h"
#include "datastore.hpp"
+#include "large_array_buffer_type.hpp"
+#include "small_array_buffer_type.hpp"
#include <atomic>
#include <algorithm>
namespace vespalib::datastore {
template <typename EntryT, typename RefT>
-ArrayStore<EntryT, RefT>::LargeArrayType::LargeArrayType(const AllocSpec &spec)
- : BufferType<LargeArray>(1, spec.minArraysInBuffer, spec.maxArraysInBuffer, spec.numArraysForNewBuffer, spec.allocGrowFactor)
-{
-}
-
-template <typename EntryT, typename RefT>
-void
-ArrayStore<EntryT, RefT>::LargeArrayType::cleanHold(void *buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx)
-{
- LargeArray *elem = static_cast<LargeArray *>(buffer) + offset;
- for (size_t i = 0; i < numElems; ++i) {
- cleanCtx.extraBytesCleaned(sizeof(EntryT) * elem->size());
- *elem = _emptyEntry;
- ++elem;
- }
-}
-
-template <typename EntryT, typename RefT>
void
-ArrayStore<EntryT, RefT>::initArrayTypes(const ArrayStoreConfig &cfg)
+ArrayStore<EntryT, RefT>::initArrayTypes(const ArrayStoreConfig &cfg, std::shared_ptr<alloc::MemoryAllocator> memory_allocator)
{
_largeArrayTypeId = _store.addType(&_largeArrayType);
assert(_largeArrayTypeId == 0);
_smallArrayTypes.reserve(_maxSmallArraySize);
for (uint32_t arraySize = 1; arraySize <= _maxSmallArraySize; ++arraySize) {
const AllocSpec &spec = cfg.specForSize(arraySize);
- _smallArrayTypes.emplace_back(arraySize, spec.minArraysInBuffer, spec.maxArraysInBuffer,
- spec.numArraysForNewBuffer, spec.allocGrowFactor);
+ _smallArrayTypes.emplace_back(arraySize, spec, memory_allocator);
}
for (auto & type : _smallArrayTypes) {
uint32_t typeId = _store.addType(&type);
@@ -48,14 +31,14 @@ ArrayStore<EntryT, RefT>::initArrayTypes(const ArrayStoreConfig &cfg)
}
template <typename EntryT, typename RefT>
-ArrayStore<EntryT, RefT>::ArrayStore(const ArrayStoreConfig &cfg)
+ArrayStore<EntryT, RefT>::ArrayStore(const ArrayStoreConfig &cfg, std::shared_ptr<alloc::MemoryAllocator> memory_allocator)
: _largeArrayTypeId(0),
_maxSmallArraySize(cfg.maxSmallArraySize()),
_store(),
_smallArrayTypes(),
- _largeArrayType(cfg.specForSize(0))
+ _largeArrayType(cfg.specForSize(0), memory_allocator)
{
- initArrayTypes(cfg);
+ initArrayTypes(cfg, std::move(memory_allocator));
_store.init_primary_buffers();
if (cfg.enable_free_lists()) {
_store.enableFreeLists();
diff --git a/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.cpp b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.cpp
new file mode 100644
index 00000000000..f4ccb27abad
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.cpp
@@ -0,0 +1,20 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "large_array_buffer_type.hpp"
+#include "buffer_type.hpp"
+
+namespace vespalib::datastore {
+
+template class BufferType<Array<uint8_t>>;
+template class BufferType<Array<uint32_t>>;
+template class BufferType<Array<int32_t>>;
+template class BufferType<Array<std::string>>;
+template class BufferType<Array<AtomicEntryRef>>;
+
+template class LargeArrayBufferType<uint8_t>;
+template class LargeArrayBufferType<uint32_t>;
+template class LargeArrayBufferType<int32_t>;
+template class LargeArrayBufferType<std::string>;
+template class LargeArrayBufferType<AtomicEntryRef>;
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.h b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.h
new file mode 100644
index 00000000000..50d15d4a27c
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.h
@@ -0,0 +1,39 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "array_store_config.h"
+#include "buffer_type.h"
+#include <vespa/vespalib/util/array.h>
+#include <memory>
+
+namespace vespalib::alloc { class MemoryAllocator; }
+
+namespace vespalib::datastore {
+
+/*
+ * Class representing buffer type for large arrays in ArrayStore
+ */
+template <typename EntryT>
+class LargeArrayBufferType : public BufferType<Array<EntryT>>
+{
+ using AllocSpec = ArrayStoreConfig::AllocSpec;
+ using ArrayType = Array<EntryT>;
+ using ParentType = BufferType<ArrayType>;
+ using ParentType::_emptyEntry;
+ using CleanContext = typename ParentType::CleanContext;
+ std::shared_ptr<alloc::MemoryAllocator> _memory_allocator;
+public:
+ LargeArrayBufferType(const AllocSpec& spec, std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept;
+ ~LargeArrayBufferType() override;
+ void cleanHold(void* buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx) override;
+ const vespalib::alloc::MemoryAllocator* get_memory_allocator() const override;
+};
+
+extern template class LargeArrayBufferType<uint8_t>;
+extern template class LargeArrayBufferType<uint32_t>;
+extern template class LargeArrayBufferType<int32_t>;
+extern template class LargeArrayBufferType<std::string>;
+extern template class LargeArrayBufferType<AtomicEntryRef>;
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.hpp b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.hpp
new file mode 100644
index 00000000000..aeeef8166c6
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/large_array_buffer_type.hpp
@@ -0,0 +1,39 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "large_array_buffer_type.h"
+#include <vespa/vespalib/util/array.hpp>
+
+namespace vespalib::datastore {
+
+template <typename EntryT>
+LargeArrayBufferType<EntryT>::LargeArrayBufferType(const AllocSpec& spec, std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept
+ : BufferType<Array<EntryT>>(1u, spec.minArraysInBuffer, spec.maxArraysInBuffer, spec.numArraysForNewBuffer, spec.allocGrowFactor),
+ _memory_allocator(std::move(memory_allocator))
+{
+}
+
+template <typename EntryT>
+LargeArrayBufferType<EntryT>::~LargeArrayBufferType() = default;
+
+template <typename EntryT>
+void
+LargeArrayBufferType<EntryT>::cleanHold(void* buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx)
+{
+ ArrayType* elem = static_cast<ArrayType*>(buffer) + offset;
+ for (size_t i = 0; i < numElems; ++i) {
+ cleanCtx.extraBytesCleaned(sizeof(EntryT) * elem->size());
+ *elem = _emptyEntry;
+ ++elem;
+ }
+}
+
+template <typename EntryT>
+const vespalib::alloc::MemoryAllocator*
+LargeArrayBufferType<EntryT>::get_memory_allocator() const
+{
+ return _memory_allocator.get();
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.cpp b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.cpp
new file mode 100644
index 00000000000..06a4c6007a9
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.cpp
@@ -0,0 +1,14 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "small_array_buffer_type.hpp"
+#include "buffer_type.hpp"
+
+namespace vespalib::datastore {
+
+template class SmallArrayBufferType<uint8_t>;
+template class SmallArrayBufferType<uint32_t>;
+template class SmallArrayBufferType<int32_t>;
+template class SmallArrayBufferType<std::string>;
+template class SmallArrayBufferType<AtomicEntryRef>;
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.h b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.h
new file mode 100644
index 00000000000..4e4568c3d16
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.h
@@ -0,0 +1,37 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "array_store_config.h"
+#include "buffer_type.h"
+#include <memory>
+
+namespace vespalib::alloc { class MemoryAllocator; }
+
+namespace vespalib::datastore {
+
+/*
+ * Class representing buffer type for small arrays in ArrayStore
+ */
+template <typename EntryT>
+class SmallArrayBufferType : public BufferType<EntryT>
+{
+ using AllocSpec = ArrayStoreConfig::AllocSpec;
+ std::shared_ptr<alloc::MemoryAllocator> _memory_allocator;
+public:
+ SmallArrayBufferType(const SmallArrayBufferType&) = delete;
+ SmallArrayBufferType& operator=(const SmallArrayBufferType&) = delete;
+ SmallArrayBufferType(SmallArrayBufferType&&) noexcept = default;
+ SmallArrayBufferType& operator=(SmallArrayBufferType&&) noexcept = default;
+ SmallArrayBufferType(uint32_t array_size, const AllocSpec& spec, std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept;
+ ~SmallArrayBufferType() override;
+ const vespalib::alloc::MemoryAllocator* get_memory_allocator() const override;
+};
+
+extern template class SmallArrayBufferType<uint8_t>;
+extern template class SmallArrayBufferType<uint32_t>;
+extern template class SmallArrayBufferType<int32_t>;
+extern template class SmallArrayBufferType<std::string>;
+extern template class SmallArrayBufferType<AtomicEntryRef>;
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.hpp b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.hpp
new file mode 100644
index 00000000000..414804417eb
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/small_array_buffer_type.hpp
@@ -0,0 +1,26 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "small_array_buffer_type.h"
+
+namespace vespalib::datastore {
+
+template <typename EntryT>
+SmallArrayBufferType<EntryT>::SmallArrayBufferType(uint32_t array_size, const AllocSpec& spec, std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept
+ : BufferType<EntryT>(array_size, spec.minArraysInBuffer, spec.maxArraysInBuffer, spec.numArraysForNewBuffer, spec.allocGrowFactor),
+ _memory_allocator(std::move(memory_allocator))
+{
+}
+
+template <typename EntryT>
+SmallArrayBufferType<EntryT>::~SmallArrayBufferType() = default;
+
+template <typename EntryT>
+const vespalib::alloc::MemoryAllocator*
+SmallArrayBufferType<EntryT>::get_memory_allocator() const
+{
+ return _memory_allocator.get();
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store.cpp b/vespalib/src/vespa/vespalib/datastore/unique_store.cpp
index 0d740c30c84..dcd9b38fab8 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store.cpp
@@ -5,14 +5,6 @@
namespace vespalib::datastore {
-template class BufferType<UniqueStoreEntry<int8_t>>;
-template class BufferType<UniqueStoreEntry<int16_t>>;
-template class BufferType<UniqueStoreEntry<int32_t>>;
-template class BufferType<UniqueStoreEntry<int64_t>>;
-template class BufferType<UniqueStoreEntry<uint32_t>>;
-template class BufferType<UniqueStoreEntry<float>>;
-template class BufferType<UniqueStoreEntry<double>>;
-
using namespace btree;
VESPALIB_DATASTORE_INSTANTIATE_BUFFERTYPE_INTERNALNODE(EntryRef, NoAggregated, uniquestore::DefaultDictionaryTraits::INTERNAL_SLOTS);
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store.h b/vespalib/src/vespa/vespalib/datastore/unique_store.h
index aea98f406e8..81034ab4210 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store.h
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store.h
@@ -13,6 +13,8 @@
#include "unique_store_comparator.h"
#include "unique_store_entry.h"
+namespace vespalib::alloc { class MemoryAllocator; }
+
namespace vespalib::datastore {
template <typename Allocator>
@@ -47,8 +49,8 @@ private:
using generation_t = vespalib::GenerationHandler::generation_t;
public:
- UniqueStore();
- UniqueStore(std::unique_ptr<IUniqueStoreDictionary> dict);
+ UniqueStore(std::shared_ptr<alloc::MemoryAllocator> memory_allocator);
+ UniqueStore(std::unique_ptr<IUniqueStoreDictionary> dict, std::shared_ptr<alloc::MemoryAllocator> memory_allocator);
~UniqueStore();
void set_dictionary(std::unique_ptr<IUniqueStoreDictionary> dict);
UniqueStoreAddResult add(EntryConstRefType value);
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
index b73b714a6bc..b1a7db56545 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
@@ -27,14 +27,14 @@ using DefaultUniqueStoreDictionary = UniqueStoreDictionary<DefaultDictionary>;
}
template <typename EntryT, typename RefT, typename Compare, typename Allocator>
-UniqueStore<EntryT, RefT, Compare, Allocator>::UniqueStore()
- : UniqueStore<EntryT, RefT, Compare, Allocator>(std::make_unique<uniquestore::DefaultUniqueStoreDictionary>(std::unique_ptr<EntryComparator>()))
+UniqueStore<EntryT, RefT, Compare, Allocator>::UniqueStore(std::shared_ptr<alloc::MemoryAllocator> memory_allocator)
+ : UniqueStore<EntryT, RefT, Compare, Allocator>(std::make_unique<uniquestore::DefaultUniqueStoreDictionary>(std::unique_ptr<EntryComparator>()), std::move(memory_allocator))
{
}
template <typename EntryT, typename RefT, typename Compare, typename Allocator>
-UniqueStore<EntryT, RefT, Compare, Allocator>::UniqueStore(std::unique_ptr<IUniqueStoreDictionary> dict)
- : _allocator(),
+UniqueStore<EntryT, RefT, Compare, Allocator>::UniqueStore(std::unique_ptr<IUniqueStoreDictionary> dict, std::shared_ptr<alloc::MemoryAllocator> memory_allocator)
+ : _allocator(std::move(memory_allocator)),
_store(_allocator.get_data_store()),
_dict(std::move(dict))
{
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.h b/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.h
index 0648f466bae..025165ee0e0 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.h
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.h
@@ -5,9 +5,12 @@
#include "datastore.h"
#include "entryref.h"
#include "unique_store_add_result.h"
+#include "unique_store_buffer_type.h"
#include "unique_store_entry.h"
#include "i_compactable.h"
+namespace vespalib::alloc { class MemoryAllocator; }
+
namespace vespalib::datastore {
/**
@@ -23,13 +26,12 @@ public:
using EntryConstRefType = const EntryType &;
using WrappedEntryType = UniqueStoreEntry<EntryType>;
using RefType = RefT;
- using UniqueStoreBufferType = BufferType<WrappedEntryType>;
private:
DataStoreType _store;
- UniqueStoreBufferType _typeHandler;
+ UniqueStoreBufferType<WrappedEntryType> _typeHandler;
public:
- UniqueStoreAllocator();
+ UniqueStoreAllocator(std::shared_ptr<alloc::MemoryAllocator> memory_allocator);
~UniqueStoreAllocator() override;
EntryRef allocate(const EntryType& value);
void hold(EntryRef ref);
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.hpp
index eb634503c9d..04a229d4ffa 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_allocator.hpp
@@ -3,6 +3,7 @@
#pragma once
#include "unique_store_allocator.h"
+#include "unique_store_buffer_type.hpp"
#include "unique_store_value_filter.h"
#include "datastore.hpp"
#include <vespa/vespalib/util/size_literals.h>
@@ -13,10 +14,10 @@ constexpr size_t NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER = 1_Ki;
constexpr float ALLOC_GROW_FACTOR = 0.2;
template <typename EntryT, typename RefT>
-UniqueStoreAllocator<EntryT, RefT>::UniqueStoreAllocator()
+UniqueStoreAllocator<EntryT, RefT>::UniqueStoreAllocator(std::shared_ptr<alloc::MemoryAllocator> memory_allocator)
: ICompactable(),
_store(),
- _typeHandler(1, 2u, RefT::offsetSize(), NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR)
+ _typeHandler(2u, RefT::offsetSize(), NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR, std::move(memory_allocator))
{
auto typeId = _store.addType(&_typeHandler);
assert(typeId == 0u);
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.cpp b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.cpp
new file mode 100644
index 00000000000..3441a9435a1
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.cpp
@@ -0,0 +1,25 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "unique_store_buffer_type.hpp"
+#include "unique_store_entry.h"
+
+namespace vespalib::datastore {
+
+template class BufferType<UniqueStoreEntry<int8_t>>;
+template class BufferType<UniqueStoreEntry<int16_t>>;
+template class BufferType<UniqueStoreEntry<int32_t>>;
+template class BufferType<UniqueStoreEntry<int64_t>>;
+template class BufferType<UniqueStoreEntry<uint32_t>>;
+template class BufferType<UniqueStoreEntry<float>>;
+template class BufferType<UniqueStoreEntry<double>>;
+
+template class UniqueStoreBufferType<UniqueStoreEntry<int8_t>>;
+template class UniqueStoreBufferType<UniqueStoreEntry<int16_t>>;
+template class UniqueStoreBufferType<UniqueStoreEntry<int32_t>>;
+template class UniqueStoreBufferType<UniqueStoreEntry<int64_t>>;
+template class UniqueStoreBufferType<UniqueStoreEntry<uint32_t>>;
+template class UniqueStoreBufferType<UniqueStoreEntry<float>>;
+template class UniqueStoreBufferType<UniqueStoreEntry<double>>;
+
+}
+
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.h b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.h
new file mode 100644
index 00000000000..2aba785dd91
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.h
@@ -0,0 +1,27 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "buffer_type.h"
+#include <memory>
+
+namespace vespalib::alloc { class MemoryAllocator; }
+
+namespace vespalib::datastore {
+
+/*
+ * Class representing buffer type for a normal unique store allocator.
+ */
+template <typename WrappedEntry>
+class UniqueStoreBufferType : public BufferType<WrappedEntry>
+{
+ std::shared_ptr<alloc::MemoryAllocator> _memory_allocator;
+public:
+ UniqueStoreBufferType(uint32_t min_arrays, uint32_t max_arrays,
+ uint32_t num_arrays_for_new_buffer, float alloc_grow_factor,
+ std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept;
+ ~UniqueStoreBufferType() override;
+ const vespalib::alloc::MemoryAllocator* get_memory_allocator() const override;
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.hpp
new file mode 100644
index 00000000000..c99033106ee
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_buffer_type.hpp
@@ -0,0 +1,29 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "unique_store_buffer_type.h"
+#include "buffer_type.hpp"
+
+namespace vespalib::datastore {
+
+template <typename WrappedEntry>
+UniqueStoreBufferType<WrappedEntry>::UniqueStoreBufferType(uint32_t min_arrays, uint32_t max_arrays,
+ uint32_t num_arrays_for_new_buffer, float alloc_grow_factor,
+ std::shared_ptr<alloc::MemoryAllocator> memory_allocator) noexcept
+ : BufferType<WrappedEntry>(1u, min_arrays, max_arrays, num_arrays_for_new_buffer, alloc_grow_factor),
+ _memory_allocator(std::move(memory_allocator))
+{
+}
+
+template <typename WrappedEntry>
+UniqueStoreBufferType<WrappedEntry>::~UniqueStoreBufferType() = default;
+
+template <typename WrappedEntry>
+const vespalib::alloc::MemoryAllocator*
+UniqueStoreBufferType<WrappedEntry>::get_memory_allocator() const
+{
+ return _memory_allocator.get();
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.cpp b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.cpp
index 664f1e4432e..9c639067615 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.cpp
@@ -33,8 +33,9 @@ get_type_id(size_t string_len)
}
-UniqueStoreSmallStringBufferType::UniqueStoreSmallStringBufferType(uint32_t array_size, uint32_t max_arrays)
- : BufferType<char>(array_size, 2u, max_arrays, NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR)
+UniqueStoreSmallStringBufferType::UniqueStoreSmallStringBufferType(uint32_t array_size, uint32_t max_arrays, std::shared_ptr<vespalib::alloc::MemoryAllocator> memory_allocator)
+ : BufferType<char>(array_size, 2u, max_arrays, NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR),
+ _memory_allocator(std::move(memory_allocator))
{
}
@@ -68,8 +69,15 @@ UniqueStoreSmallStringBufferType::cleanHold(void *buffer, size_t offset, ElemCou
assert(e == e_end);
}
-UniqueStoreExternalStringBufferType::UniqueStoreExternalStringBufferType(uint32_t array_size, uint32_t max_arrays)
- : BufferType<UniqueStoreEntry<std::string>>(array_size, 2u, max_arrays, NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR)
+const vespalib::alloc::MemoryAllocator*
+UniqueStoreSmallStringBufferType::get_memory_allocator() const
+{
+ return _memory_allocator.get();
+}
+
+UniqueStoreExternalStringBufferType::UniqueStoreExternalStringBufferType(uint32_t array_size, uint32_t max_arrays, std::shared_ptr<vespalib::alloc::MemoryAllocator> memory_allocator)
+ : BufferType<UniqueStoreEntry<std::string>>(array_size, 2u, max_arrays, NUM_ARRAYS_FOR_NEW_UNIQUESTORE_BUFFER, ALLOC_GROW_FACTOR),
+ _memory_allocator(std::move(memory_allocator))
{
}
@@ -86,6 +94,12 @@ UniqueStoreExternalStringBufferType::cleanHold(void *buffer, size_t offset, Elem
}
}
+const vespalib::alloc::MemoryAllocator*
+UniqueStoreExternalStringBufferType::get_memory_allocator() const
+{
+ return _memory_allocator.get();
+}
+
template class UniqueStoreStringAllocator<EntryRefT<22>>;
template class BufferType<UniqueStoreEntry<std::string>>;
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.h b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.h
index 7b4c578f248..be5fa8f6c1e 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.h
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.h
@@ -10,6 +10,8 @@
#include <cassert>
#include <string>
+namespace vespalib::alloc { class MemoryAllocator; }
+
namespace vespalib::datastore {
namespace string_allocator {
@@ -56,22 +58,26 @@ public:
* bytes
*/
class UniqueStoreSmallStringBufferType : public BufferType<char> {
+ std::shared_ptr<vespalib::alloc::MemoryAllocator> _memory_allocator;
public:
- UniqueStoreSmallStringBufferType(uint32_t array_size, uint32_t max_arrays);
+ UniqueStoreSmallStringBufferType(uint32_t array_size, uint32_t max_arrays, std::shared_ptr<vespalib::alloc::MemoryAllocator> memory_allocator);
~UniqueStoreSmallStringBufferType() override;
void destroyElements(void *, ElemCount) override;
void fallbackCopy(void *newBuffer, const void *oldBuffer, ElemCount numElems) override;
void cleanHold(void *buffer, size_t offset, ElemCount numElems, CleanContext) override;
+ const vespalib::alloc::MemoryAllocator* get_memory_allocator() const override;
};
/*
* Buffer type for external strings in unique store.
*/
class UniqueStoreExternalStringBufferType : public BufferType<UniqueStoreEntry<std::string>> {
+ std::shared_ptr<vespalib::alloc::MemoryAllocator> _memory_allocator;
public:
- UniqueStoreExternalStringBufferType(uint32_t array_size, uint32_t max_arrays);
+ UniqueStoreExternalStringBufferType(uint32_t array_size, uint32_t max_arrays, std::shared_ptr<vespalib::alloc::MemoryAllocator> memory_allocator);
~UniqueStoreExternalStringBufferType() override;
void cleanHold(void *buffer, size_t offset, ElemCount numElems, CleanContext cleanCtx) override;
+ const vespalib::alloc::MemoryAllocator* get_memory_allocator() const override;
};
/**
@@ -101,7 +107,7 @@ private:
static uint32_t get_type_id(const char *value);
public:
- UniqueStoreStringAllocator();
+ UniqueStoreStringAllocator(std::shared_ptr<alloc::MemoryAllocator> memory_allocator);
~UniqueStoreStringAllocator() override;
EntryRef allocate(const char *value);
void hold(EntryRef ref);
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.hpp
index 90c44200fbb..71ea16bcde2 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_string_allocator.hpp
@@ -8,14 +8,14 @@
namespace vespalib::datastore {
template <typename RefT>
-UniqueStoreStringAllocator<RefT>::UniqueStoreStringAllocator()
+UniqueStoreStringAllocator<RefT>::UniqueStoreStringAllocator(std::shared_ptr<alloc::MemoryAllocator> memory_allocator)
: ICompactable(),
_store(),
_type_handlers()
{
- _type_handlers.emplace_back(std::make_unique<UniqueStoreExternalStringBufferType>(1, RefT::offsetSize()));
+ _type_handlers.emplace_back(std::make_unique<UniqueStoreExternalStringBufferType>(1, RefT::offsetSize(), memory_allocator));
for (auto size : string_allocator::array_sizes) {
- _type_handlers.emplace_back(std::make_unique<UniqueStoreSmallStringBufferType>(size, RefT::offsetSize()));
+ _type_handlers.emplace_back(std::make_unique<UniqueStoreSmallStringBufferType>(size, RefT::offsetSize(), memory_allocator));
}
uint32_t exp_type_id = 0;
for (auto &type_handler : _type_handlers) {
diff --git a/vespalib/src/vespa/vespalib/stllike/asciistream.cpp b/vespalib/src/vespa/vespalib/stllike/asciistream.cpp
index bedd0f119c6..eb127c7051a 100644
--- a/vespalib/src/vespa/vespalib/stllike/asciistream.cpp
+++ b/vespalib/src/vespa/vespalib/stllike/asciistream.cpp
@@ -3,15 +3,14 @@
#include "asciistream.h"
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/exceptions.h>
-#include <vespa/vespalib/util/memory.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/alloc.h>
#include <vespa/vespalib/locale/c.h>
#include <vespa/fastos/file.h>
#include <limits>
-#include <stdexcept>
#include <cassert>
-#include <cmath>
#include <charconv>
+#include <vector>
#include <vespa/log/log.h>
LOG_SETUP(".vespalib.stllike.asciistream");
@@ -19,20 +18,23 @@ LOG_SETUP(".vespalib.stllike.asciistream");
namespace vespalib {
namespace {
- std::vector<string> getPrecisions(const char type) {
- std::vector<string> result(VESPALIB_ASCIISTREAM_MAX_PRECISION + 1);
- for (uint32_t i=0; i<result.size(); ++i) {
- char buf[8];
- int count = snprintf(buf, sizeof(buf), "%%.%u%c", i, type);
- assert(size_t(count) < sizeof(buf)); // Assert no truncation.
- (void) count;
- result[i] = buf;
- }
- return result;
+
+std::vector<string>
+getPrecisions(const char type) {
+ std::vector<string> result(VESPALIB_ASCIISTREAM_MAX_PRECISION + 1);
+ for (uint32_t i=0; i<result.size(); ++i) {
+ char buf[8];
+ int count = snprintf(buf, sizeof(buf), "%%.%u%c", i, type);
+ assert(size_t(count) < sizeof(buf)); // Assert no truncation.
+ (void) count;
+ result[i] = buf;
}
- std::vector<string> fixedPrecisions = getPrecisions('f');
- std::vector<string> scientificPrecisions = getPrecisions('e');
- std::vector<string> autoPrecisions = getPrecisions('g');
+ return result;
+}
+std::vector<string> fixedPrecisions = getPrecisions('f');
+std::vector<string> scientificPrecisions = getPrecisions('e');
+std::vector<string> autoPrecisions = getPrecisions('g');
+
}
asciistream &
@@ -93,7 +95,8 @@ asciistream::asciistream(const asciistream & rhs) :
{
}
-asciistream & asciistream::operator = (const asciistream & rhs)
+asciistream &
+asciistream::operator = (const asciistream & rhs)
{
if (this != &rhs) {
asciistream newStream(rhs);
@@ -108,7 +111,8 @@ asciistream::asciistream(asciistream && rhs) noexcept
swap(rhs);
}
-asciistream & asciistream::operator = (asciistream && rhs) noexcept
+asciistream &
+asciistream::operator = (asciistream && rhs) noexcept
{
if (this != &rhs) {
swap(rhs);
@@ -116,7 +120,8 @@ asciistream & asciistream::operator = (asciistream && rhs) noexcept
return *this;
}
-void asciistream::swap(asciistream & rhs) noexcept
+void
+asciistream::swap(asciistream & rhs) noexcept
{
std::swap(_rPos, rhs._rPos);
// If read-only, _wbuf is empty and _rbuf is set
@@ -150,7 +155,8 @@ void throwUnderflow(size_t pos) __attribute__((noinline));
template <typename T>
T strToInt(T & v, const char *begin, const char *end) __attribute__((noinline));
-void throwInputError(int e, const char * t, const char * buf)
+void
+throwInputError(int e, const char * t, const char * buf)
{
if (e == 0) {
throw IllegalArgumentException("Failed decoding a " + string(t) + " from '" + string(buf) + "'.", VESPA_STRLOC);
@@ -163,7 +169,8 @@ void throwInputError(int e, const char * t, const char * buf)
}
}
-void throwInputError(std::errc e, const char * t, const char * buf) {
+void
+throwInputError(std::errc e, const char * t, const char * buf) {
if (e == std::errc::invalid_argument) {
throw IllegalArgumentException("Illegal " + string(t) + " value '" + string(buf) + "'.", VESPA_STRLOC);
} else if (e == std::errc::result_out_of_range) {
@@ -173,12 +180,14 @@ void throwInputError(std::errc e, const char * t, const char * buf) {
}
}
-void throwUnderflow(size_t pos)
+void
+throwUnderflow(size_t pos)
{
throw IllegalArgumentException(make_string("buffer underflow at pos %ld.", pos), VESPA_STRLOC);
}
-int getValue(double & val, const char *buf)
+int
+getValue(double & val, const char *buf)
{
char *ebuf;
errno = 0;
@@ -189,7 +198,8 @@ int getValue(double & val, const char *buf)
return ebuf - buf;
}
-int getValue(float & val, const char *buf)
+int
+getValue(float & val, const char *buf)
{
char *ebuf;
errno = 0;
@@ -201,7 +211,8 @@ int getValue(float & val, const char *buf)
}
template <typename T>
-T strToInt(T & v, const char *begin, const char *end)
+T
+strToInt(T & v, const char *begin, const char *end)
{
const char * curr = begin;
for (;(curr < end) && std::isspace(*curr); curr++);
@@ -226,7 +237,8 @@ T strToInt(T & v, const char *begin, const char *end)
}
-asciistream & asciistream::operator >> (bool & v)
+asciistream &
+asciistream::operator >> (bool & v)
{
for (;(_rPos < length()) && std::isspace(_rbuf[_rPos]); _rPos++);
if (_rPos < length()) {
@@ -237,7 +249,8 @@ asciistream & asciistream::operator >> (bool & v)
return *this;
}
-asciistream & asciistream::operator >> (char & v)
+asciistream &
+asciistream::operator >> (char & v)
{
for (;(_rPos < length()) && std::isspace(_rbuf[_rPos]); _rPos++);
if (_rPos < length()) {
@@ -248,7 +261,8 @@ asciistream & asciistream::operator >> (char & v)
return *this;
}
-asciistream & asciistream::operator >> (signed char & v)
+asciistream &
+asciistream::operator >> (signed char & v)
{
for (;(_rPos < length()) && std::isspace(_rbuf[_rPos]); _rPos++);
if (_rPos < length()) {
@@ -259,7 +273,8 @@ asciistream & asciistream::operator >> (signed char & v)
return *this;
}
-asciistream & asciistream::operator >> (unsigned char & v)
+asciistream &
+asciistream::operator >> (unsigned char & v)
{
for (;(_rPos < length()) && std::isspace(_rbuf[_rPos]); _rPos++);
if (_rPos < length()) {
@@ -270,55 +285,64 @@ asciistream & asciistream::operator >> (unsigned char & v)
return *this;
}
-asciistream & asciistream::operator >> (unsigned short & v)
+asciistream &
+asciistream::operator >> (unsigned short & v)
{
_rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]);
return *this;
}
-asciistream & asciistream::operator >> (unsigned int & v)
+asciistream &
+asciistream::operator >> (unsigned int & v)
{
_rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]);
return *this;
}
-asciistream & asciistream::operator >> (unsigned long & v)
+asciistream &
+asciistream::operator >> (unsigned long & v)
{
_rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]);
return *this;
}
-asciistream & asciistream::operator >> (unsigned long long & v)
+asciistream &
+asciistream::operator >> (unsigned long long & v)
{
_rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]);
return *this;
}
-asciistream & asciistream::operator >> (short & v)
+asciistream &
+asciistream::operator >> (short & v)
{
_rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]);
return *this;
}
-asciistream & asciistream::operator >> (int & v)
+asciistream &
+asciistream::operator >> (int & v)
{
_rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]);
return *this;
}
-asciistream & asciistream::operator >> (long & v)
+asciistream &
+asciistream::operator >> (long & v)
{
_rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]);
return *this;
}
-asciistream & asciistream::operator >> (long long & v)
+asciistream &
+asciistream::operator >> (long long & v)
{
_rPos += strToInt(v, &_rbuf[_rPos], &_rbuf[length()]);
return *this;
}
-asciistream & asciistream::operator >> (double & v)
+asciistream &
+asciistream::operator >> (double & v)
{
double l(0);
_rPos += getValue(l, &_rbuf[_rPos]);
@@ -326,7 +350,8 @@ asciistream & asciistream::operator >> (double & v)
return *this;
}
-asciistream & asciistream::operator >> (float & v)
+asciistream &
+asciistream::operator >> (float & v)
{
float l(0);
_rPos += getValue(l, &_rbuf[_rPos]);
@@ -334,7 +359,8 @@ asciistream & asciistream::operator >> (float & v)
return *this;
}
-void asciistream::eatWhite()
+void
+asciistream::eatWhite()
{
for (;(_rPos < length()) && isspace(_rbuf[_rPos]); _rPos++);
}
@@ -344,7 +370,8 @@ void asciistream::eatNonWhite()
for (;(_rPos < length()) && !isspace(_rbuf[_rPos]); _rPos++);
}
-asciistream & asciistream::operator >> (std::string & v)
+asciistream &
+asciistream::operator >> (std::string & v)
{
eatWhite();
size_t start(_rPos);
@@ -353,7 +380,8 @@ asciistream & asciistream::operator >> (std::string & v)
return *this;
}
-asciistream & asciistream::operator >> (string & v)
+asciistream &
+asciistream::operator >> (string & v)
{
eatWhite();
size_t start(_rPos);
@@ -366,7 +394,8 @@ asciistream & asciistream::operator >> (string & v)
namespace {
const char * _C_char = "0123456789abcdefg";
-char * prependInt(char * tmp, Base base)
+char *
+prependInt(char * tmp, Base base)
{
if (base == bin) {
tmp[1] = 'b';
@@ -376,7 +405,8 @@ char * prependInt(char * tmp, Base base)
return tmp + 2;
}
-char * prependSign(bool sign, char * tmp)
+char *
+prependSign(bool sign, char * tmp)
{
if (sign) {
tmp[0] = '-';
@@ -389,7 +419,8 @@ template <uint8_t base>
uint8_t printInt(unsigned long long r, char * tmp, uint8_t i) __attribute__((noinline));
template <uint8_t base>
-uint8_t printInt(unsigned long long r, char * tmp, uint8_t i)
+uint8_t
+printInt(unsigned long long r, char * tmp, uint8_t i)
{
for(; r; i--, r/=base) {
uint8_t d = r%base;
@@ -401,7 +432,8 @@ uint8_t printInt(unsigned long long r, char * tmp, uint8_t i)
}
-asciistream & asciistream::operator << (long long v)
+asciistream &
+asciistream::operator << (long long v)
{
char tmp[72];
uint8_t i(sizeof(tmp));
@@ -432,14 +464,16 @@ asciistream & asciistream::operator << (long long v)
return *this;
}
-void asciistream::doReallyFill(size_t currWidth)
+void
+asciistream::doReallyFill(size_t currWidth)
{
for (; _width > currWidth; currWidth++) {
write(&_fill, 1);
}
}
-asciistream & asciistream::operator << (unsigned long long v)
+asciistream &
+asciistream::operator << (unsigned long long v)
{
char tmp[72];
uint8_t i(sizeof(tmp));
@@ -477,13 +511,15 @@ struct BaseStateSaver {
};
}
-asciistream& asciistream::operator<<(const void* p)
+asciistream &
+asciistream::operator<<(const void* p)
{
BaseStateSaver saver(*this, _base);
return *this << "0x" << hex << reinterpret_cast<uint64_t>(p);
}
-asciistream & asciistream::operator << (float v)
+asciistream &
+asciistream::operator << (float v)
{
if (_floatSpec == fixed) {
printFixed(v);
@@ -493,7 +529,8 @@ asciistream & asciistream::operator << (float v)
return *this;
}
-asciistream & asciistream::operator << (double v)
+asciistream &
+asciistream::operator << (double v)
{
if (_floatSpec == fixed) {
printFixed(v);
@@ -516,20 +553,20 @@ void asciistream::printFixed(T v)
}
namespace {
- bool hasDotOrIsScientific(const char* string, size_t len) {
- for (size_t i=0; i<len; ++i) {
- switch (string[i]) {
- case '.':
- case ',':
- case 'e':
- case 'E':
- return true;
- default:
- break;
- }
+bool hasDotOrIsScientific(const char* string, size_t len) {
+ for (size_t i=0; i<len; ++i) {
+ switch (string[i]) {
+ case '.':
+ case ',':
+ case 'e':
+ case 'E':
+ return true;
+ default:
+ break;
}
- return false;
}
+ return false;
+}
}
template <typename T>
@@ -548,7 +585,8 @@ void asciistream::printScientific(T v)
}
}
-void asciistream::write(const void * buf, size_t len)
+void
+asciistream::write(const void * buf, size_t len)
{
if (_rPos > 0 && _rPos == length()) {
clear();
@@ -564,16 +602,8 @@ void asciistream::write(const void * buf, size_t len)
_rbuf = _wbuf;
}
-std::vector<string> asciistream::getlines(char delim)
-{
- std::vector<string> lines;
- while (!eof()) {
- lines.push_back(getline(delim));
- }
- return lines;
-}
-
-string asciistream::getline(char delim)
+string
+asciistream::getline(char delim)
{
string line;
const size_t start(_rPos);
@@ -588,7 +618,8 @@ string asciistream::getline(char delim)
return line;
}
-asciistream asciistream::createFromFile(stringref fileName)
+asciistream
+asciistream::createFromFile(stringref fileName)
{
FastOS_File file(vespalib::string(fileName).c_str());
asciistream is;
@@ -597,32 +628,35 @@ asciistream asciistream::createFromFile(stringref fileName)
if (sz < 0) {
throw IoException("Failed getting size of file " + fileName + " : Error=" + file.getLastErrorString(), IoException::UNSPECIFIED, VESPA_STRLOC);
}
- MallocPtr buf(sz);
- ssize_t actual = file.Read(buf, sz);
+ alloc::Alloc buf = alloc::Alloc::alloc(sz);
+ ssize_t actual = file.Read(buf.get(), sz);
if (actual != sz) {
asciistream e;
e << "Failed reading " << sz << " bytes from file " << fileName;
throw IoException(e.str() + " : Error=" + file.getLastErrorString(), IoException::UNSPECIFIED, VESPA_STRLOC);
}
- is << stringref(buf.c_str(), buf.size());
+ is << stringref(static_cast<const char *>(buf.get()), sz);
}
return is;
}
-asciistream asciistream::createFromDevice(stringref fileName)
+asciistream
+asciistream::createFromDevice(stringref fileName)
{
FastOS_File file(vespalib::string(fileName).c_str());
asciistream is;
if (file.OpenReadOnly()) {
- MallocPtr buf(64_Ki);
- for (ssize_t actual = file.Read(buf, buf.size()); actual > 0; actual = file.Read(buf, buf.size())) {
- is << stringref(buf.c_str(), actual);
+ constexpr size_t SZ = 64_Ki;
+ auto buf = std::make_unique<char []>(SZ);
+ for (ssize_t actual = file.Read(buf.get(), SZ); actual > 0; actual = file.Read(buf.get(), SZ)) {
+ is << stringref(buf.get(), actual);
}
}
return is;
}
-ssize_t getline(asciistream & is, string & line, char delim)
+ssize_t
+getline(asciistream & is, string & line, char delim)
{
line = is.getline(delim);
return line.size();
diff --git a/vespalib/src/vespa/vespalib/stllike/asciistream.h b/vespalib/src/vespa/vespalib/stllike/asciistream.h
index 6a4b4378634..b333be6cec2 100644
--- a/vespalib/src/vespa/vespalib/stllike/asciistream.h
+++ b/vespalib/src/vespa/vespalib/stllike/asciistream.h
@@ -2,7 +2,6 @@
#pragma once
#include <vespa/vespalib/stllike/string.h>
-#include <vector>
namespace vespalib {
@@ -153,7 +152,6 @@ public:
static asciistream createFromFile(stringref fileName);
static asciistream createFromDevice(stringref fileName);
string getline(char delim='\n');
- std::vector<string> getlines(char delim='\n');
char getFill() const noexcept { return _fill; }
size_t getWidth() const noexcept { return static_cast<size_t>(_width); } // match input type of setw
Base getBase() const noexcept { return _base; }
diff --git a/vespalib/src/vespa/vespalib/test/CMakeLists.txt b/vespalib/src/vespa/vespalib/test/CMakeLists.txt
index d658b28f94b..a60eb15a4d4 100644
--- a/vespalib/src/vespa/vespalib/test/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/test/CMakeLists.txt
@@ -4,6 +4,7 @@ vespa_add_library(vespalib_vespalib_test OBJECT
make_tls_options_for_testing.cpp
memory_allocator_observer.cpp
peer_policy_utils.cpp
+ thread_meets.cpp
time_tracer.cpp
DEPENDS
)
diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.cpp b/vespalib/src/vespa/vespalib/test/thread_meets.cpp
new file mode 100644
index 00000000000..9d23e0eab28
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/test/thread_meets.cpp
@@ -0,0 +1,12 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "thread_meets.h"
+
+namespace vespalib::test {
+
+void
+ThreadMeets::Nop::mingle()
+{
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.h b/vespalib/src/vespa/vespalib/test/thread_meets.h
new file mode 100644
index 00000000000..62ca7779935
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/test/thread_meets.h
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/rendezvous.h>
+
+namespace vespalib::test {
+
+/**
+ * Generally useful rendezvous implementations.
+ **/
+struct ThreadMeets {
+ // can be used as a simple thread barrier
+ struct Nop : vespalib::Rendezvous<bool,bool> {
+ Nop(size_t N) : vespalib::Rendezvous<bool,bool>(N) {}
+ void operator()() { rendezvous(false); }
+ void mingle() override;
+ };
+ // swap values between 2 threads
+ template <typename T>
+ struct Swap : vespalib::Rendezvous<T,T> {
+ using vespalib::Rendezvous<T,T>::in;
+ using vespalib::Rendezvous<T,T>::out;
+ using vespalib::Rendezvous<T,T>::rendezvous;
+ Swap() : vespalib::Rendezvous<T,T>(2) {}
+ T operator()(T input) { return rendezvous(input); }
+ void mingle() override {
+ out(1) = in(0);
+ out(0) = in(1);
+ }
+ };
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
index 32f679d22d7..752bf60c688 100644
--- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -42,6 +42,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT
mmap_file_allocator.cpp
mmap_file_allocator_factory.cpp
monitored_refcount.cpp
+ nice.cpp
printable.cpp
priority_queue.cpp
random.cpp
@@ -56,6 +57,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT
runnable_pair.cpp
sequence.cpp
sha1.cpp
+ shared_operation_throttler.cpp
shared_string_repo.cpp
sig_catch.cpp
signalhandler.cpp
diff --git a/vespalib/src/vespa/vespalib/util/alloc.cpp b/vespalib/src/vespa/vespalib/util/alloc.cpp
index 69f2eedcecb..bf5f0200600 100644
--- a/vespalib/src/vespa/vespalib/util/alloc.cpp
+++ b/vespalib/src/vespa/vespalib/util/alloc.cpp
@@ -381,6 +381,9 @@ MMapAllocator::sresize_inplace(PtrAndSize current, size_t newSize) {
size_t
MMapAllocator::extend_inplace(PtrAndSize current, size_t newSize) {
+ if (current.second == 0u) {
+ return 0u;
+ }
PtrAndSize got = MMapAllocator::salloc(newSize - current.second, static_cast<char *>(current.first)+current.second);
if ((static_cast<const char *>(current.first) + current.second) == static_cast<const char *>(got.first)) {
return current.second + got.second;
@@ -477,6 +480,9 @@ Alloc::allocHeap(size_t sz)
bool
Alloc::resize_inplace(size_t newSize)
{
+ if (newSize == 0u) {
+ return size() == 0u;
+ }
size_t extendedSize = _allocator->resize_inplace(_alloc, newSize);
if (extendedSize >= newSize) {
_alloc.second = extendedSize;
diff --git a/vespalib/src/vespa/vespalib/util/alloc.h b/vespalib/src/vespa/vespalib/util/alloc.h
index d25fb6f6a7c..4066894b4e3 100644
--- a/vespalib/src/vespa/vespalib/util/alloc.h
+++ b/vespalib/src/vespa/vespalib/util/alloc.h
@@ -62,6 +62,13 @@ public:
std::swap(_alloc, rhs._alloc);
std::swap(_allocator, rhs._allocator);
}
+ void reset() {
+ if (_alloc.first != nullptr) {
+ _allocator->free(_alloc);
+ _alloc.first = nullptr;
+ _alloc.second = 0u;
+ }
+ }
Alloc create(size_t sz) const noexcept {
return (sz == 0) ? Alloc(_allocator) : Alloc(_allocator, sz);
}
diff --git a/vespalib/src/vespa/vespalib/util/array.h b/vespalib/src/vespa/vespalib/util/array.h
index 30d87cd98f6..cb5d5c7cc63 100644
--- a/vespalib/src/vespa/vespalib/util/array.h
+++ b/vespalib/src/vespa/vespalib/util/array.h
@@ -135,6 +135,7 @@ public:
std::destroy(array(0), array(_sz));
_sz = 0;
}
+ void reset();
bool empty() const { return _sz == 0; }
T & operator [] (size_t i) { return *array(i); }
const T & operator [] (size_t i) const { return *array(i); }
@@ -145,6 +146,7 @@ public:
rhs._sz = 0;
return std::move(rhs._array);
}
+ Array<T> create() const;
private:
T * array(size_t i) { return static_cast<T *>(_array.get()) + i; }
const T * array(size_t i) const { return static_cast<const T *>(_array.get()) + i; }
diff --git a/vespalib/src/vespa/vespalib/util/array.hpp b/vespalib/src/vespa/vespalib/util/array.hpp
index e9070f5759c..72178f0391b 100644
--- a/vespalib/src/vespa/vespalib/util/array.hpp
+++ b/vespalib/src/vespa/vespalib/util/array.hpp
@@ -205,5 +205,20 @@ void Array<T>::cleanup()
Alloc().swap(_array);
}
+template <typename T>
+void Array<T>::reset()
+{
+ std::destroy(array(0), array(_sz));
+ _sz = 0;
+ _array.reset();
+}
+
+template <typename T>
+Array<T>
+Array<T>::create() const
+{
+ return Array<T>(_array); // Use same memory allocator
+}
+
}
diff --git a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
index 73e70e7fd89..8f3dd8ab006 100644
--- a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
+++ b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
@@ -2,11 +2,11 @@
#pragma once
-#include <stdint.h>
-#include <stdlib.h>
+#include "traits.h"
+#include <cstdint>
+#include <cstdlib>
#include <cassert>
#include <algorithm>
-#include "traits.h"
namespace vespalib {
diff --git a/vespalib/src/vespa/vespalib/util/atomic.h b/vespalib/src/vespa/vespalib/util/atomic.h
new file mode 100644
index 00000000000..8bf258ffa50
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/atomic.h
@@ -0,0 +1,151 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <atomic>
+#include <type_traits>
+#include <version>
+
+/**
+ * Utility functions for single value atomic memory accesses.
+ *
+ * store/load_ref_* functions can be used to provide well-defined atomic
+ * memory access to memory locations that aren't explicitly wrapped in std::atomic
+ * objects. In this case, all potentially racing loads/stores _must_ be through
+ * atomic utility functions (or atomic_ref).
+ *
+ * Non-ref store/load_* functions are just syntactic sugar to make code using
+ * atomics more readable, but additionally adds sanity checks that all atomics
+ * are always lock-free.
+ */
+
+namespace vespalib::atomic {
+
+//
+// std::atomic_ref<T> helpers
+//
+
+namespace detail {
+template <typename T> struct is_std_atomic : std::false_type {};
+template <typename T> struct is_std_atomic<std::atomic<T>> : std::true_type {};
+template <typename T> inline constexpr bool is_std_atomic_v = is_std_atomic<T>::value;
+}
+
+// TODO can generalize atomic_ref code once no special casing is needed
+
+template <typename T1, typename T2>
+constexpr void store_ref_relaxed(T1& lhs, T2&& v) noexcept {
+ static_assert(!detail::is_std_atomic_v<T1>, "atomic ref function invoked with a std::atomic, probably not intended");
+#if __cpp_lib_atomic_ref
+ static_assert(std::atomic_ref<T1>::is_always_lock_free);
+ std::atomic_ref<T1>(lhs).store(std::forward<T2>(v), std::memory_order_relaxed);
+#else
+ // TODO replace with compiler intrinsic
+ lhs = std::forward<T2>(v);
+#endif
+}
+
+template <typename T1, typename T2>
+constexpr void store_ref_release(T1& lhs, T2&& v) noexcept {
+ static_assert(!detail::is_std_atomic_v<T1>, "atomic ref function invoked with a std::atomic, probably not intended");
+#if __cpp_lib_atomic_ref
+ static_assert(std::atomic_ref<T1>::is_always_lock_free);
+ std::atomic_ref<T1>(lhs).store(std::forward<T2>(v), std::memory_order_release);
+#else
+ // TODO replace with compiler intrinsic
+ lhs = std::forward<T2>(v);
+ std::atomic_thread_fence(std::memory_order_release);
+#endif
+}
+
+template <typename T1, typename T2>
+constexpr void store_ref_seq_cst(T1& lhs, T2&& v) noexcept {
+ static_assert(!detail::is_std_atomic_v<T1>, "atomic ref function invoked with a std::atomic, probably not intended");
+#if __cpp_lib_atomic_ref
+ static_assert(std::atomic_ref<T1>::is_always_lock_free);
+ std::atomic_ref<T1>(lhs).store(std::forward<T2>(v), std::memory_order_seq_cst);
+#else
+ // TODO replace with compiler intrinsic
+ lhs = std::forward<T2>(v);
+ std::atomic_thread_fence(std::memory_order_seq_cst);
+#endif
+}
+
+template <typename T>
+[[nodiscard]] constexpr T load_ref_relaxed(const T& a) noexcept {
+ static_assert(!detail::is_std_atomic_v<T>, "atomic ref function invoked with a std::atomic, probably not intended");
+#if __cpp_lib_atomic_ref
+ static_assert(std::atomic_ref<const T>::is_always_lock_free);
+ return std::atomic_ref<const T>(a).load(std::memory_order_relaxed);
+#else
+ // TODO replace with compiler intrinsic
+ return a;
+#endif
+}
+
+template <typename T>
+[[nodiscard]] constexpr T load_ref_acquire(const T& a) noexcept {
+ static_assert(!detail::is_std_atomic_v<T>, "atomic ref function invoked with a std::atomic, probably not intended");
+#if __cpp_lib_atomic_ref
+ static_assert(std::atomic_ref<const T>::is_always_lock_free);
+ return std::atomic_ref<const T>(a).load(std::memory_order_acquire);
+#else
+ // TODO replace with compiler intrinsic
+ std::atomic_thread_fence(std::memory_order_acquire);
+ return a;
+#endif
+}
+
+template <typename T>
+[[nodiscard]] constexpr T load_ref_seq_cst(const T& a) noexcept {
+ static_assert(!detail::is_std_atomic_v<T>, "atomic ref function invoked with a std::atomic, probably not intended");
+#if __cpp_lib_atomic_ref
+ static_assert(std::atomic_ref<const T>::is_always_lock_free);
+ return std::atomic_ref<const T>(a).load(std::memory_order_seq_cst);
+#else
+ // TODO replace with compiler intrinsic
+ std::atomic_thread_fence(std::memory_order_seq_cst);
+ return a;
+#endif
+}
+
+//
+// std::atomic<T> helpers
+//
+
+template <typename T1, typename T2>
+constexpr void store_relaxed(std::atomic<T1>& lhs, T2&& v) noexcept {
+ static_assert(std::atomic<T1>::is_always_lock_free);
+ lhs.store(std::forward<T2>(v), std::memory_order_relaxed);
+}
+
+template <typename T1, typename T2>
+constexpr void store_release(std::atomic<T1>& lhs, T2&& v) noexcept {
+ static_assert(std::atomic<T1>::is_always_lock_free);
+ lhs.store(std::forward<T2>(v), std::memory_order_release);
+}
+
+template <typename T1, typename T2>
+constexpr void store_seq_cst(std::atomic<T1>& lhs, T2&& v) noexcept {
+ static_assert(std::atomic<T1>::is_always_lock_free);
+ lhs.store(std::forward<T2>(v), std::memory_order_seq_cst);
+}
+
+template <typename T>
+[[nodiscard]] constexpr T load_relaxed(const std::atomic<T>& a) noexcept {
+ static_assert(std::atomic<T>::is_always_lock_free);
+ return a.load(std::memory_order_relaxed);
+}
+
+template <typename T>
+[[nodiscard]] constexpr T load_acquire(const std::atomic<T>& a) noexcept {
+ static_assert(std::atomic<T>::is_always_lock_free);
+ return a.load(std::memory_order_acquire);
+}
+
+template <typename T>
+[[nodiscard]] constexpr T load_seq_cst(const std::atomic<T>& a) noexcept {
+ static_assert(std::atomic<T>::is_always_lock_free);
+ return a.load(std::memory_order_seq_cst);
+}
+
+} // vespalib::atomic
diff --git a/vespalib/src/vespa/vespalib/util/child_process.cpp b/vespalib/src/vespa/vespalib/util/child_process.cpp
index 7193a445f9a..694bed60f1b 100644
--- a/vespalib/src/vespa/vespalib/util/child_process.cpp
+++ b/vespalib/src/vespa/vespalib/util/child_process.cpp
@@ -67,7 +67,9 @@ ChildProcess::Reader::OnReceiveData(const void *data, size_t length)
return;
}
if (buf == nullptr) { // EOF
- _gotEOF = true;
+ if (--_num_streams == 0) {
+ _gotEOF = true;
+ }
} else {
_queue.push(std::string(buf, length));
}
@@ -107,11 +109,12 @@ ChildProcess::Reader::updateEOF()
}
-ChildProcess::Reader::Reader()
+ChildProcess::Reader::Reader(int num_streams)
: _lock(),
_cond(),
_queue(),
_data(),
+ _num_streams(num_streams),
_gotEOF(false),
_waitCnt(0),
_readEOF(false)
@@ -203,7 +206,7 @@ ChildProcess::checkProc()
ChildProcess::ChildProcess(const char *cmd)
- : _reader(),
+ : _reader(1),
_proc(cmd, true, &_reader),
_running(false),
_failed(false),
@@ -213,6 +216,17 @@ ChildProcess::ChildProcess(const char *cmd)
_failed = !_running;
}
+ChildProcess::ChildProcess(const char *cmd, capture_stderr_tag)
+ : _reader(2),
+ _proc(cmd, true, &_reader, &_reader),
+ _running(false),
+ _failed(false),
+ _exitCode(-918273645)
+{
+ _running = _proc.CreateWithShell();
+ _failed = !_running;
+}
+
ChildProcess::~ChildProcess() = default;
diff --git a/vespalib/src/vespa/vespalib/util/child_process.h b/vespalib/src/vespa/vespalib/util/child_process.h
index 646a2c7c6c9..877c56a8cb1 100644
--- a/vespalib/src/vespa/vespalib/util/child_process.h
+++ b/vespalib/src/vespa/vespalib/util/child_process.h
@@ -28,6 +28,7 @@ private:
std::condition_variable _cond;
std::queue<std::string> _queue;
std::string _data;
+ int _num_streams;
bool _gotEOF;
int _waitCnt;
bool _readEOF;
@@ -38,7 +39,7 @@ private:
void updateEOF();
public:
- Reader();
+ Reader(int num_streams);
~Reader() override;
uint32_t read(char *buf, uint32_t len, int msTimeout);
@@ -57,6 +58,7 @@ private:
public:
ChildProcess(const ChildProcess &) = delete;
ChildProcess &operator=(const ChildProcess &) = delete;
+ struct capture_stderr_tag{};
/**
* @brief Run a child process
@@ -66,6 +68,15 @@ public:
**/
explicit ChildProcess(const char *cmd);
+ /**
+ * @brief Run a child process
+ *
+ * Starts a process running the given command. stderr is
+ * redirected into stdout.
+ * @param cmd A shell command line to run
+ **/
+ explicit ChildProcess(const char *cmd, capture_stderr_tag);
+
/** @brief destructor doing cleanup if needed */
~ChildProcess();
diff --git a/vespalib/src/vespa/vespalib/util/count_down_latch.h b/vespalib/src/vespa/vespalib/util/count_down_latch.h
index d543d773909..613a60e90c5 100644
--- a/vespalib/src/vespa/vespalib/util/count_down_latch.h
+++ b/vespalib/src/vespa/vespalib/util/count_down_latch.h
@@ -22,7 +22,7 @@ namespace vespalib {
class CountDownLatch
{
private:
- std::mutex _lock;
+ mutable std::mutex _lock;
std::condition_variable _cond;
uint32_t _count;
@@ -44,7 +44,7 @@ public:
* blocked in the await method will be unblocked.
**/
void countDown() {
- std::lock_guard<std::mutex> guard(_lock);
+ std::lock_guard guard(_lock);
if (_count != 0) {
--_count;
if (_count == 0) {
@@ -59,7 +59,7 @@ public:
* reduce the count to 0.
**/
void await() {
- std::unique_lock<std::mutex> guard(_lock);
+ std::unique_lock guard(_lock);
_cond.wait(guard, [this]() { return (_count == 0); });
}
@@ -72,7 +72,7 @@ public:
* @return true if the counter reached 0, false if we timed out
**/
bool await(vespalib::duration maxwait) {
- std::unique_lock<std::mutex> guard(_lock);
+ std::unique_lock guard(_lock);
return _cond.wait_for(guard, maxwait, [this]() { return (_count == 0); });
}
@@ -82,7 +82,10 @@ public:
*
* @return current count
**/
- uint32_t getCount() const { return _count; }
+ [[nodiscard]] uint32_t getCount() const noexcept {
+ std::lock_guard guard(_lock);
+ return _count;
+ }
/**
* Empty. Needs to be virtual to reduce compiler warnings.
diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
index 4eee0a63870..97dbedad66f 100644
--- a/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
+++ b/vespalib/src/vespa/vespalib/util/cpu_usage.cpp
@@ -3,6 +3,10 @@
#include "cpu_usage.h"
#include "require.h"
#include <pthread.h>
+#include <optional>
+#include <cassert>
+
+#include <sys/resource.h>
namespace vespalib {
@@ -13,11 +17,11 @@ namespace {
class DummyThreadSampler : public ThreadSampler {
private:
steady_time _start;
- double _load;
+ double _util;
public:
- DummyThreadSampler(double load) : _start(steady_clock::now()), _load(load) {}
- duration sample() const override {
- return from_s(to_s(steady_clock::now() - _start) * _load);
+ DummyThreadSampler(double util) : _start(steady_clock::now()), _util(util) {}
+ duration sample() const noexcept override {
+ return from_s(to_s(steady_clock::now() - _start) * _util);
}
};
@@ -30,9 +34,10 @@ public:
LinuxThreadSampler() : _my_clock() {
REQUIRE_EQ(pthread_getcpuclockid(pthread_self(), &_my_clock), 0);
}
- duration sample() const override {
+ duration sample() const noexcept override {
timespec ts;
- REQUIRE_EQ(clock_gettime(_my_clock, &ts), 0);
+ memset(&ts, 0, sizeof(ts));
+ clock_gettime(_my_clock, &ts);
return from_timespec(ts);
}
};
@@ -41,16 +46,246 @@ public:
} // <unnamed>
-ThreadSampler::UP create_thread_sampler(bool force_mock_impl, double expected_load) {
+duration total_cpu_usage() noexcept {
+ timespec ts;
+ memset(&ts, 0, sizeof(ts));
+ clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &ts);
+ return from_timespec(ts);
+}
+
+ThreadSampler::UP create_thread_sampler(bool force_mock_impl, double expected_util) {
if (force_mock_impl) {
- return std::make_unique<DummyThreadSampler>(expected_load);
+ return std::make_unique<DummyThreadSampler>(expected_util);
}
#ifdef __linux__
return std::make_unique<LinuxThreadSampler>();
#endif
- return std::make_unique<DummyThreadSampler>(expected_load);
+ return std::make_unique<DummyThreadSampler>(expected_util);
}
} // cpu_usage
+CpuUsage::ThreadTrackerImpl::ThreadTrackerImpl(cpu_usage::ThreadSampler::UP sampler)
+ : _lock(),
+ _cat(Category::OTHER),
+ _old_usage(),
+ _sampler(std::move(sampler)),
+ _pending()
+{
+}
+
+CpuUsage::Category
+CpuUsage::ThreadTrackerImpl::set_category(Category new_cat) noexcept
+{
+ // only owning thread may change category
+ if (new_cat == _cat) {
+ return new_cat;
+ }
+ Guard guard(_lock);
+ duration new_usage = _sampler->sample();
+ if (_cat != Category::OTHER) {
+ _pending[_cat] += (new_usage - _old_usage);
+ }
+ _old_usage = new_usage;
+ auto old_cat = _cat;
+ _cat = new_cat;
+ return old_cat;
+}
+
+CpuUsage::Sample
+CpuUsage::ThreadTrackerImpl::sample() noexcept
+{
+ Guard guard(_lock);
+ if (_cat != Category::OTHER) {
+ duration new_usage = _sampler->sample();
+ _pending[_cat] += (new_usage - _old_usage);
+ _old_usage = new_usage;
+ }
+ Sample sample = _pending;
+ _pending = Sample();
+ return sample;
+}
+
+vespalib::string &
+CpuUsage::name_of(Category cat)
+{
+ static std::array<vespalib::string,num_categories> names = {"setup", "read", "write", "compact", "other"};
+ return names[index_of(cat)];
+}
+
+CpuUsage::Category
+CpuUsage::MyUsage::set_cpu_category_for_this_thread(Category cat) noexcept
+{
+ struct Wrapper {
+ std::shared_ptr<ThreadTrackerImpl> self;
+ Wrapper() : self(std::make_shared<ThreadTrackerImpl>(cpu_usage::create_thread_sampler())) {
+ CpuUsage::self().add_thread(self);
+ }
+ ~Wrapper() {
+ self->set_category(CpuUsage::Category::OTHER);
+ CpuUsage::self().remove_thread(std::move(self));
+ }
+ };
+ thread_local Wrapper wrapper;
+ return wrapper.self->set_category(cat);
+}
+
+CpuUsage::CpuUsage()
+ : _lock(),
+ _usage(),
+ _threads(),
+ _sampling(false),
+ _conflict(),
+ _pending_add(),
+ _pending_remove()
+{
+}
+
+CpuUsage &
+CpuUsage::self()
+{
+ static CpuUsage me;
+ return me;
+}
+
+void
+CpuUsage::do_add_thread(const Guard &, ThreadTracker::SP tracker)
+{
+ assert(!_sampling);
+ auto *key = tracker.get();
+ auto [ignore, was_inserted] = _threads.emplace(key, std::move(tracker));
+ assert(was_inserted);
+}
+
+void
+CpuUsage::do_remove_thread(const Guard &, ThreadTracker::SP tracker)
+{
+ assert(!_sampling);
+ _usage.merge(tracker->sample());
+ auto was_removed = _threads.erase(tracker.get());
+ assert(was_removed);
+}
+
+void
+CpuUsage::add_thread(ThreadTracker::SP tracker)
+{
+ Guard guard(_lock);
+ if (_sampling) {
+ _pending_add.push_back(std::move(tracker));
+ } else {
+ do_add_thread(guard, std::move(tracker));
+ }
+}
+
+void
+CpuUsage::remove_thread(ThreadTracker::SP tracker)
+{
+ Guard guard(_lock);
+ if (_sampling) {
+ _pending_remove.push_back(std::move(tracker));
+ } else {
+ do_remove_thread(guard, std::move(tracker));
+ }
+}
+
+void
+CpuUsage::handle_pending(const Guard &guard)
+{
+ for (auto &thread: _pending_add) {
+ do_add_thread(guard, std::move(thread));
+ }
+ _pending_add.clear();
+ for (auto &thread: _pending_remove) {
+ do_remove_thread(guard, std::move(thread));
+ }
+ _pending_remove.clear();
+}
+
+CpuUsage::TimedSample
+CpuUsage::do_sample()
+{
+ assert(_sampling);
+ Sample my_sample;
+ std::optional<std::promise<TimedSample>> my_promise;
+ auto t = steady_clock::now();
+ for (const auto &entry: _threads) {
+ my_sample.merge(entry.first->sample());
+ }
+ {
+ Guard guard(_lock);
+ _sampling = false;
+ handle_pending(guard);
+ if (_conflict) {
+ my_promise = std::move(_conflict->sample_promise);
+ _conflict.reset();
+ }
+ my_sample.merge(_usage);
+ _usage = my_sample;
+ }
+ auto total = cpu_usage::total_cpu_usage();
+ for (size_t i = 0; i < index_of(Category::OTHER); ++i) {
+ total -= my_sample[i];
+ }
+ my_sample[Category::OTHER] = std::max(total, duration::zero());
+ TimedSample result{t, my_sample};
+ if (my_promise.has_value()) {
+ my_promise.value().set_value(result);
+ }
+ return result;
+}
+
+CpuUsage::TimedSample
+CpuUsage::sample_or_wait()
+{
+ std::shared_future<TimedSample> my_future;
+ {
+ Guard guard(_lock);
+ if (_sampling) {
+ if (!_conflict) {
+ _conflict = std::make_unique<SampleConflict>();
+ }
+ my_future = _conflict->future_sample;
+ _conflict->waiters++;
+ } else {
+ _sampling = true;
+ }
+ }
+ if (my_future.valid()) {
+ return my_future.get();
+ } else {
+ return do_sample();
+ }
+}
+
+CpuUsage::TimedSample
+CpuUsage::sample()
+{
+ return self().sample_or_wait();
+}
+
+Runnable::init_fun_t
+CpuUsage::wrap(Runnable::init_fun_t init, Category cat)
+{
+ return [init,cat](Runnable &target) {
+ auto my_usage = CpuUsage::use(cat);
+ return init(target);
+ };
+}
+
+Executor::Task::UP
+CpuUsage::wrap(Executor::Task::UP task, Category cat)
+{
+ struct CpuTask : Executor::Task {
+ UP task;
+ Category cat;
+ CpuTask(UP task_in, Category cat_in)
+ : task(std::move(task_in)), cat(cat_in) {}
+ void run() override {
+ auto my_usage = CpuUsage::use(cat);
+ task->run();
+ }
+ };
+ return std::make_unique<CpuTask>(std::move(task), cat);
+}
+
} // namespace
diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.h b/vespalib/src/vespa/vespalib/util/cpu_usage.h
index 09509a984b5..31309d88f5f 100644
--- a/vespalib/src/vespa/vespalib/util/cpu_usage.h
+++ b/vespalib/src/vespa/vespalib/util/cpu_usage.h
@@ -1,25 +1,218 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+#include "runnable.h"
+#include "executor.h"
+#include "spin_lock.h"
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/stllike/string.h>
+#include <array>
#include <memory>
+#include <future>
+#include <vector>
+#include <map>
namespace vespalib {
namespace cpu_usage {
/**
+ * Samples the total CPU usage of this process so far.
+ **/
+duration total_cpu_usage() noexcept;
+
+/**
* Samples the total CPU usage of the thread that created it. Note
* that this must not be used after thread termination. Enables
* sampling the CPU usage of a thread from outside the thread.
**/
struct ThreadSampler {
using UP = std::unique_ptr<ThreadSampler>;
- virtual duration sample() const = 0;
+ virtual duration sample() const noexcept = 0;
virtual ~ThreadSampler() {}
};
-ThreadSampler::UP create_thread_sampler(bool force_mock_impl = false, double expected_load = 0.16);
+ThreadSampler::UP create_thread_sampler(bool force_mock_impl = false, double expected_util = 0.16);
} // cpu_usage
+/**
+ * Tracks accumulative cpu usage across threads and work
+ * categories. Use the 'use' function to signal what kind of CPU you
+ * are using in the current thread. Use the 'sample' function to get a
+ * complete view of CPU usage so far.
+ *
+ * The 'use' function returns a MyUsage object that needs to be kept
+ * alive for as long as the current thread should contribute to the
+ * specified cpu use. Note that MyUsage instances may shadow each
+ * other, but must be destructed in reverse construction order.
+ **/
+class CpuUsage
+{
+public:
+    // The kind of work performed by a thread. Used to separate
+    // different kinds of CPU usage. Note that categories are
+    // exclusive/non-overlapping; a thread can only perform one kind
+    // of CPU work at any specific time.
+    enum class Category {
+        SETUP   = 0, // usage related to system setup (init/(re-)config/etc.)
+        READ    = 1, // usage related to reading data from the system
+        WRITE   = 2, // usage related to writing data to the system
+        COMPACT = 3, // usage related to internal data re-structuring
+        OTHER   = 4  // all other cpu usage not in the categories above
+    };
+    // Human-readable name of a category.
+    static vespalib::string &name_of(Category cat);
+    // Maps a category to its array index (the enum values above).
+    static constexpr size_t index_of(Category cat) { return static_cast<size_t>(cat); }
+    static constexpr size_t num_categories = 5;
+
+    // Fixed-size array indexable both by raw index and by Category.
+    template <typename T>
+    class PerCategory {
+    private:
+        std::array<T,num_categories> _array;
+    public:
+        PerCategory() : _array() {}
+        size_t size() const { return _array.size(); }
+        T &operator[](size_t idx) { return _array[idx]; }
+        T &operator[](Category cat) { return _array[index_of(cat)]; }
+        const T &operator[](size_t idx) const { return _array[idx]; }
+        const T &operator[](Category cat) const { return _array[index_of(cat)]; }
+    };
+
+    // A sample contains how much CPU has been spent in each category.
+    class Sample : public PerCategory<duration> {
+    public:
+        // Element-wise accumulation of another sample into this one.
+        void merge(const Sample &rhs) {
+            for (size_t i = 0; i < size(); ++i) {
+                (*this)[i] += rhs[i];
+            }
+        }
+    };
+
+    // a sample tagged with the time it was taken
+    using TimedSample = std::pair<steady_time, Sample>;
+
+    // Used by threads to signal what kind of CPU they are currently
+    // using. The thread will contribute to the declared CPU usage
+    // category while this object lives. MyUsage instances may shadow
+    // each other, but must be destructed in reverse construction
+    // order. The preferred way to use this class is by doing:
+    //
+    // auto my_usage = CpuUsage::use(my_cat);
+    class MyUsage {
+    private:
+        Category _old_cat; // category restored by the destructor
+        static Category set_cpu_category_for_this_thread(Category cat) noexcept;
+    public:
+        MyUsage(Category cat)
+            : _old_cat(set_cpu_category_for_this_thread(cat)) {}
+        // Non-copyable/non-movable: the guard is tied to one scope on one thread.
+        MyUsage(MyUsage &&) = delete;
+        MyUsage(const MyUsage &) = delete;
+        MyUsage &operator=(MyUsage &&) = delete;
+        MyUsage &operator=(const MyUsage &) = delete;
+        ~MyUsage() { set_cpu_category_for_this_thread(_old_cat); }
+    };
+
+    // grant extra access for testing
+    struct Test;
+
+private:
+    using Guard = std::lock_guard<SpinLock>;
+
+    // Used when multiple threads call the 'sample' function at the
+    // same time. One thread will sample while the others will wait
+    // for the result.
+    struct SampleConflict {
+        std::promise<TimedSample>      sample_promise;
+        std::shared_future<TimedSample> future_sample;
+        size_t                          waiters; // number of threads waiting on future_sample
+        SampleConflict() : sample_promise(),
+                           future_sample(sample_promise.get_future()),
+                           waiters(0) {}
+    };
+
+    // Interface used to perform destructive sampling of the CPU spent
+    // in various categories since the last time it was sampled.
+    struct ThreadTracker {
+        using SP = std::shared_ptr<ThreadTracker>;
+        virtual Sample sample() noexcept = 0;
+        virtual ~ThreadTracker() {}
+    };
+
+    class ThreadTrackerImpl : public ThreadTracker {
+    private:
+        SpinLock                     _lock;
+        Category                     _cat;       // category currently charged
+        duration                     _old_usage; // usage already accounted for
+        cpu_usage::ThreadSampler::UP _sampler;   // per-thread CPU clock source
+        Sample                       _pending;   // accumulated usage not yet sampled
+
+    public:
+        ThreadTrackerImpl(cpu_usage::ThreadSampler::UP sampler);
+        // only called by owning thread
+        Category set_category(Category new_cat) noexcept;
+        Sample sample() noexcept override;
+    };
+
+    SpinLock                                  _lock;
+    Sample                                    _usage;          // usage from terminated/sampled threads
+    std::map<ThreadTracker*,ThreadTracker::SP> _threads;       // live trackers, keyed by address
+    bool                                      _sampling;       // true while one thread runs do_sample()
+    std::unique_ptr<SampleConflict>           _conflict;       // present when samplers pile up
+    // Trackers registered/unregistered while a sample is in flight are
+    // queued here and applied by handle_pending() (definition not in view).
+    std::vector<ThreadTracker::SP>            _pending_add;
+    std::vector<ThreadTracker::SP>            _pending_remove;
+
+    CpuUsage();
+    CpuUsage(CpuUsage &&) = delete;
+    CpuUsage(const CpuUsage &) = delete;
+    CpuUsage &operator=(CpuUsage &&) = delete;
+    CpuUsage &operator=(const CpuUsage &) = delete;
+
+    // process-wide singleton accessor
+    static CpuUsage &self();
+
+    void do_add_thread(const Guard &guard, ThreadTracker::SP tracker);
+    void do_remove_thread(const Guard &guard, ThreadTracker::SP tracker);
+
+    void add_thread(ThreadTracker::SP tracker);
+    void remove_thread(ThreadTracker::SP tracker);
+
+    void handle_pending(const Guard &guard);
+    TimedSample do_sample();
+    TimedSample sample_or_wait();
+
+public:
+    static MyUsage use(Category cat) { return MyUsage(cat); }
+    static TimedSample sample();
+    static Runnable::init_fun_t wrap(Runnable::init_fun_t init, Category cat);
+    static Executor::Task::UP wrap(Executor::Task::UP task, Category cat);
+};
+
+/**
+ * Simple class used to track cpu utilization over time.
+ **/
+class CpuUtil
+{
+private:
+    duration                      _min_delay;  // minimum time between fresh samples
+    CpuUsage::TimedSample         _old_sample; // sample the cached utilization is relative to
+    CpuUsage::PerCategory<double> _util;       // cached per-category utilization
+
+public:
+    // min_delay limits how often a new CpuUsage sample is taken; between
+    // refreshes the cached utilization values are returned as-is.
+    CpuUtil(duration min_delay = 850ms)
+      : _min_delay(min_delay),
+        _old_sample(CpuUsage::sample()),
+        _util() {}
+
+    // Returns, per category, cpu-seconds spent per wall-clock second since
+    // the previous refresh. Note: returns all zeroes until at least
+    // min_delay has passed after construction (no refresh has happened yet).
+    CpuUsage::PerCategory<double> get_util() {
+        if (steady_clock::now() >= (_old_sample.first + _min_delay)) {
+            auto new_sample = CpuUsage::sample();
+            auto dt = to_s(new_sample.first - _old_sample.first);
+            for (size_t i = 0; i < _util.size(); ++i) {
+                _util[i] = to_s(new_sample.second[i] - _old_sample.second[i]) / dt;
+            }
+            _old_sample = new_sample;
+        }
+        return _util;
+    }
+};
+
} // namespace
diff --git a/vespalib/src/vespa/vespalib/util/nice.cpp b/vespalib/src/vespa/vespalib/util/nice.cpp
new file mode 100644
index 00000000000..cefaaa0347b
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/nice.cpp
@@ -0,0 +1,32 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "nice.h"
+
+#include <unistd.h>
+#include <algorithm>
+
+namespace vespalib {
+
+namespace {
+
+// Lower the calling thread's scheduling priority. 'how_nice' in (0.0, 1.0]
+// is mapped linearly onto the nice-value head-room still available
+// (how_nice == 1.0 yields the maximum possible increment). A no-op for
+// how_nice <= 0.0 and on macOS (no per-thread nice there).
+void set_nice_value(double how_nice) {
+    if (how_nice > 0.0) {
+#ifndef __APPLE__
+        // nice(0) with a zero increment just reports the current nice value.
+        int now = nice(0);
+        int max = 19;
+        int max_inc = (max - now);
+        // The result must be consumed: glibc with _FORTIFY_SOURCE declares
+        // nice() warn_unused_result, so discarding it breaks -Werror builds.
+        // Failure is deliberately ignored; being less nice than requested
+        // is harmless here.
+        [[maybe_unused]] int rv = nice(std::min(max_inc, int(how_nice * (max_inc + 1))));
+        (void) rv;
+#endif
+    }
+}
+
+}
+
+Runnable::init_fun_t be_nice(Runnable::init_fun_t init, double how_nice) {
+    // Returned init function lowers the starting thread's scheduling
+    // priority before delegating to the original init function.
+    return [init,how_nice](Runnable &target) {
+        set_nice_value(how_nice);
+        return init(target);
+    };
+}
+
+} // namespace
diff --git a/vespalib/src/vespa/vespalib/util/nice.h b/vespalib/src/vespa/vespalib/util/nice.h
new file mode 100644
index 00000000000..7c3873337eb
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/nice.h
@@ -0,0 +1,16 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "runnable.h"
+
+namespace vespalib {
+
+// Wraps an init function inside another init function that adjusts
+// the niceness of the thread being started. The 'how_nice' parameter
+// is a value from 0.0 (not nice at all) to 1.0 (super nice). It will
+// be mapped into an actual nice value in a linear fashion based on
+// the nice value space that is still available.
+
+Runnable::init_fun_t be_nice(Runnable::init_fun_t init, double how_nice);
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/rcuvector.h b/vespalib/src/vespa/vespalib/util/rcuvector.h
index dd4fa660279..00d050fa8d1 100644
--- a/vespalib/src/vespa/vespalib/util/rcuvector.h
+++ b/vespalib/src/vespa/vespalib/util/rcuvector.h
@@ -122,6 +122,7 @@ public:
void reset();
void shrink(size_t newSize) __attribute__((noinline));
void replaceVector(ArrayType replacement);
+ ArrayType create_replacement_vector() const { return _data.create(); }
};
template <typename T>
diff --git a/vespalib/src/vespa/vespalib/util/rcuvector.hpp b/vespalib/src/vespa/vespalib/util/rcuvector.hpp
index 3c455149dfd..7c70539da0e 100644
--- a/vespalib/src/vespa/vespalib/util/rcuvector.hpp
+++ b/vespalib/src/vespa/vespalib/util/rcuvector.hpp
@@ -42,7 +42,7 @@ template <typename T>
void
RcuVectorBase<T>::reset() {
// Assumes no readers at this moment
- ArrayType().swap(_data);
+ _data.reset();
_data.reserve(16);
}
@@ -52,7 +52,7 @@ RcuVectorBase<T>::~RcuVectorBase() = default;
template <typename T>
void
RcuVectorBase<T>::expand(size_t newCapacity) {
- ArrayType tmpData;
+ auto tmpData = create_replacement_vector();
tmpData.reserve(newCapacity);
for (const T & v : _data) {
tmpData.push_back_fast(v);
@@ -91,7 +91,7 @@ RcuVectorBase<T>::shrink(size_t newSize)
return;
}
if (!_data.try_unreserve(wantedCapacity)) {
- ArrayType tmpData;
+ auto tmpData = create_replacement_vector();
tmpData.reserve(wantedCapacity);
tmpData.resize(newSize);
for (uint32_t i = 0; i < newSize; ++i) {
diff --git a/vespalib/src/vespa/vespalib/util/runnable.cpp b/vespalib/src/vespa/vespalib/util/runnable.cpp
index c67c4696d88..3b75efe93f6 100644
--- a/vespalib/src/vespa/vespalib/util/runnable.cpp
+++ b/vespalib/src/vespa/vespalib/util/runnable.cpp
@@ -4,4 +4,11 @@
namespace vespalib {
+int
+Runnable::default_init_function(Runnable &target)
+{
+    // Default thread init function: run the target directly and return 1
+    // (presumably interpreted as success by thread wrappers — see callers).
+    target.run();
+    return 1;
+}
+
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/runnable.h b/vespalib/src/vespa/vespalib/util/runnable.h
index 43144ebc2cd..bc89a9decac 100644
--- a/vespalib/src/vespa/vespalib/util/runnable.h
+++ b/vespalib/src/vespa/vespalib/util/runnable.h
@@ -23,6 +23,7 @@ namespace vespalib {
struct Runnable {
using UP = std::unique_ptr<Runnable>;
using init_fun_t = std::function<int(Runnable&)>;
+ static int default_init_function(Runnable &target);
/**
* Entry point called by the running thread
@@ -36,4 +37,3 @@ struct Runnable {
};
} // namespace vespalib
-
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
new file mode 100644
index 00000000000..f5a1c117cc8
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
@@ -0,0 +1,434 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "shared_operation_throttler.h"
+#include <atomic>
+#include <condition_variable>
+#include <cassert>
+#include <functional>
+#include <mutex>
+
+namespace vespalib {
+
+namespace {
+
+// Throttler implementation that never throttles: every acquisition call
+// immediately hands out a valid token. _refs only counts outstanding
+// tokens so the destructor can assert that none have leaked.
+class NoLimitsOperationThrottler final : public SharedOperationThrottler {
+public:
+    NoLimitsOperationThrottler()
+        : SharedOperationThrottler(),
+          _refs(0u)
+    {
+    }
+    ~NoLimitsOperationThrottler() override {
+        // All tokens must be destroyed before the throttler itself.
+        assert(_refs.load(std::memory_order_acquire) == 0u);
+    }
+    Token blocking_acquire_one() noexcept override {
+        ++_refs;
+        return Token(this, TokenCtorTag{});
+    }
+    Token blocking_acquire_one(vespalib::duration) noexcept override {
+        ++_refs;
+        return Token(this, TokenCtorTag{});
+    }
+    Token try_acquire_one() noexcept override {
+        ++_refs;
+        return Token(this, TokenCtorTag{});
+    }
+    // 0 is the documented "window size is unlimited" sentinel (see header).
+    uint32_t current_window_size() const noexcept override { return 0; }
+    // Acquisition never blocks, so no thread can ever be waiting.
+    uint32_t waiting_threads() const noexcept override { return 0; }
+    void reconfigure_dynamic_throttling(const DynamicThrottleParams&) noexcept override { /* no-op */ }
+private:
+    void release_one() noexcept override { --_refs; }
+    std::atomic<uint32_t> _refs;
+};
+
+/**
+ * Effectively a 1-1 transplant of the MessageBus DynamicThrottlePolicy, but
+ * without an underlying StaticThrottlePolicy and with no need for individual
+ * MessageBus Message/Reply objects.
+ *
+ * Please keep the underlying algorithm in sync with the Java implementation,
+ * as that is considered the source of truth. For descriptions of the various
+ * parameters, also see the Java code:
+ * messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
+ */
+class DynamicThrottlePolicy {
+    SharedOperationThrottler::DynamicThrottleParams _active_params;
+    std::function<steady_time()> _time_provider;   // injectable clock (for testing)
+    uint32_t _num_sent;             // requests seen in the current resize period
+    uint32_t _num_ok;               // successful responses in the current resize period
+    double   _resize_rate;          // resize periods per window's worth of requests
+    uint64_t _resize_time;          // wall-clock millis of the last window resize
+    uint64_t _time_of_last_message; // wall-clock millis of the last capacity check
+    uint64_t _idle_time_period;     // millis of idleness before the window is shrunk
+    double   _efficiency_threshold;
+    double   _window_size_increment;
+    double   _window_size;          // current window size (kept fractional on purpose)
+    double   _max_window_size;
+    double   _min_window_size;
+    double   _decrement_factor;
+    double   _window_size_backoff;
+    double   _weight;
+    double   _local_max_throughput; // throughput high-water mark since last back-off
+public:
+    DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params,
+                          std::function<steady_time()> time_provider);
+
+    void set_window_size_increment(double window_size_increment) noexcept;
+    void set_window_size_backoff(double window_size_backoff) noexcept;
+    void set_resize_rate(double resize_rate) noexcept;
+    void set_max_window_size(double max_size) noexcept;
+
+    void set_min_window_size(double min_size) noexcept;
+    void set_window_size_decrement_factor(double decrement_factor) noexcept;
+
+    void configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept;
+
+    [[nodiscard]] uint32_t current_window_size() const noexcept {
+        return static_cast<uint32_t>(_window_size);
+    }
+    [[nodiscard]] bool has_spare_capacity(uint32_t pending_count) noexcept;
+    void process_request() noexcept;
+    void process_response(bool success) noexcept;
+
+private:
+    void internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept;
+
+    [[nodiscard]] uint64_t current_time_as_millis() noexcept {
+        return count_ms(_time_provider().time_since_epoch());
+    }
+};
+
+DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params,
+                                             std::function<steady_time()> time_provider)
+    : _active_params(params),
+      _time_provider(std::move(time_provider)),
+      _num_sent(0),
+      _num_ok(0),
+      _resize_rate(3.0),
+      _resize_time(0),
+      _time_of_last_message(current_time_as_millis()),
+      _idle_time_period(60000),
+      _efficiency_threshold(1),
+      _window_size_increment(20),
+      _window_size(_window_size_increment),
+      _max_window_size(INT_MAX),
+      _min_window_size(_window_size_increment),
+      _decrement_factor(2.0),
+      _window_size_backoff(0.9),
+      _weight(1),
+      _local_max_throughput(0)
+{
+    // The literals above are the legacy MessageBus defaults; the supplied
+    // params immediately override the configurable subset of them here.
+    internal_unconditional_configure(_active_params);
+}
+
+void
+DynamicThrottlePolicy::set_window_size_increment(double window_size_increment) noexcept
+{
+    _window_size_increment = window_size_increment;
+    // The current window must be at least one increment wide.
+    _window_size = std::max(_window_size, _window_size_increment);
+}
+
+void
+DynamicThrottlePolicy::set_window_size_backoff(double window_size_backoff) noexcept
+{
+    // Backoff is a multiplicative shrink factor; clamp to [0, 1].
+    _window_size_backoff = std::max(0.0, std::min(1.0, window_size_backoff));
+}
+
+void
+DynamicThrottlePolicy::set_resize_rate(double resize_rate) noexcept
+{
+    // Never resize more often than every 2 windows' worth of requests.
+    _resize_rate = std::max(2.0, resize_rate);
+}
+
+void
+DynamicThrottlePolicy::set_max_window_size(double max_size) noexcept
+{
+    _max_window_size = max_size;
+}
+
+void
+DynamicThrottlePolicy::set_min_window_size(double min_size) noexcept
+{
+    _min_window_size = min_size;
+    // Resets the current window to max(min, increment), discarding any
+    // grown window — intentional 1-1 match with the Java implementation.
+    _window_size = std::max(_min_window_size, _window_size_increment);
+}
+
+void
+DynamicThrottlePolicy::set_window_size_decrement_factor(double decrement_factor) noexcept
+{
+    _decrement_factor = decrement_factor;
+}
+
+void
+DynamicThrottlePolicy::internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept
+{
+    // We use setters for convenience, since setting one parameter may imply setting others,
+    // based on it, and there's frequently min/max capping of values.
+    // Note: set_min_window_size must run after set_window_size_increment,
+    // since it derives the new current window from the increment.
+    set_window_size_increment(params.window_size_increment);
+    set_min_window_size(params.min_window_size);
+    set_max_window_size(params.max_window_size);
+    set_resize_rate(params.resize_rate);
+    set_window_size_decrement_factor(params.window_size_decrement_factor);
+    set_window_size_backoff(params.window_size_backoff);
+}
+
+void
+DynamicThrottlePolicy::configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept
+{
+    // To avoid any noise where setting parameters on the throttler may implicitly reduce the
+    // current window size (even though this isn't _currently_ the case), don't invoke any internal
+    // reconfiguration code unless the parameters have actually changed.
+    if (params != _active_params) {
+        internal_unconditional_configure(params);
+        _active_params = params;
+    }
+}
+
+bool
+DynamicThrottlePolicy::has_spare_capacity(uint32_t pending_count) noexcept
+{
+    const uint64_t time = current_time_as_millis();
+    // After a long idle period, shrink the window down towards the current
+    // pending count so a dormant throttler does not resume with a stale,
+    // oversized window.
+    if ((time - _time_of_last_message) > _idle_time_period) {
+        _window_size = std::max(_min_window_size, std::min(_window_size, pending_count + _window_size_increment));
+    }
+    _time_of_last_message = time;
+    const auto window_size_floored = static_cast<uint32_t>(_window_size);
+    // Use floating point window sizes, so the algorithm sees the difference between 1.1 and 1.9 window size.
+    const bool carry = _num_sent < ((_window_size * _resize_rate) * (_window_size - window_size_floored));
+    return pending_count < (window_size_floored + (carry ? 1 : 0));
+}
+
+void
+DynamicThrottlePolicy::process_request() noexcept
+{
+    // Only re-evaluate the window once per resize period, i.e. after
+    // roughly resize_rate * window_size requests have been counted.
+    if (++_num_sent < (_window_size * _resize_rate)) {
+        return;
+    }
+
+    const uint64_t time = current_time_as_millis();
+    const double elapsed = time - _resize_time;
+    _resize_time = time;
+
+    // NOTE(review): elapsed may be 0 ms (throughput becomes +inf), and if no
+    // responses arrived, throughput is 0, making the scaling loop below spin
+    // until 'period' overflows to inf and efficiency becomes NaN (which then
+    // takes the "efficient" branch). This mirrors the Java algorithm — it is
+    // the source of truth, so confirm upstream before changing anything here.
+    const double throughput = _num_ok / elapsed;
+    _num_sent = 0;
+    _num_ok = 0;
+
+    if (throughput > _local_max_throughput) {
+        // New throughput high-water mark; greedily widen the window.
+        _local_max_throughput = throughput;
+        _window_size += _weight * _window_size_increment;
+    } else {
+        // scale up/down throughput for comparing to window size
+        double period = 1;
+        while ((throughput * (period / _window_size)) < 2) {
+            period *= 10;
+        }
+        while ((throughput * (period / _window_size)) > 2) {
+            period *= 0.1;
+        }
+        const double efficiency = throughput * (period / _window_size);
+
+        if (efficiency < _efficiency_threshold) {
+            // Not efficient enough: back off (multiplicative backoff capped by
+            // a fixed decrement) and reset the throughput high-water mark.
+            _window_size = std::min(_window_size * _window_size_backoff,
+                                    _window_size - _decrement_factor * _window_size_increment);
+            _local_max_throughput = 0;
+        } else {
+            _window_size += _weight * _window_size_increment;
+        }
+    }
+    // Always clamp the new window to the configured bounds.
+    _window_size = std::max(_min_window_size, _window_size);
+    _window_size = std::min(_max_window_size, _window_size);
+}
+
+void
+DynamicThrottlePolicy::process_response(bool success) noexcept
+{
+    // Only successful responses count towards throughput.
+    if (success) {
+        ++_num_ok;
+    }
+}
+
+// Throttler implementation backed by a DynamicThrottlePolicy. All state is
+// guarded by _mutex; blocked acquirers wait on _cond.
+class DynamicOperationThrottler final : public SharedOperationThrottler {
+    mutable std::mutex      _mutex;
+    std::condition_variable _cond;
+    DynamicThrottlePolicy   _throttle_policy;
+    uint32_t                _pending_ops;     // tokens currently outstanding
+    uint32_t                _waiting_threads; // threads blocked in acquire
+public:
+    DynamicOperationThrottler(const DynamicThrottleParams& params,
+                              std::function<steady_time()> time_provider);
+    ~DynamicOperationThrottler() override;
+
+    Token blocking_acquire_one() noexcept override;
+    Token blocking_acquire_one(vespalib::duration timeout) noexcept override;
+    Token try_acquire_one() noexcept override;
+    uint32_t current_window_size() const noexcept override;
+    uint32_t waiting_threads() const noexcept override;
+    void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept override;
+private:
+    void release_one() noexcept override;
+    // Non-const since actually checking the send window of a dynamic throttler might change
+    // it if enough time has passed.
+    [[nodiscard]] bool has_spare_capacity_in_active_window() noexcept;
+    void add_one_to_active_window_size() noexcept;
+    void subtract_one_from_active_window_size() noexcept;
+};
+
+DynamicOperationThrottler::DynamicOperationThrottler(const DynamicThrottleParams& params,
+                                                     std::function<steady_time()> time_provider)
+    : _mutex(),
+      _cond(),
+      _throttle_policy(params, std::move(time_provider)),
+      _pending_ops(0),
+      _waiting_threads(0)
+{
+}
+
+DynamicOperationThrottler::~DynamicOperationThrottler()
+{
+    // All tokens must be destroyed before the throttler itself.
+    assert(_pending_ops == 0u);
+}
+
+bool
+DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept
+{
+    // Caller must hold _mutex.
+    return _throttle_policy.has_spare_capacity(_pending_ops);
+}
+
+void
+DynamicOperationThrottler::add_one_to_active_window_size() noexcept
+{
+    // Caller must hold _mutex.
+    _throttle_policy.process_request();
+    ++_pending_ops;
+}
+
+void
+DynamicOperationThrottler::subtract_one_from_active_window_size() noexcept
+{
+    // Caller must hold _mutex.
+    _throttle_policy.process_response(true); // TODO support failure push-back
+    assert(_pending_ops > 0);
+    --_pending_ops;
+}
+
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::blocking_acquire_one() noexcept
+{
+    std::unique_lock lock(_mutex);
+    if (!has_spare_capacity_in_active_window()) {
+        ++_waiting_threads;
+        // Predicate-based wait: immune to spurious wakeups.
+        _cond.wait(lock, [&] {
+            return has_spare_capacity_in_active_window();
+        });
+        --_waiting_threads;
+    }
+    add_one_to_active_window_size();
+    return Token(this, TokenCtorTag{});
+}
+
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept
+{
+    std::unique_lock lock(_mutex);
+    if (!has_spare_capacity_in_active_window()) {
+        ++_waiting_threads;
+        const bool accepted = _cond.wait_for(lock, timeout, [&] {
+            return has_spare_capacity_in_active_window();
+        });
+        --_waiting_threads;
+        // Timed out without capacity becoming available; hand back an
+        // invalid token as documented in the header.
+        if (!accepted) {
+            return Token();
+        }
+    }
+    add_one_to_active_window_size();
+    return Token(this, TokenCtorTag{});
+}
+
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::try_acquire_one() noexcept
+{
+    std::unique_lock lock(_mutex);
+    // Non-blocking probe: invalid token if the window is currently full.
+    if (!has_spare_capacity_in_active_window()) {
+        return Token();
+    }
+    add_one_to_active_window_size();
+    return Token(this, TokenCtorTag{});
+}
+
+void
+DynamicOperationThrottler::release_one() noexcept
+{
+    std::unique_lock lock(_mutex);
+    subtract_one_from_active_window_size();
+    // Only wake up a waiting thread if doing so would possibly result in success.
+    // Unlock before notifying so the woken thread does not immediately
+    // block on a mutex we still hold.
+    if ((_waiting_threads > 0) && has_spare_capacity_in_active_window()) {
+        lock.unlock();
+        _cond.notify_one();
+    }
+}
+
+uint32_t
+DynamicOperationThrottler::current_window_size() const noexcept
+{
+    std::unique_lock lock(_mutex);
+    return _throttle_policy.current_window_size();
+}
+
+uint32_t
+DynamicOperationThrottler::waiting_threads() const noexcept
+{
+    std::unique_lock lock(_mutex);
+    return _waiting_threads;
+}
+
+void
+DynamicOperationThrottler::reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept
+{
+    std::unique_lock lock(_mutex);
+    _throttle_policy.configure(params);
+}
+
+} // anonymous namespace
+
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_unlimited_throttler()
+{
+    return std::make_unique<NoLimitsOperationThrottler>();
+}
+
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_dynamic_throttler(const DynamicThrottleParams& params)
+{
+    // Production factory: uses the monotonic steady clock.
+    return std::make_unique<DynamicOperationThrottler>(params, []() noexcept { return steady_clock::now(); });
+}
+
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_dynamic_throttler(const DynamicThrottleParams& params,
+                                                 std::function<steady_time()> time_provider)
+{
+    // Test factory: lets unit tests inject a deterministic clock.
+    return std::make_unique<DynamicOperationThrottler>(params, std::move(time_provider));
+}
+
+// Token is declared in SharedOperationThrottler; define its members through
+// the declaring class, not the anonymous-namespace DynamicOperationThrottler
+// (these definitions also serve tokens from NoLimitsOperationThrottler, and
+// the derived-class qualifier only happens to resolve within this TU).
+SharedOperationThrottler::Token::~Token()
+{
+    // A valid token implicitly frees up its window slot on destruction.
+    if (_throttler) {
+        _throttler->release_one();
+    }
+}
+
+void
+SharedOperationThrottler::Token::reset() noexcept
+{
+    // Early release: free the window slot and invalidate the token so the
+    // destructor does not release it a second time. Qualified with the
+    // declaring class (Token belongs to SharedOperationThrottler, not to
+    // the anonymous-namespace DynamicOperationThrottler).
+    if (_throttler) {
+        _throttler->release_one();
+        _throttler = nullptr;
+    }
+}
+
+// Qualified with the declaring class (Token belongs to
+// SharedOperationThrottler, not the anonymous-namespace subclass).
+SharedOperationThrottler::Token&
+SharedOperationThrottler::Token::operator=(Token&& rhs) noexcept
+{
+    // Release any slot we currently hold, then steal rhs' slot. Safe for
+    // self-move: reset() nulls _throttler first, so we end up invalid.
+    reset();
+    _throttler = rhs._throttler;
+    rhs._throttler = nullptr;
+    return *this;
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
new file mode 100644
index 00000000000..030935339ed
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
@@ -0,0 +1,100 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "time.h"
+#include <functional>
+#include <memory>
+#include <optional>
+#include <limits.h>
+
+namespace vespalib {
+
+/**
+ * Operation throttler that is intended to provide global throttling of
+ * async operations across multiple threads. A throttler wraps a logical
+ * max pending window size of in-flight operations. Depending on the
+ * throttler implementation, the window size may expand and shrink dynamically.
+ * Exactly how and when this happens is unspecified.
+ *
+ * Offers both polling and (timed, non-timed) blocking calls for acquiring
+ * a throttle token. If the returned token is valid, the caller may proceed
+ * to invoke the asynchronous operation.
+ *
+ * The window slot taken up by a valid throttle token is implicitly freed up
+ * when the token is destroyed.
+ *
+ * All operations on the throttler are thread safe.
+ */
+class SharedOperationThrottler {
+protected:
+    struct TokenCtorTag {}; // Make available to subclasses for token construction.
+public:
+    // Move-only RAII handle for one window slot. A default-constructed or
+    // moved-from token is invalid and releases nothing on destruction.
+    class Token {
+        SharedOperationThrottler* _throttler;
+    public:
+        constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {}
+        constexpr Token() noexcept : _throttler(nullptr) {}
+        constexpr Token(Token&& rhs) noexcept
+            : _throttler(rhs._throttler)
+        {
+            rhs._throttler = nullptr;
+        }
+        Token& operator=(Token&& rhs) noexcept;
+        ~Token();
+
+        Token(const Token&) = delete;
+        Token& operator=(const Token&) = delete;
+
+        [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); }
+        // Releases the held window slot (if any) ahead of destruction.
+        void reset() noexcept;
+    };
+
+    virtual ~SharedOperationThrottler() = default;
+
+    // Acquire a valid throttling token, uninterruptedly blocking until one can be obtained.
+    [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0;
+    // Attempt to acquire a valid throttling token, waiting up to `timeout` for one to be
+    // available. If the timeout is exceeded without any tokens becoming available, an
+    // invalid token will be returned.
+    [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0;
+    // Attempt to acquire a valid throttling token if one is immediately available.
+    // An invalid token will be returned if none is available. Never blocks (other than
+    // when contending for the internal throttler mutex).
+    [[nodiscard]] virtual Token try_acquire_one() noexcept = 0;
+
+    // May return 0, in which case the window size is unlimited.
+    [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0;
+
+    // Exposed for unit testing only.
+    [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
+
+    // Tunables for the dynamic throttling policy; defaults mirror the
+    // legacy MessageBus DynamicThrottlePolicy defaults.
+    struct DynamicThrottleParams {
+        uint32_t window_size_increment = 20;
+        uint32_t min_window_size = 20;
+        uint32_t max_window_size = INT_MAX; // signed max to be 1-1 compatible with Java defaults
+        double resize_rate = 3.0;
+        double window_size_decrement_factor = 1.2;
+        double window_size_backoff = 0.95;
+
+        bool operator==(const DynamicThrottleParams&) const noexcept = default;
+        bool operator!=(const DynamicThrottleParams&) const noexcept = default;
+    };
+
+    // No-op if underlying throttler does not use a dynamic policy, or if the supplied
+    // parameters are equal to the current configuration.
+    // FIXME leaky abstraction alert!
+    virtual void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept = 0;
+
+    // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking)
+    static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
+
+    // Creates a throttler that uses a DynamicThrottlePolicy under the hood
+    static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params);
+    static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params,
+                                                                            std::function<steady_time()> time_provider);
+private:
+    // Exclusively called from a valid Token. Thread safe.
+    virtual void release_one() noexcept = 0;
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/shared_string_repo.h b/vespalib/src/vespa/vespalib/util/shared_string_repo.h
index ec65b942d88..7ba59937c7c 100644
--- a/vespalib/src/vespa/vespalib/util/shared_string_repo.h
+++ b/vespalib/src/vespa/vespalib/util/shared_string_repo.h
@@ -8,6 +8,7 @@
#include <vespa/vespalib/stllike/string.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/stllike/identity.h>
+#include <vespa/vespalib/stllike/allocator.h>
#include <vespa/vespalib/stllike/hashtable.hpp>
#include <xxhash.h>
#include <mutex>
@@ -95,6 +96,7 @@ private:
return (--_ref_cnt == 0);
}
};
+ using EntryVector = std::vector<Entry, allocator_large<Entry>>;
struct Key {
uint32_t idx;
uint32_t hash;
@@ -104,8 +106,8 @@ private:
uint32_t operator()(const AltKey &key) const { return key.hash; }
};
struct Equal {
- const std::vector<Entry> &entries;
- Equal(const std::vector<Entry> &entries_in) : entries(entries_in) {}
+ const EntryVector &entries;
+ Equal(const EntryVector &entries_in) : entries(entries_in) {}
Equal(const Equal &rhs) = default;
bool operator()(const Key &a, const Key &b) const { return (a.idx == b.idx); }
bool operator()(const Key &a, const AltKey &b) const { return ((a.hash == b.hash) && (entries[a.idx].str() == b.str)); }
@@ -113,10 +115,10 @@ private:
using HashType = hashtable<Key,Key,Hash,Equal,Identity,hashtable_base::and_modulator>;
private:
- mutable SpinLock _lock;
- std::vector<Entry> _entries;
- uint32_t _free;
- HashType _hash;
+ mutable SpinLock _lock;
+ EntryVector _entries;
+ uint32_t _free;
+ HashType _hash;
void make_entries(size_t hint);
@@ -291,7 +293,7 @@ public:
// A collection of string handles with ownership
class Handles {
private:
- std::vector<string_id> _handles;
+ StringIdVector _handles;
public:
Handles();
Handles(Handles &&rhs);
@@ -309,7 +311,7 @@ public:
string_id id = _repo.copy(handle);
_handles.push_back(id);
}
- const std::vector<string_id> &view() const { return _handles; }
+ const StringIdVector &view() const { return _handles; }
};
};
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
index ab83d4e05fd..99ab298864f 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
@@ -60,9 +60,10 @@ Signal::Signal() noexcept
{}
Signal::~Signal() = default;
-SimpleThreadBundle::Pool::Pool(size_t bundleSize)
+SimpleThreadBundle::Pool::Pool(size_t bundleSize, init_fun_t init_fun)
: _lock(),
_bundleSize(bundleSize),
+ _init_fun(init_fun),
_bundles()
{
}
@@ -86,7 +87,7 @@ SimpleThreadBundle::Pool::obtain()
return ret;
}
}
- return std::make_unique<SimpleThreadBundle>(_bundleSize);
+ return std::make_unique<SimpleThreadBundle>(_bundleSize, _init_fun);
}
void
@@ -99,7 +100,7 @@ SimpleThreadBundle::Pool::release(SimpleThreadBundle::UP bundle)
//-----------------------------------------------------------------------------
-SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Strategy strategy)
+SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Runnable::init_fun_t init_fun, Strategy strategy)
: _work(),
_signals(),
_workers(),
@@ -134,7 +135,7 @@ SimpleThreadBundle::SimpleThreadBundle(size_t size_in, Strategy strategy)
_hook = std::move(hook);
} else {
size_t signal_idx = (strategy == USE_BROADCAST) ? 0 : (i - 1);
- _workers.push_back(std::make_unique<Worker>(_signals[signal_idx], std::move(hook)));
+ _workers.push_back(std::make_unique<Worker>(_signals[signal_idx], init_fun, std::move(hook)));
}
}
}
@@ -175,19 +176,19 @@ SimpleThreadBundle::run(const std::vector<Runnable*> &targets)
latch.await();
}
-SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::UP h)
- : thread(*this, simple_thread_bundle_executor),
- signal(s),
- hook(std::move(h))
+SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::init_fun_t init_fun, Runnable::UP h)
+ : thread(*this, std::move(init_fun)),
+ signal(s),
+ hook(std::move(h))
{
thread.start();
}
+
void
SimpleThreadBundle::Worker::run() {
for (size_t gen = 0; signal.wait(gen) > 0; ) {
- hook->run();
-}
-
+ hook->run();
+ }
}
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
index d9a29ee7bef..b7434d09ac3 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
@@ -86,8 +86,9 @@ struct Signal {
class SimpleThreadBundle : public ThreadBundle
{
public:
- typedef fixed_thread_bundle::Work Work;
- typedef fixed_thread_bundle::Signal Signal;
+ using Work = fixed_thread_bundle::Work;
+ using Signal = fixed_thread_bundle::Signal;
+ using init_fun_t = Runnable::init_fun_t;
typedef std::unique_ptr<SimpleThreadBundle> UP;
enum Strategy { USE_SIGNAL_LIST, USE_SIGNAL_TREE, USE_BROADCAST };
@@ -97,10 +98,12 @@ public:
private:
std::mutex _lock;
size_t _bundleSize;
+ init_fun_t _init_fun;
std::vector<SimpleThreadBundle*> _bundles;
public:
- Pool(size_t bundleSize);
+ Pool(size_t bundleSize, init_fun_t init_fun);
+ Pool(size_t bundleSize) : Pool(bundleSize, Runnable::default_init_function) {}
~Pool();
SimpleThreadBundle::UP obtain();
void release(SimpleThreadBundle::UP bundle);
@@ -112,7 +115,7 @@ private:
Thread thread;
Signal &signal;
Runnable::UP hook;
- Worker(Signal &s, Runnable::UP h);
+ Worker(Signal &s, init_fun_t init_fun, Runnable::UP h);
void run() override;
};
@@ -122,7 +125,9 @@ private:
Runnable::UP _hook;
public:
- SimpleThreadBundle(size_t size, Strategy strategy = USE_SIGNAL_LIST);
+ SimpleThreadBundle(size_t size, init_fun_t init_fun, Strategy strategy = USE_SIGNAL_LIST);
+ SimpleThreadBundle(size_t size, Strategy strategy = USE_SIGNAL_LIST)
+ : SimpleThreadBundle(size, Runnable::default_init_function, strategy) {}
~SimpleThreadBundle();
size_t size() const override;
void run(const std::vector<Runnable*> &targets) override;
diff --git a/vespalib/src/vespa/vespalib/util/spin_lock.h b/vespalib/src/vespa/vespalib/util/spin_lock.h
index abc2b89106f..3af7bc0fd55 100644
--- a/vespalib/src/vespa/vespalib/util/spin_lock.h
+++ b/vespalib/src/vespa/vespalib/util/spin_lock.h
@@ -1,5 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
#include <atomic>
#include <thread>
diff --git a/vespalib/src/vespa/vespalib/util/string_id.h b/vespalib/src/vespa/vespalib/util/string_id.h
index 7a72feee64a..7fec1da0bb8 100644
--- a/vespalib/src/vespa/vespalib/util/string_id.h
+++ b/vespalib/src/vespa/vespalib/util/string_id.h
@@ -2,7 +2,8 @@
#pragma once
-#include <cstdint>
+#include <vespa/vespalib/stllike/allocator.h>
+#include <vector>
namespace vespalib {
@@ -38,4 +39,6 @@ public:
constexpr bool operator!=(const string_id &rhs) const noexcept { return (_id != rhs._id); }
};
+using StringIdVector = std::vector<string_id, vespalib::allocator_large<string_id>>;
+
}