author | Tor Egge <Tor.Egge@online.no> | 2021-12-03 15:01:51 +0100
---|---|---
committer | Tor Egge <Tor.Egge@online.no> | 2021-12-03 15:05:23 +0100
commit | 42e3653b19f95847d14b9b90478a967907611d98 (patch) |
tree | 9daff8d4aab4b34d7517c316a9f3f9f555fdd826 /vespalib |
parent | 35311886fd9576abf77955bb4041c4500bb7beef (diff) |
Filter early on buffer id and pass a vector of entries to the normalize_values
and foreach_value ShardedHashMap member functions to limit the number of callbacks.
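To illustrate the change described above, here is a rough sketch (not part of the commit) of how a caller might use the new filtered, batched overload next to the old per-entry one. It is adapted from the test code in the diff below and assumes `hash_map` is a `ShardedHashMap` and `RefT` is the `EntryRefT<>` instantiation that packs an offset and a buffer id into an `EntryRef`:

```cpp
// Hypothetical call site, adapted from sharded_hash_map_test.cpp below.
std::vector<bool> filter(RefT::numBuffers()); // all false initially
filter[3] = true;                             // only visit refs in buffer 3

// Old overload: one callback per entry, every buffer visited.
hash_map.normalize_values([](EntryRef ref) noexcept {
    RefT iref(ref);
    return EntryRef(RefT(iref.offset() + 300, iref.bufferId()));
});

// New overload: entries are filtered on buffer id up front and handed to the
// callback as a batch that can be rewritten in place, so large maps trigger
// far fewer callbacks.
hash_map.normalize_values([](std::vector<EntryRef>& refs) noexcept {
    for (auto& ref : refs) {
        RefT iref(ref);
        ref = RefT(iref.offset() + 300, iref.bufferId());
    }
}, filter, RefT::offset_bits);
```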
Diffstat (limited to 'vespalib')
5 files changed, 279 insertions, 29 deletions
diff --git a/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp b/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
index 6e984f286c1..138dedbdd38 100644
--- a/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
+++ b/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
@@ -12,6 +12,7 @@
 #include <vespa/vespalib/gtest/gtest.h>
 #include <vespa/vespalib/datastore/unique_store_allocator.hpp>
+#include <iostream>
 #include <thread>
 
 #include <vespa/log/log.h>
@@ -27,6 +28,26 @@ using MyHashMap = vespalib::datastore::ShardedHashMap;
 using GenerationHandler = vespalib::GenerationHandler;
 using vespalib::makeLambdaTask;
 
+constexpr uint32_t small_population = 50;
+/*
+ * large_population should trigger multiple callbacks from normalize_values
+ * and foreach_value
+ */
+constexpr uint32_t large_population = 1200;
+
+namespace vespalib::datastore {
+
+/*
+ * Print EntryRef as RefT which is used by test_normalize_values and
+ * test_foreach_value to differentiate between buffers
+ */
+void PrintTo(const EntryRef &ref, std::ostream* os) {
+    RefT iref(ref);
+    *os << "RefT(" << iref.offset() << "," << iref.bufferId() << ")";
+}
+
+}
+
 namespace {
 
 void consider_yield(uint32_t i)
@@ -58,6 +79,19 @@ public:
     }
 };
 
+uint32_t select_buffer(uint32_t i) {
+    if ((i % 2) == 0) {
+        return 0;
+    }
+    if ((i % 3) == 0) {
+        return 1;
+    }
+    if ((i % 5) == 0) {
+        return 2;
+    }
+    return 3;
+}
+
 }
 
 struct DataStoreShardedHashTest : public ::testing::Test
@@ -86,7 +120,11 @@ struct DataStoreShardedHashTest : public ::testing::Test
     void read_work(uint32_t cnt);
     void read_work();
     void write_work(uint32_t cnt);
-    void populate_sample_data();
+    void populate_sample_data(uint32_t cnt);
+    void populate_sample_values(uint32_t cnt);
+    void clear_sample_values(uint32_t cnt);
+    void test_normalize_values(bool filter, bool one_filter);
+    void test_foreach_value(bool one_filter);
 };
 
@@ -213,13 +251,90 @@ DataStoreShardedHashTest::write_work(uint32_t cnt)
 }
 
 void
-DataStoreShardedHashTest::populate_sample_data()
+DataStoreShardedHashTest::populate_sample_data(uint32_t cnt)
 {
-    for (uint32_t i = 0; i < 50; ++i) {
+    for (uint32_t i = 0; i < cnt; ++i) {
         insert(i);
     }
 }
 
+void
+DataStoreShardedHashTest::populate_sample_values(uint32_t cnt)
+{
+    for (uint32_t i = 0; i < cnt; ++i) {
+        MyCompare comp(_store, i);
+        auto result = _hash_map.find(comp, EntryRef());
+        ASSERT_NE(result, nullptr);
+        EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
+        result->second.store_relaxed(RefT(i + 200, select_buffer(i)));
+    }
+}
+
+void
+DataStoreShardedHashTest::clear_sample_values(uint32_t cnt)
+{
+    for (uint32_t i = 0; i < cnt; ++i) {
+        MyCompare comp(_store, i);
+        auto result = _hash_map.find(comp, EntryRef());
+        ASSERT_NE(result, nullptr);
+        EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
+        result->second.store_relaxed(EntryRef());
+    }
+}
+
+void
+DataStoreShardedHashTest::test_normalize_values(bool filter, bool one_filter)
+{
+    populate_sample_data(large_population);
+    populate_sample_values(large_population);
+    if (filter) {
+        std::vector<bool> bfilter;
+        if (one_filter) {
+            bfilter = std::vector<bool>(RefT::numBuffers());
+            bfilter[3] = true;
+        } else {
+            bfilter = std::vector<bool>(RefT::numBuffers(), true);
+        }
+        EXPECT_TRUE(_hash_map.normalize_values([](std::vector<EntryRef> &refs) noexcept { for (auto &ref : refs) { RefT iref(ref); ref = RefT(iref.offset() + 300, iref.bufferId()); } }, bfilter, RefT::offset_bits));
+    } else {
+        EXPECT_TRUE(_hash_map.normalize_values([](EntryRef ref) noexcept { RefT iref(ref); return RefT(iref.offset() + 300, iref.bufferId()); }));
+    }
+    for (uint32_t i = 0; i < large_population; ++i) {
+        MyCompare comp(_store, i);
+        auto result = _hash_map.find(comp, EntryRef());
+        ASSERT_NE(result, nullptr);
+        EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
+        ASSERT_EQ(select_buffer(i), RefT(result->second.load_relaxed()).bufferId());
+        if (filter && one_filter && select_buffer(i) != 3) {
+            ASSERT_EQ(i + 200, RefT(result->second.load_relaxed()).offset());
+        } else {
+            ASSERT_EQ(i + 500, RefT(result->second.load_relaxed()).offset());
+        }
+        result->second.store_relaxed(EntryRef());
+    }
+}
+
+void
+DataStoreShardedHashTest::test_foreach_value(bool one_filter)
+{
+    populate_sample_data(large_population);
+    populate_sample_values(large_population);
+
+    std::vector<bool> bfilter;
+    if (one_filter) {
+        bfilter = std::vector<bool>(RefT::numBuffers());
+        bfilter[3] = true;
+    } else {
+        bfilter = std::vector<bool>(RefT::numBuffers(), true);
+    }
+    std::vector<EntryRef> exp_refs;
+    EXPECT_FALSE(_hash_map.normalize_values([&exp_refs](std::vector<EntryRef>& refs) { exp_refs.insert(exp_refs.end(), refs.begin(), refs.end()); }, bfilter, RefT::offset_bits));
+    std::vector<EntryRef> act_refs;
+    _hash_map.foreach_value([&act_refs](const std::vector<EntryRef> &refs) { act_refs.insert(act_refs.end(), refs.begin(), refs.end()); }, bfilter, RefT::offset_bits);
+    EXPECT_EQ(exp_refs, act_refs);
+    clear_sample_values(large_population);
+}
+
 TEST_F(DataStoreShardedHashTest, single_threaded_reader_without_updates)
 {
     _report_work = true;
@@ -254,7 +369,7 @@ TEST_F(DataStoreShardedHashTest, memory_usage_is_reported)
     EXPECT_EQ(0, initial_usage.deadBytes());
     EXPECT_EQ(0, initial_usage.allocatedBytesOnHold());
     auto guard = _generationHandler.takeGuard();
-    for (uint32_t i = 0; i < 50; ++i) {
+    for (uint32_t i = 0; i < small_population; ++i) {
         insert(i);
     }
     auto usage = _hash_map.get_memory_usage();
@@ -264,19 +379,19 @@
 
 TEST_F(DataStoreShardedHashTest, foreach_key_works)
 {
-    populate_sample_data();
+    populate_sample_data(small_population);
     std::vector<uint32_t> keys;
     _hash_map.foreach_key([this, &keys](EntryRef ref) { keys.emplace_back(_allocator.get_wrapped(ref).value()); });
     std::sort(keys.begin(), keys.end());
-    EXPECT_EQ(50, keys.size());
-    for (uint32_t i = 0; i < 50; ++i) {
+    EXPECT_EQ(small_population, keys.size());
+    for (uint32_t i = 0; i < small_population; ++i) {
         EXPECT_EQ(i, keys[i]);
     }
 }
 
 TEST_F(DataStoreShardedHashTest, move_keys_works)
 {
-    populate_sample_data();
+    populate_sample_data(small_population);
     std::vector<EntryRef> refs;
     _hash_map.foreach_key([&refs](EntryRef ref) { refs.emplace_back(ref); });
     std::vector<EntryRef> new_refs;
@@ -284,10 +399,10 @@ TEST_F(DataStoreShardedHashTest, move_keys_works)
     _hash_map.move_keys(my_compactable, std::vector<bool>(RefT::numBuffers(), true), RefT::offset_bits);
     std::vector<EntryRef> verify_new_refs;
     _hash_map.foreach_key([&verify_new_refs](EntryRef ref) { verify_new_refs.emplace_back(ref); });
-    EXPECT_EQ(50u, refs.size());
+    EXPECT_EQ(small_population, refs.size());
     EXPECT_NE(refs, new_refs);
     EXPECT_EQ(new_refs, verify_new_refs);
-    for (uint32_t i = 0; i < 50; ++i) {
+    for (uint32_t i = 0; i < small_population; ++i) {
         EXPECT_NE(refs[i], new_refs[i]);
         auto value = _allocator.get_wrapped(refs[i]).value();
         auto new_value = _allocator.get_wrapped(refs[i]).value();
@@ -297,29 +412,33 @@ TEST_F(DataStoreShardedHashTest, move_keys_works)
 
 TEST_F(DataStoreShardedHashTest, normalize_values_works)
 {
-    populate_sample_data();
-    for (uint32_t i = 0; i < 50; ++i) {
-        MyCompare comp(_store, i);
-        auto result = _hash_map.find(comp, EntryRef());
-        ASSERT_NE(result, nullptr);
-        EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
-        result->second.store_relaxed(EntryRef(i + 200));
-    }
-    _hash_map.normalize_values([](EntryRef ref) noexcept { return EntryRef(ref.ref() + 300); });
-    for (uint32_t i = 0; i < 50; ++i) {
-        MyCompare comp(_store, i);
-        auto result = _hash_map.find(comp, EntryRef());
-        ASSERT_NE(result, nullptr);
-        EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
-        ASSERT_EQ(i + 500, result->second.load_relaxed().ref());
-        result->second.store_relaxed(EntryRef());
-    }
+    test_normalize_values(false, false);
+}
+
+TEST_F(DataStoreShardedHashTest, normalize_values_all_filter_works)
+{
+    test_normalize_values(true, false);
+}
+
+TEST_F(DataStoreShardedHashTest, normalize_values_one_filter_works)
+{
+    test_normalize_values(true, true);
+}
+
+TEST_F(DataStoreShardedHashTest, foreach_value_all_filter_works)
+{
+    test_foreach_value(false);
+}
+
+TEST_F(DataStoreShardedHashTest, foreach_value_one_filter_works)
+{
+    test_foreach_value(true);
 }
 
 TEST_F(DataStoreShardedHashTest, compact_worst_shard_works)
 {
-    populate_sample_data();
-    for (uint32_t i = 10; i < 50; ++i) {
+    populate_sample_data(small_population);
+    for (uint32_t i = 10; i < small_population; ++i) {
         remove(i);
     }
     commit();
diff --git a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp
index db9fee8ea70..79e4cb2ff74 100644
--- a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp
@@ -220,4 +220,106 @@ FixedSizeHashMap::normalize_values(const std::function<EntryRef(EntryRef)>& norm
     return changed;
 }
 
+namespace {
+
+class ChangeWriter {
+    std::vector<AtomicEntryRef*> _atomic_refs;
+public:
+    ChangeWriter(uint32_t capacity);
+    ~ChangeWriter();
+    bool write(const std::vector<EntryRef> &refs);
+    void emplace_back(AtomicEntryRef &atomic_ref) { _atomic_refs.emplace_back(&atomic_ref); }
+};
+
+ChangeWriter::ChangeWriter(uint32_t capacity)
+    : _atomic_refs()
+{
+    _atomic_refs.reserve(capacity);
+}
+
+ChangeWriter::~ChangeWriter() = default;
+
+bool
+ChangeWriter::write(const std::vector<EntryRef> &refs)
+{
+    bool changed = false;
+    assert(refs.size() == _atomic_refs.size());
+    auto atomic_ref = _atomic_refs.begin();
+    for (auto ref : refs) {
+        EntryRef old_ref = (*atomic_ref)->load_relaxed();
+        if (ref != old_ref) {
+            (*atomic_ref)->store_release(ref);
+            changed = true;
+        }
+        ++atomic_ref;
+    }
+    assert(atomic_ref == _atomic_refs.end());
+    _atomic_refs.clear();
+    return changed;
+}
+
+}
+
+bool
+FixedSizeHashMap::normalize_values(const std::function<void(std::vector<EntryRef>&)>& normalize, const std::vector<bool>& filter, uint32_t entry_ref_offset_bits)
+{
+    std::vector<EntryRef> refs;
+    refs.reserve(1024);
+    bool changed = false;
+    ChangeWriter change_writer(refs.capacity());
+    for (auto& chain_head : _chain_heads) {
+        uint32_t node_idx = chain_head.load_relaxed();
+        while (node_idx != no_node_idx) {
+            auto& node = _nodes[node_idx];
+            EntryRef ref = node.get_kv().second.load_relaxed();
+            if (ref.valid()) {
+                uint32_t buffer_id = ref.buffer_id(entry_ref_offset_bits);
+                if (filter[buffer_id]) {
+                    refs.emplace_back(ref);
+                    change_writer.emplace_back(node.get_kv().second);
+                    if (refs.size() >= refs.capacity()) {
+                        normalize(refs);
+                        changed |= change_writer.write(refs);
+                        refs.clear();
+                    }
+                }
+            }
+            node_idx = node.get_next_node_idx().load(std::memory_order_relaxed);
+        }
+    }
+    if (!refs.empty()) {
+        normalize(refs);
+        changed |= change_writer.write(refs);
+    }
+    return changed;
+}
+
+void
+FixedSizeHashMap::foreach_value(const std::function<void(const std::vector<EntryRef>&)>& callback, const std::vector<bool>& filter, uint32_t entry_ref_offset_bits)
+{
+    std::vector<EntryRef> refs;
+    refs.reserve(1024);
+    for (auto& chain_head : _chain_heads) {
+        uint32_t node_idx = chain_head.load_relaxed();
+        while (node_idx != no_node_idx) {
+            auto& node = _nodes[node_idx];
+            EntryRef ref = node.get_kv().second.load_relaxed();
+            if (ref.valid()) {
+                uint32_t buffer_id = ref.buffer_id(entry_ref_offset_bits);
+                if (filter[buffer_id]) {
+                    refs.emplace_back(ref);
+                    if (refs.size() >= refs.capacity()) {
+                        callback(refs);
+                        refs.clear();
+                    }
+                }
+            }
+            node_idx = node.get_next_node_idx().load(std::memory_order_relaxed);
+        }
+    }
+    if (!refs.empty()) {
+        callback(refs);
+    }
+}
+
 }
diff --git a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h
index 035cd84dbee..0356725883d 100644
--- a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h
+++ b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h
@@ -160,6 +160,8 @@ public:
     void foreach_key(const std::function<void(EntryRef)>& callback) const;
     void move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits);
     bool normalize_values(const std::function<EntryRef(EntryRef)>& normalize);
+    bool normalize_values(const std::function<void(std::vector<EntryRef>&)>& normalize, const std::vector<bool>& filter, uint32_t entry_ref_offset_bits);
+    void foreach_value(const std::function<void(const std::vector<EntryRef>&)>& callback, const std::vector<bool>& filter, uint32_t entry_ref_offset_bits);
 };
 
 }
diff --git a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp
index da4db92a309..89f1b5f7b6f 100644
--- a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp
@@ -195,6 +195,31 @@ ShardedHashMap::normalize_values(std::function<EntryRef(EntryRef)> normalize)
 }
 
 bool
+ShardedHashMap::normalize_values(std::function<void(std::vector<EntryRef>&)> normalize, const std::vector<bool>& filter, uint32_t entry_ref_offset_bits)
+{
+    bool changed = false;
+    for (size_t i = 0; i < num_shards; ++i) {
+        auto map = _maps[i].load(std::memory_order_relaxed);
+        if (map != nullptr) {
+            changed |= map->normalize_values(normalize, filter, entry_ref_offset_bits);
+        }
+    }
+    return changed;
+}
+
+void
+ShardedHashMap::foreach_value(std::function<void(const std::vector<EntryRef>&)> callback, const std::vector<bool>& filter, uint32_t entry_ref_offset_bits)
+{
+    for (size_t i = 0; i < num_shards; ++i) {
+        auto map = _maps[i].load(std::memory_order_relaxed);
+        if (map != nullptr) {
+            map->foreach_value(callback, filter, entry_ref_offset_bits);
+        }
+    }
+}
+
+
+bool
 ShardedHashMap::has_held_buffers() const
 {
     return _gen_holder.getHeldBytes() != 0;
diff --git a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h
index df07f7a1990..c12c6a4bbb2 100644
--- a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h
+++ b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h
@@ -59,6 +59,8 @@ public:
     void foreach_key(std::function<void(EntryRef)> callback) const;
     void move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits);
     bool normalize_values(std::function<EntryRef(EntryRef)> normalize);
+    bool normalize_values(std::function<void(std::vector<EntryRef>&)> normalize, const std::vector<bool>& filter, uint32_t entry_ref_offset_bits);
+    void foreach_value(std::function<void(const std::vector<EntryRef>&)> callback, const std::vector<bool>& filter, uint32_t entry_ref_offset_bits);
     bool has_held_buffers() const;
     void compact_worst_shard();
 };
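A note on the batching in FixedSizeHashMap above: both new functions collect matching refs into a vector with a reserved capacity of 1024 and invoke the callback whenever that capacity is reached, flushing any remainder at the end, and ChangeWriter publishes rewritten refs with store_release so concurrent readers see consistent values. The following is my own standalone sketch of that flush pattern, not code from the commit; all names are illustrative:

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Generic batch-flush pattern, mirroring normalize_values/foreach_value above:
// collect matching items up to a fixed capacity, invoke the callback once per
// full batch, and flush the tail batch at the end.
template <typename T>
void for_each_batched(const std::vector<T>& items,
                      const std::function<bool(const T&)>& match,
                      const std::function<void(const std::vector<T>&)>& callback,
                      std::size_t batch_size = 1024)
{
    std::vector<T> batch;
    batch.reserve(batch_size);
    for (const auto& item : items) {
        if (match(item)) {             // early filtering, e.g. on buffer id
            batch.emplace_back(item);
            if (batch.size() >= batch.capacity()) {
                callback(batch);       // one callback per full batch
                batch.clear();         // keeps the reserved capacity
            }
        }
    }
    if (!batch.empty()) {
        callback(batch);               // flush the partial tail batch
    }
}
```

The effect is that a map with large_population (1200) entries costs a handful of callbacks rather than 1200, which is the point of the commit.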