author | Geir Storli <geirst@yahooinc.com> | 2021-12-06 17:21:34 +0100
---|---|---
committer | GitHub <noreply@github.com> | 2021-12-06 17:21:34 +0100
commit | 091a90a1b5a4db8ade3369c7c416a4e02491bbb9 (patch) |
tree | 3b584c71a9816fcb341d98a3318e6b80c6085994 /vespalib/src |
parent | ffdbd053a2b57383b2d463e8050394776b14abdf (diff) |
parent | 1330d2c3d3b8647b6053ac37e95503cd0278e2e3 (diff) |
Merge pull request #20356 from vespa-engine/toregge/filter-early-on-buffer-id-for-normalize-values-and-foreach-values
Filter early on buffer id and pass vector of entries in normalize_values
Diffstat (limited to 'vespalib/src')
18 files changed, 498 insertions, 93 deletions
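The gist of the change: instead of handing every EntryRef to the dictionary callbacks one call at a time, callers first build an EntryRefFilter over the buffers being compacted, reject non-matching refs with a cheap buffer-id test, and pass the survivors to the normalize/foreach callbacks in batches. A minimal self-contained sketch of that flow, using simplified stand-ins for the EntryRef and EntryRefFilter types the patch introduces below (the 1024 batch size matches fixed_size_hash_map.cpp):

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Simplified stand-ins for vespalib::datastore::EntryRef / EntryRefFilter;
// the real classes appear in entry_ref_filter.h further down in this diff.
struct EntryRef {
    uint32_t ref = 0;
    bool valid() const { return ref != 0; }
    uint32_t buffer_id(uint32_t offset_bits) const { return ref >> offset_bits; }
};

class EntryRefFilter {
    std::vector<bool> _filter;
    uint32_t _offset_bits;
public:
    EntryRefFilter(uint32_t num_buffers, uint32_t offset_bits)
        : _filter(num_buffers), _offset_bits(offset_bits) {}
    void add_buffer(uint32_t buffer_id) { _filter[buffer_id] = true; }
    bool has(EntryRef ref) const { return _filter[ref.buffer_id(_offset_bits)]; }
};

// Filter early on buffer id, then hand surviving refs to the callback in
// batches instead of making one std::function call per entry.
void normalize_batched(const std::vector<EntryRef>& values, const EntryRefFilter& filter,
                       const std::function<void(std::vector<EntryRef>&)>& normalize) {
    std::vector<EntryRef> batch;
    batch.reserve(1024);  // same batch size as FixedSizeHashMap uses below
    for (EntryRef ref : values) {
        if (ref.valid() && filter.has(ref)) {
            batch.push_back(ref);
            if (batch.size() >= batch.capacity()) {
                normalize(batch);
                batch.clear();
            }
        }
    }
    if (!batch.empty()) {
        normalize(batch);
    }
}

int main() {
    EntryRefFilter filter(4, 22);  // 4 buffers, EntryRefT<22>-style layout
    filter.add_buffer(3);          // only entries in buffer 3 are normalized
    std::vector<EntryRef> values = {{(3u << 22) | 7u}, {(1u << 22) | 9u}};
    size_t normalized = 0;
    normalize_batched(values, filter, [&](std::vector<EntryRef>& refs) {
        normalized += refs.size();  // real callers rewrite each ref in place
    });
    assert(normalized == 1);       // the buffer-1 ref was filtered out early
    return 0;
}
```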
diff --git a/vespalib/src/tests/btree/btree_store/btree_store_test.cpp b/vespalib/src/tests/btree/btree_store/btree_store_test.cpp
index e7d923d0e87..77cb8e519e4 100644
--- a/vespalib/src/tests/btree/btree_store/btree_store_test.cpp
+++ b/vespalib/src/tests/btree/btree_store/btree_store_test.cpp
@@ -73,61 +73,112 @@ BTreeStoreTest::~BTreeStoreTest()
     inc_generation();
 }
 
+namespace {
+
+class ChangeWriter {
+    std::vector<EntryRef*> _old_refs;
+public:
+    ChangeWriter(uint32_t capacity);
+    ~ChangeWriter();
+    void write(const std::vector<EntryRef>& refs);
+    void emplace_back(EntryRef& ref) { _old_refs.emplace_back(&ref); }
+};
+
+ChangeWriter::ChangeWriter(uint32_t capacity)
+    : _old_refs()
+{
+    _old_refs.reserve(capacity);
+}
+
+ChangeWriter::~ChangeWriter() = default;
+
+void
+ChangeWriter::write(const std::vector<EntryRef> &refs)
+{
+    assert(refs.size() == _old_refs.size());
+    auto old_ref_itr = _old_refs.begin();
+    for (auto ref : refs) {
+        **old_ref_itr = ref;
+        ++old_ref_itr;
+    }
+    assert(old_ref_itr == _old_refs.end());
+    _old_refs.clear();
+}
+
+}
+
 void
 BTreeStoreTest::test_compact_sequence(uint32_t sequence_length)
 {
     auto &store = _store;
+    uint32_t entry_ref_offset_bits = TreeStore::RefType::offset_bits;
     EntryRef ref1 = add_sequence(4, 4 + sequence_length);
     EntryRef ref2 = add_sequence(5, 5 + sequence_length);
-    EntryRef old_ref1 = ref1;
-    EntryRef old_ref2 = ref2;
     std::vector<EntryRef> refs;
+    refs.reserve(2);
+    refs.emplace_back(ref1);
+    refs.emplace_back(ref2);
+    std::vector<EntryRef> temp_refs;
     for (int i = 0; i < 1000; ++i) {
-        refs.emplace_back(add_sequence(i + 6, i + 6 + sequence_length));
+        temp_refs.emplace_back(add_sequence(i + 6, i + 6 + sequence_length));
     }
-    for (auto& ref : refs) {
+    for (auto& ref : temp_refs) {
         store.clear(ref);
     }
     inc_generation();
+    ChangeWriter change_writer(refs.size());
+    std::vector<EntryRef> move_refs;
+    move_refs.reserve(refs.size());
     auto usage_before = store.getMemoryUsage();
     for (uint32_t pass = 0; pass < 15; ++pass) {
         auto to_hold = store.start_compact_worst_buffers();
-        ref1 = store.move(ref1);
-        ref2 = store.move(ref2);
+        std::vector<bool> filter(TreeStore::RefType::numBuffers());
+        for (auto buffer_id : to_hold) {
+            filter[buffer_id] = true;
+        }
+        for (auto& ref : refs) {
+            if (ref.valid() && filter[ref.buffer_id(entry_ref_offset_bits)]) {
+                move_refs.emplace_back(ref);
+                change_writer.emplace_back(ref);
+            }
+        }
+        store.move(move_refs);
+        change_writer.write(move_refs);
+        move_refs.clear();
         store.finishCompact(to_hold);
         inc_generation();
    }
-    EXPECT_NE(old_ref1, ref1);
-    EXPECT_NE(old_ref2, ref2);
-    EXPECT_EQ(make_exp_sequence(4, 4 + sequence_length), get_sequence(ref1));
-    EXPECT_EQ(make_exp_sequence(5, 5 + sequence_length), get_sequence(ref2));
+    EXPECT_NE(ref1, refs[0]);
+    EXPECT_NE(ref2, refs[1]);
+    EXPECT_EQ(make_exp_sequence(4, 4 + sequence_length), get_sequence(refs[0]));
+    EXPECT_EQ(make_exp_sequence(5, 5 + sequence_length), get_sequence(refs[1]));
     auto usage_after = store.getMemoryUsage();
     EXPECT_GT(usage_before.deadBytes(), usage_after.deadBytes());
-    store.clear(ref1);
-    store.clear(ref2);
+    store.clear(refs[0]);
+    store.clear(refs[1]);
 }
 
 TEST_F(BTreeStoreTest, require_that_nodes_for_multiple_btrees_are_compacted)
 {
     auto &store = this->_store;
-    EntryRef ref1 = add_sequence(4, 40);
-    EntryRef ref2 = add_sequence(100, 130);
+    std::vector<EntryRef> refs;
+    refs.emplace_back(add_sequence(4, 40));
+    refs.emplace_back(add_sequence(100, 130));
     store.clear(add_sequence(1000, 20000));
     inc_generation();
     auto usage_before = store.getMemoryUsage();
     for (uint32_t pass = 0; pass < 15; ++pass) {
         auto to_hold = store.start_compact_worst_btree_nodes();
-        store.move_btree_nodes(ref1);
-        store.move_btree_nodes(ref2);
+        store.move_btree_nodes(refs);
         store.finish_compact_worst_btree_nodes(to_hold);
         inc_generation();
     }
-    EXPECT_EQ(make_exp_sequence(4, 40), get_sequence(ref1));
-    EXPECT_EQ(make_exp_sequence(100, 130), get_sequence(ref2));
+    EXPECT_EQ(make_exp_sequence(4, 40), get_sequence(refs[0]));
+    EXPECT_EQ(make_exp_sequence(100, 130), get_sequence(refs[1]));
     auto usage_after = store.getMemoryUsage();
     EXPECT_GT(usage_before.deadBytes(), usage_after.deadBytes());
-    store.clear(ref1);
-    store.clear(ref2);
+    store.clear(refs[0]);
+    store.clear(refs[1]);
 }
 
 TEST_F(BTreeStoreTest, require_that_short_arrays_are_compacted)
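The ChangeWriter bookkeeping in the test above is easy to lose in diff form: the test remembers the address of every ref it passes to the batched store.move(), then copies the moved refs back through those addresses. A toy sketch of just that pattern, with a hypothetical EntryRef type and the in-place rewrite that store.move() performs simulated by hand:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct EntryRef { uint32_t ref = 0; };  // toy stand-in for the real EntryRef

int main() {
    std::vector<EntryRef> refs = {{1}, {2}, {3}};  // refs owned by the caller
    std::vector<EntryRef*> old_refs;               // where to write results back
    std::vector<EntryRef> move_refs;               // batch handed to move()
    for (auto& r : refs) {
        move_refs.push_back(r);
        old_refs.push_back(&r);
    }
    // store.move(move_refs) rewrites each element in place; simulate that:
    for (auto& r : move_refs) { r.ref += 100; }
    // ChangeWriter::write(): copy the moved refs back to their original homes.
    assert(move_refs.size() == old_refs.size());
    for (size_t i = 0; i < move_refs.size(); ++i) {
        *old_refs[i] = move_refs[i];
    }
    assert(refs[0].ref == 101 && refs[2].ref == 103);
    return 0;
}
```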
diff --git a/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp b/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
index 6e984f286c1..796e19a97d1 100644
--- a/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
+++ b/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
@@ -1,6 +1,7 @@
 // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
 
 #include <vespa/vespalib/datastore/sharded_hash_map.h>
+#include <vespa/vespalib/datastore/entry_ref_filter.h>
 #include <vespa/vespalib/datastore/i_compactable.h>
 #include <vespa/vespalib/datastore/unique_store_allocator.h>
 #include <vespa/vespalib/datastore/unique_store_comparator.h>
@@ -12,12 +13,14 @@
 #include <vespa/vespalib/gtest/gtest.h>
 
 #include <vespa/vespalib/datastore/unique_store_allocator.hpp>
+#include <iostream>
 #include <thread>
 
 #include <vespa/log/log.h>
 LOG_SETUP("vespalib_datastore_shared_hash_test");
 
 using vespalib::datastore::EntryRef;
+using vespalib::datastore::EntryRefFilter;
 using vespalib::datastore::ICompactable;
 using RefT = vespalib::datastore::EntryRefT<22>;
 using MyAllocator = vespalib::datastore::UniqueStoreAllocator<uint32_t, RefT>;
@@ -27,6 +30,26 @@ using MyHashMap = vespalib::datastore::ShardedHashMap;
 using GenerationHandler = vespalib::GenerationHandler;
 using vespalib::makeLambdaTask;
 
+constexpr uint32_t small_population = 50;
+/*
+ * large_population should trigger multiple callbacks from normalize_values
+ * and foreach_value
+ */
+constexpr uint32_t large_population = 1200;
+
+namespace vespalib::datastore {
+
+/*
+ * Print EntryRef as RefT which is used by test_normalize_values and
+ * test_foreach_value to differentiate between buffers
+ */
+void PrintTo(const EntryRef &ref, std::ostream* os) {
+    RefT iref(ref);
+    *os << "RefT(" << iref.offset() << "," << iref.bufferId() << ")";
+}
+
+}
+
 namespace {
 
 void consider_yield(uint32_t i)
@@ -58,6 +81,19 @@ public:
     }
 };
 
+uint32_t select_buffer(uint32_t i) {
+    if ((i % 2) == 0) {
+        return 0;
+    }
+    if ((i % 3) == 0) {
+        return 1;
+    }
+    if ((i % 5) == 0) {
+        return 2;
+    }
+    return 3;
+}
+
 }
 
 struct DataStoreShardedHashTest : public ::testing::Test
@@ -86,7 +122,11 @@ struct DataStoreShardedHashTest : public ::testing::Test
     void read_work(uint32_t cnt);
     void read_work();
     void write_work(uint32_t cnt);
-    void populate_sample_data();
+    void populate_sample_data(uint32_t cnt);
+    void populate_sample_values(uint32_t cnt);
+    void clear_sample_values(uint32_t cnt);
+    void test_normalize_values(bool use_filter, bool one_filter);
+    void test_foreach_value(bool one_filter);
 };
 
 
@@ -213,13 +253,94 @@ DataStoreShardedHashTest::write_work(uint32_t cnt)
 }
 
 void
-DataStoreShardedHashTest::populate_sample_data()
+DataStoreShardedHashTest::populate_sample_data(uint32_t cnt)
 {
-    for (uint32_t i = 0; i < 50; ++i) {
+    for (uint32_t i = 0; i < cnt; ++i) {
         insert(i);
     }
 }
 
+void
+DataStoreShardedHashTest::populate_sample_values(uint32_t cnt)
+{
+    for (uint32_t i = 0; i < cnt; ++i) {
+        MyCompare comp(_store, i);
+        auto result = _hash_map.find(comp, EntryRef());
+        ASSERT_NE(result, nullptr);
+        EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
+        result->second.store_relaxed(RefT(i + 200, select_buffer(i)));
+    }
+}
+
+void
+DataStoreShardedHashTest::clear_sample_values(uint32_t cnt)
+{
+    for (uint32_t i = 0; i < cnt; ++i) {
+        MyCompare comp(_store, i);
+        auto result = _hash_map.find(comp, EntryRef());
+        ASSERT_NE(result, nullptr);
+        EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
+        result->second.store_relaxed(EntryRef());
+    }
+}
+
+namespace {
+
+template <typename RefT>
+EntryRefFilter
+make_entry_ref_filter(bool one_filter)
+{
+    if (one_filter) {
+        EntryRefFilter filter(RefT::numBuffers(), RefT::offset_bits);
+        filter.add_buffer(3);
+        return filter;
+    }
+    return EntryRefFilter::create_all_filter(RefT::numBuffers(), RefT::offset_bits);
+}
+
+}
+
+void
+DataStoreShardedHashTest::test_normalize_values(bool use_filter, bool one_filter)
+{
+    populate_sample_data(large_population);
+    populate_sample_values(large_population);
+    if (use_filter) {
+        auto filter = make_entry_ref_filter<RefT>(one_filter);
+        EXPECT_TRUE(_hash_map.normalize_values([](std::vector<EntryRef> &refs) noexcept { for (auto &ref : refs) { RefT iref(ref); ref = RefT(iref.offset() + 300, iref.bufferId()); } }, filter));
+    } else {
+        EXPECT_TRUE(_hash_map.normalize_values([](EntryRef ref) noexcept { RefT iref(ref); return RefT(iref.offset() + 300, iref.bufferId()); }));
+    }
+    for (uint32_t i = 0; i < large_population; ++i) {
+        MyCompare comp(_store, i);
+        auto result = _hash_map.find(comp, EntryRef());
+        ASSERT_NE(result, nullptr);
+        EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
+        ASSERT_EQ(select_buffer(i), RefT(result->second.load_relaxed()).bufferId());
+        if (use_filter && one_filter && select_buffer(i) != 3) {
+            ASSERT_EQ(i + 200, RefT(result->second.load_relaxed()).offset());
+        } else {
+            ASSERT_EQ(i + 500, RefT(result->second.load_relaxed()).offset());
+        }
+        result->second.store_relaxed(EntryRef());
+    }
+}
+
+void
+DataStoreShardedHashTest::test_foreach_value(bool one_filter)
+{
+    populate_sample_data(large_population);
+    populate_sample_values(large_population);
+
+    auto filter = make_entry_ref_filter<RefT>(one_filter);
+    std::vector<EntryRef> exp_refs;
+    EXPECT_FALSE(_hash_map.normalize_values([&exp_refs](std::vector<EntryRef>& refs) { exp_refs.insert(exp_refs.end(), refs.begin(), refs.end()); }, filter));
+    std::vector<EntryRef> act_refs;
+    _hash_map.foreach_value([&act_refs](const std::vector<EntryRef> &refs) { act_refs.insert(act_refs.end(), refs.begin(), refs.end()); }, filter);
+    EXPECT_EQ(exp_refs, act_refs);
+    clear_sample_values(large_population);
+}
+
 TEST_F(DataStoreShardedHashTest, single_threaded_reader_without_updates)
 {
     _report_work = true;
@@ -254,7 +375,7 @@ TEST_F(DataStoreShardedHashTest, memory_usage_is_reported)
     EXPECT_EQ(0, initial_usage.deadBytes());
     EXPECT_EQ(0, initial_usage.allocatedBytesOnHold());
     auto guard = _generationHandler.takeGuard();
-    for (uint32_t i = 0; i < 50; ++i) {
+    for (uint32_t i = 0; i < small_population; ++i) {
         insert(i);
     }
     auto usage = _hash_map.get_memory_usage();
@@ -264,30 +385,31 @@ TEST_F(DataStoreShardedHashTest, memory_usage_is_reported)
 
 TEST_F(DataStoreShardedHashTest, foreach_key_works)
 {
-    populate_sample_data();
+    populate_sample_data(small_population);
     std::vector<uint32_t> keys;
     _hash_map.foreach_key([this, &keys](EntryRef ref) { keys.emplace_back(_allocator.get_wrapped(ref).value()); });
     std::sort(keys.begin(), keys.end());
-    EXPECT_EQ(50, keys.size());
-    for (uint32_t i = 0; i < 50; ++i) {
+    EXPECT_EQ(small_population, keys.size());
+    for (uint32_t i = 0; i < small_population; ++i) {
         EXPECT_EQ(i, keys[i]);
     }
 }
 
 TEST_F(DataStoreShardedHashTest, move_keys_works)
 {
-    populate_sample_data();
+    populate_sample_data(small_population);
     std::vector<EntryRef> refs;
     _hash_map.foreach_key([&refs](EntryRef ref) { refs.emplace_back(ref); });
     std::vector<EntryRef> new_refs;
     MyCompactable my_compactable(_allocator, new_refs);
-    _hash_map.move_keys(my_compactable, std::vector<bool>(RefT::numBuffers(), true), RefT::offset_bits);
+    auto filter = make_entry_ref_filter<RefT>(false);
+    _hash_map.move_keys(my_compactable, filter);
     std::vector<EntryRef> verify_new_refs;
     _hash_map.foreach_key([&verify_new_refs](EntryRef ref) { verify_new_refs.emplace_back(ref); });
-    EXPECT_EQ(50u, refs.size());
+    EXPECT_EQ(small_population, refs.size());
     EXPECT_NE(refs, new_refs);
     EXPECT_EQ(new_refs, verify_new_refs);
-    for (uint32_t i = 0; i < 50; ++i) {
+    for (uint32_t i = 0; i < small_population; ++i) {
         EXPECT_NE(refs[i], new_refs[i]);
         auto value = _allocator.get_wrapped(refs[i]).value();
         auto new_value = _allocator.get_wrapped(refs[i]).value();
@@ -297,29 +419,33 @@ TEST_F(DataStoreShardedHashTest, move_keys_works)
 
 TEST_F(DataStoreShardedHashTest, normalize_values_works)
 {
-    populate_sample_data();
-    for (uint32_t i = 0; i < 50; ++i) {
-        MyCompare comp(_store, i);
-        auto result = _hash_map.find(comp, EntryRef());
-        ASSERT_NE(result, nullptr);
-        EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
-        result->second.store_relaxed(EntryRef(i + 200));
-    }
-    _hash_map.normalize_values([](EntryRef ref) noexcept { return EntryRef(ref.ref() + 300); });
-    for (uint32_t i = 0; i < 50; ++i) {
-        MyCompare comp(_store, i);
-        auto result = _hash_map.find(comp, EntryRef());
-        ASSERT_NE(result, nullptr);
-        EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
-        ASSERT_EQ(i + 500, result->second.load_relaxed().ref());
-        result->second.store_relaxed(EntryRef());
-    }
+    test_normalize_values(false, false);
+}
+
+TEST_F(DataStoreShardedHashTest, normalize_values_all_filter_works)
+{
+    test_normalize_values(true, false);
+}
+
+TEST_F(DataStoreShardedHashTest, normalize_values_one_filter_works)
+{
+    test_normalize_values(true, true);
+}
+
+TEST_F(DataStoreShardedHashTest, foreach_value_all_filter_works)
+{
+    test_foreach_value(false);
+}
+
+TEST_F(DataStoreShardedHashTest, foreach_value_one_filter_works)
+{
+    test_foreach_value(true);
 }
 
 TEST_F(DataStoreShardedHashTest, compact_worst_shard_works)
 {
-    populate_sample_data();
-    for (uint32_t i = 10; i < 50; ++i) {
+    populate_sample_data(small_population);
+    for (uint32_t i = 10; i < small_population; ++i) {
         remove(i);
     }
     commit();
diff --git a/vespalib/src/vespa/vespalib/btree/btreeiterator.h b/vespalib/src/vespa/vespalib/btree/btreeiterator.h
index 325ce0e0e47..30123b1946e 100644
--- a/vespalib/src/vespa/vespalib/btree/btreeiterator.h
+++ b/vespalib/src/vespa/vespalib/btree/btreeiterator.h
@@ -113,6 +113,9 @@ public:
         return _node->getData(_idx);
     }
 
+    // Only use during compaction when changing reference to moved value
+    DataType &getWData() { return getWNode()->getWData(_idx); }
+
     bool
     valid() const
     {
@@ -881,6 +884,9 @@ public:
         _leaf.getWNode()->writeData(_leaf.getIdx(), data);
     }
 
+    // Only use during compaction when changing reference to moved value
+    DataType &getWData() { return _leaf.getWData(); }
+
     /**
      * Set a new key for the current iterator position.
      * The new key must have the same semantic meaning as the old key.
diff --git a/vespalib/src/vespa/vespalib/btree/btreenode.h b/vespalib/src/vespa/vespalib/btree/btreenode.h
index d8752d77f0b..468f17fcd1a 100644
--- a/vespalib/src/vespa/vespalib/btree/btreenode.h
+++ b/vespalib/src/vespa/vespalib/btree/btreenode.h
@@ -99,6 +99,8 @@ public:
     }
 
     const DataT &getData(uint32_t idx) const { return _data[idx]; }
+    // Only use during compaction when changing reference to moved value
+    DataT &getWData(uint32_t idx) { return _data[idx]; }
     void setData(uint32_t idx, const DataT &data) { _data[idx] = data; }
     static bool hasData() { return true; }
 };
@@ -120,6 +122,9 @@ public:
         return BTreeNoLeafData::_instance;
     }
 
+    // Only use during compaction when changing reference to moved value
+    BTreeNoLeafData &getWData(uint32_t) const { return BTreeNoLeafData::_instance; }
+
     void setData(uint32_t idx, const BTreeNoLeafData &data) {
         (void) idx;
         (void) data;
diff --git a/vespalib/src/vespa/vespalib/btree/btreestore.h b/vespalib/src/vespa/vespalib/btree/btreestore.h
index 82913987e44..b4238757e46 100644
--- a/vespalib/src/vespa/vespalib/btree/btreestore.h
+++ b/vespalib/src/vespa/vespalib/btree/btreestore.h
@@ -298,6 +298,9 @@ public:
     bool
     isSmallArray(const EntryRef ref) const;
 
+    static bool isBTree(uint32_t typeId) { return typeId == BUFFERTYPE_BTREE; }
+    bool isBTree(RefType ref) const { return isBTree(getTypeId(ref)); }
+
    /**
     * Returns the cluster size for the type id.
     * Cluster size == 0 means we have a tree for the given reference.
@@ -391,10 +394,10 @@ public:
    std::vector<uint32_t> start_compact_worst_btree_nodes();
    void finish_compact_worst_btree_nodes(const std::vector<uint32_t>& to_hold);
 
-    void move_btree_nodes(EntryRef ref);
+    void move_btree_nodes(const std::vector<EntryRef>& refs);
 
    std::vector<uint32_t> start_compact_worst_buffers();
-    EntryRef move(EntryRef ref);
+    void move(std::vector<EntryRef>& refs);
 
 private:
    static constexpr size_t MIN_BUFFER_ARRAYS = 128u;
diff --git a/vespalib/src/vespa/vespalib/btree/btreestore.hpp b/vespalib/src/vespa/vespalib/btree/btreestore.hpp
index 15c546a0368..795e526f927 100644
--- a/vespalib/src/vespa/vespalib/btree/btreestore.hpp
+++ b/vespalib/src/vespa/vespalib/btree/btreestore.hpp
@@ -991,15 +991,15 @@ template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
           typename TraitsT, typename AggrCalcT>
 void
 BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::
-move_btree_nodes(EntryRef ref)
+move_btree_nodes(const std::vector<EntryRef>& refs)
 {
-    if (ref.valid()) {
+    for (auto& ref : refs) {
         RefType iRef(ref);
-        uint32_t clusterSize = getClusterSize(iRef);
-        if (clusterSize == 0) {
-            BTreeType *tree = getWTreeEntry(iRef);
-            tree->move_nodes(_allocator);
-        }
+        assert(iRef.valid());
+        uint32_t typeId = getTypeId(iRef);
+        assert(isBTree(typeId));
+        BTreeType *tree = getWTreeEntry(iRef);
+        tree->move_nodes(_allocator);
     }
 }
 
@@ -1015,23 +1015,25 @@ start_compact_worst_buffers()
 
 template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
           typename TraitsT, typename AggrCalcT>
-typename BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::EntryRef
+void
 BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::
-move(EntryRef ref)
+move(std::vector<EntryRef> &refs)
 {
-    if (!ref.valid() || !_store.getCompacting(ref)) {
-        return ref;
-    }
-    RefType iRef(ref);
-    uint32_t clusterSize = getClusterSize(iRef);
-    if (clusterSize == 0) {
-        BTreeType *tree = getWTreeEntry(iRef);
-        auto ref_and_ptr = allocBTreeCopy(*tree);
-        tree->prepare_hold();
-        return ref_and_ptr.ref;
+    for (auto& ref : refs) {
+        RefType iRef(ref);
+        assert(iRef.valid());
+        assert(_store.getCompacting(iRef));
+        uint32_t clusterSize = getClusterSize(iRef);
+        if (clusterSize == 0) {
+            BTreeType *tree = getWTreeEntry(iRef);
+            auto ref_and_ptr = allocBTreeCopy(*tree);
+            tree->prepare_hold();
+            ref = ref_and_ptr.ref;
+        } else {
+            const KeyDataType *shortArray = getKeyDataEntry(iRef, clusterSize);
+            ref = allocKeyDataCopy(shortArray, clusterSize).ref;
+        }
     }
-    const KeyDataType *shortArray = getKeyDataEntry(iRef, clusterSize);
-    return allocKeyDataCopy(shortArray, clusterSize).ref;
 }
 
 }
diff --git a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
index 6c6f5258555..9b796c62232 100644
--- a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
@@ -8,6 +8,7 @@ vespa_add_library(vespalib_vespalib_datastore OBJECT
     datastore.cpp
     datastorebase.cpp
     entryref.cpp
+    entry_ref_filter.cpp
     fixed_size_hash_map.cpp
     sharded_hash_map.cpp
     unique_store.cpp
diff --git a/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.cpp b/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.cpp
new file mode 100644
index 00000000000..87c3c87636c
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.cpp
@@ -0,0 +1,28 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "entry_ref_filter.h"
+
+namespace vespalib::datastore {
+
+EntryRefFilter::EntryRefFilter(std::vector<bool> filter, uint32_t offset_bits)
+    : _filter(std::move(filter)),
+      _offset_bits(offset_bits)
+{
+}
+
+EntryRefFilter::EntryRefFilter(uint32_t num_buffers, uint32_t offset_bits)
+    : _filter(num_buffers),
+      _offset_bits(offset_bits)
+{
+}
+
+EntryRefFilter::~EntryRefFilter() = default;
+
+EntryRefFilter
+EntryRefFilter::create_all_filter(uint32_t num_buffers, uint32_t offset_bits)
+{
+    std::vector<bool> filter(num_buffers, true);
+    return EntryRefFilter(std::move(filter), offset_bits);
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.h b/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.h
new file mode 100644
index 00000000000..c06d843fbd0
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.h
@@ -0,0 +1,35 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "entryref.h"
+#include <vector>
+
+namespace vespalib::datastore {
+
+/*
+ * Class to filter entry refs based on which buffer the entry is referencing.
+ *
+ * Buffers being allowed have corresponding bit in _filter set.
+ */
+class EntryRefFilter {
+    std::vector<bool> _filter;
+    uint32_t _offset_bits;
+    EntryRefFilter(std::vector<bool> filter, uint32_t offset_bits);
+public:
+    EntryRefFilter(uint32_t num_buffers, uint32_t offset_bits);
+    ~EntryRefFilter();
+    bool has(EntryRef ref) const {
+        uint32_t buffer_id = ref.buffer_id(_offset_bits);
+        return _filter[buffer_id];
+    }
+    void add_buffer(uint32_t buffer_id) { _filter[buffer_id] = true; }
+    void add_buffers(const std::vector<uint32_t>& ids) {
+        for (auto buffer_id : ids) {
+            _filter[buffer_id] = true;
+        }
+    }
+    static EntryRefFilter create_all_filter(uint32_t num_buffers, uint32_t offset_bits);
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp
index db9fee8ea70..6f001ce3c94 100644
--- a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp
@@ -2,6 +2,7 @@
 
 #include "fixed_size_hash_map.h"
 #include "entry_comparator.h"
+#include "entry_ref_filter.h"
 #include "i_compactable.h"
 #include <vespa/vespalib/util/array.hpp>
 #include <vespa/vespalib/util/memoryusage.h>
@@ -182,7 +183,7 @@ FixedSizeHashMap::foreach_key(const std::function<void(EntryRef)>& callback) con
 }
 
 void
-FixedSizeHashMap::move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits)
+FixedSizeHashMap::move_keys(ICompactable& compactable, const EntryRefFilter &compacting_buffers)
 {
     for (auto& chain_head : _chain_heads) {
         uint32_t node_idx = chain_head.load_relaxed();
@@ -190,8 +191,7 @@ FixedSizeHashMap::move_keys(ICompactable& compactable, const std::vector<bool>&
             auto& node = _nodes[node_idx];
             EntryRef old_ref = node.get_kv().first.load_relaxed();
             assert(old_ref.valid());
-            uint32_t buffer_id = old_ref.buffer_id(entry_ref_offset_bits);
-            if (compacting_buffers[buffer_id]) {
+            if (compacting_buffers.has(old_ref)) {
                 EntryRef new_ref = compactable.move(old_ref);
                 node.get_kv().first.store_release(new_ref);
             }
@@ -220,4 +220,104 @@ FixedSizeHashMap::normalize_values(const std::function<EntryRef(EntryRef)>& norm
     return changed;
 }
 
+namespace {
+
+class ChangeWriter {
+    std::vector<AtomicEntryRef*> _atomic_refs;
+public:
+    ChangeWriter(uint32_t capacity);
+    ~ChangeWriter();
+    bool write(const std::vector<EntryRef> &refs);
+    void emplace_back(AtomicEntryRef &atomic_ref) { _atomic_refs.emplace_back(&atomic_ref); }
+};
+
+ChangeWriter::ChangeWriter(uint32_t capacity)
+    : _atomic_refs()
+{
+    _atomic_refs.reserve(capacity);
+}
+
+ChangeWriter::~ChangeWriter() = default;
+
+bool
+ChangeWriter::write(const std::vector<EntryRef> &refs)
+{
+    bool changed = false;
+    assert(refs.size() == _atomic_refs.size());
+    auto atomic_ref = _atomic_refs.begin();
+    for (auto ref : refs) {
+        EntryRef old_ref = (*atomic_ref)->load_relaxed();
+        if (ref != old_ref) {
+            (*atomic_ref)->store_release(ref);
+            changed = true;
+        }
+        ++atomic_ref;
+    }
+    assert(atomic_ref == _atomic_refs.end());
+    _atomic_refs.clear();
+    return changed;
+}
+
+}
+
+bool
+FixedSizeHashMap::normalize_values(const std::function<void(std::vector<EntryRef>&)>& normalize, const EntryRefFilter& filter)
+{
+    std::vector<EntryRef> refs;
+    refs.reserve(1024);
+    bool changed = false;
+    ChangeWriter change_writer(refs.capacity());
+    for (auto& chain_head : _chain_heads) {
+        uint32_t node_idx = chain_head.load_relaxed();
+        while (node_idx != no_node_idx) {
+            auto& node = _nodes[node_idx];
+            EntryRef ref = node.get_kv().second.load_relaxed();
+            if (ref.valid()) {
+                if (filter.has(ref)) {
+                    refs.emplace_back(ref);
+                    change_writer.emplace_back(node.get_kv().second);
+                    if (refs.size() >= refs.capacity()) {
+                        normalize(refs);
+                        changed |= change_writer.write(refs);
+                        refs.clear();
+                    }
+                }
+            }
+            node_idx = node.get_next_node_idx().load(std::memory_order_relaxed);
+        }
+    }
+    if (!refs.empty()) {
+        normalize(refs);
+        changed |= change_writer.write(refs);
+    }
+    return changed;
+}
+
+void
+FixedSizeHashMap::foreach_value(const std::function<void(const std::vector<EntryRef>&)>& callback, const EntryRefFilter& filter)
+{
+    std::vector<EntryRef> refs;
+    refs.reserve(1024);
+    for (auto& chain_head : _chain_heads) {
+        uint32_t node_idx = chain_head.load_relaxed();
+        while (node_idx != no_node_idx) {
+            auto& node = _nodes[node_idx];
+            EntryRef ref = node.get_kv().second.load_relaxed();
+            if (ref.valid()) {
+                if (filter.has(ref)) {
+                    refs.emplace_back(ref);
+                    if (refs.size() >= refs.capacity()) {
+                        callback(refs);
+                        refs.clear();
+                    }
+                }
+            }
+            node_idx = node.get_next_node_idx().load(std::memory_order_relaxed);
+        }
+    }
+    if (!refs.empty()) {
+        callback(refs);
+    }
+}
+
 }
diff --git a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h
index 035cd84dbee..c522bcc3c33 100644
--- a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h
+++ b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h
@@ -18,6 +18,7 @@ class MemoryUsage;
 }
 
 namespace vespalib::datastore {
+class EntryRefFilter;
 struct ICompactable;
 
 class ShardedHashComparator {
@@ -158,8 +159,26 @@ public:
     size_t size() const noexcept { return _count; }
     MemoryUsage get_memory_usage() const;
     void foreach_key(const std::function<void(EntryRef)>& callback) const;
-    void move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits);
+    void move_keys(ICompactable& compactable, const EntryRefFilter &compacting_buffers);
+    /*
+     * Scan dictionary and call normalize function for each value. If
+     * returned value is different then write back the modified value to
+     * the dictionary. Used when clearing all posting lists.
+     */
     bool normalize_values(const std::function<EntryRef(EntryRef)>& normalize);
+    /*
+     * Scan dictionary and call normalize function for batches of values
+     * that pass the filter. Write back modified values to the dictionary.
+     * Used by compaction of posting lists when moving short arrays,
+     * bitvectors or btree roots.
+     */
+    bool normalize_values(const std::function<void(std::vector<EntryRef>&)>& normalize, const EntryRefFilter& filter);
+    /*
+     * Scan dictionary and call callback function for batches of values
+     * that pass the filter. Used by compaction of posting lists when
+     * moving btree nodes.
+     */
+    void foreach_value(const std::function<void(const std::vector<EntryRef>&)>& callback, const EntryRefFilter& filter);
 };
 
 }
diff --git a/vespalib/src/vespa/vespalib/datastore/i_unique_store_dictionary.h b/vespalib/src/vespa/vespalib/datastore/i_unique_store_dictionary.h
index 886ec095dcd..cf848167070 100644
--- a/vespalib/src/vespa/vespalib/datastore/i_unique_store_dictionary.h
+++ b/vespalib/src/vespa/vespalib/datastore/i_unique_store_dictionary.h
@@ -11,6 +11,7 @@ namespace vespalib::datastore {
 
 class EntryComparator;
+class EntryRefFilter;
 struct ICompactable;
 class IUniqueStoreDictionaryReadSnapshot;
 class UniqueStoreAddResult;
@@ -28,7 +29,7 @@ public:
     virtual UniqueStoreAddResult add(const EntryComparator& comp, std::function<EntryRef(void)> insertEntry) = 0;
     virtual EntryRef find(const EntryComparator& comp) = 0;
     virtual void remove(const EntryComparator& comp, EntryRef ref) = 0;
-    virtual void move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits) = 0;
+    virtual void move_keys(ICompactable& compactable, const EntryRefFilter& compacting_buffers) = 0;
     virtual uint32_t get_num_uniques() const = 0;
     virtual vespalib::MemoryUsage get_memory_usage() const = 0;
     virtual void build(vespalib::ConstArrayRef<EntryRef>, vespalib::ConstArrayRef<uint32_t> ref_counts, std::function<void(EntryRef)> hold) = 0;
diff --git a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp
index da4db92a309..019b98a53dd 100644
--- a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp
@@ -171,12 +171,12 @@ ShardedHashMap::foreach_key(std::function<void(EntryRef)> callback) const
 }
 
 void
-ShardedHashMap::move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits)
+ShardedHashMap::move_keys(ICompactable& compactable, const EntryRefFilter& compacting_buffers)
 {
     for (size_t i = 0; i < num_shards; ++i) {
         auto map = _maps[i].load(std::memory_order_relaxed);
         if (map != nullptr) {
-            map->move_keys(compactable, compacting_buffers, entry_ref_offset_bits);
+            map->move_keys(compactable, compacting_buffers);
         }
     }
 }
@@ -195,6 +195,31 @@ ShardedHashMap::normalize_values(std::function<EntryRef(EntryRef)> normalize)
 }
 
 bool
+ShardedHashMap::normalize_values(std::function<void(std::vector<EntryRef>&)> normalize, const EntryRefFilter& filter)
+{
+    bool changed = false;
+    for (size_t i = 0; i < num_shards; ++i) {
+        auto map = _maps[i].load(std::memory_order_relaxed);
+        if (map != nullptr) {
+            changed |= map->normalize_values(normalize, filter);
+        }
+    }
+    return changed;
+}
+
+void
+ShardedHashMap::foreach_value(std::function<void(const std::vector<EntryRef>&)> callback, const EntryRefFilter& filter)
+{
+    for (size_t i = 0; i < num_shards; ++i) {
+        auto map = _maps[i].load(std::memory_order_relaxed);
+        if (map != nullptr) {
+            map->foreach_value(callback, filter);
+        }
+    }
+}
+
+
+bool
 ShardedHashMap::has_held_buffers() const
 {
     return _gen_holder.getHeldBytes() != 0;
diff --git a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h
index df07f7a1990..e0ba9488351 100644
--- a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h
+++ b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h
@@ -11,6 +11,7 @@ namespace vespalib { class MemoryUsage; }
 namespace vespalib::datastore {
 
 class EntryComparator;
+class EntryRefFilter;
 class FixedSizeHashMap;
 struct ICompactable;
 
@@ -57,8 +58,10 @@ public:
     const EntryComparator &get_default_comparator() const noexcept { return *_comp; }
     MemoryUsage get_memory_usage() const;
     void foreach_key(std::function<void(EntryRef)> callback) const;
-    void move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits);
+    void move_keys(ICompactable& compactable, const EntryRefFilter& compacting_buffers);
     bool normalize_values(std::function<EntryRef(EntryRef)> normalize);
+    bool normalize_values(std::function<void(std::vector<EntryRef>&)> normalize, const EntryRefFilter& filter);
+    void foreach_value(std::function<void(const std::vector<EntryRef>&)> callback, const EntryRefFilter& filter);
     bool has_held_buffers() const;
     void compact_worst_shard();
 };
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
index d375dbae149..b02a2e52185 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
@@ -102,11 +102,9 @@ private:
     std::vector<uint32_t> _bufferIdsToCompact;
 
     void allocMapping() {
-        _compacting_buffer.resize(RefT::numBuffers());
         _mapping.resize(RefT::numBuffers());
         for (const auto bufferId : _bufferIdsToCompact) {
             BufferState &state = _dataStore.getBufferState(bufferId);
-            _compacting_buffer[bufferId] = true;
             _mapping[bufferId].resize(state.get_used_arrays());
         }
     }
@@ -124,7 +122,7 @@ private:
     }
 
     void fillMapping() {
-        _dict.move_keys(*this, _compacting_buffer, RefT::offset_bits);
+        _dict.move_keys(*this, _compacting_buffer);
     }
 
 public:
@@ -140,6 +138,7 @@ public:
           _bufferIdsToCompact(std::move(bufferIdsToCompact))
     {
         if (!_bufferIdsToCompact.empty()) {
+            _compacting_buffer.add_buffers(_bufferIdsToCompact);
             allocMapping();
             fillMapping();
         }
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.h b/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.h
index 3b0169b5a34..54d541853c7 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.h
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.h
@@ -79,7 +79,7 @@ public:
     UniqueStoreAddResult add(const EntryComparator& comp, std::function<EntryRef(void)> insertEntry) override;
     EntryRef find(const EntryComparator& comp) override;
     void remove(const EntryComparator& comp, EntryRef ref) override;
-    void move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits) override;
+    void move_keys(ICompactable& compactable, const EntryRefFilter& compacting_buffers) override;
     uint32_t get_num_uniques() const override;
     vespalib::MemoryUsage get_memory_usage() const override;
     void build(vespalib::ConstArrayRef<EntryRef>, vespalib::ConstArrayRef<uint32_t> ref_counts, std::function<void(EntryRef)> hold) override;
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.hpp
index e88376be9fb..13ae0a317e0 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.hpp
@@ -4,6 +4,7 @@
 
 #include "datastore.hpp"
 #include "entry_comparator_wrapper.h"
+#include "entry_ref_filter.h"
 #include "i_compactable.h"
 #include "unique_store_add_result.h"
 #include "unique_store_dictionary.h"
@@ -139,15 +140,14 @@ UniqueStoreDictionary<BTreeDictionaryT, ParentT, HashDictionaryT>::remove(const
 
 template <typename BTreeDictionaryT, typename ParentT, typename HashDictionaryT>
 void
-UniqueStoreDictionary<BTreeDictionaryT, ParentT, HashDictionaryT>::move_keys(ICompactable &compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits)
+UniqueStoreDictionary<BTreeDictionaryT, ParentT, HashDictionaryT>::move_keys(ICompactable &compactable, const EntryRefFilter& compacting_buffers)
 {
     if constexpr (has_btree_dictionary) {
         auto itr = this->_btree_dict.begin();
         while (itr.valid()) {
             EntryRef oldRef(itr.getKey());
             assert(oldRef.valid());
-            uint32_t buffer_id = oldRef.buffer_id(entry_ref_offset_bits);
-            if (compacting_buffers[buffer_id]) {
+            if (compacting_buffers.has(oldRef)) {
                 EntryRef newRef(compactable.move(oldRef));
                 this->_btree_dict.thaw(itr);
                 itr.writeKey(newRef);
@@ -160,7 +160,7 @@ UniqueStoreDictionary<BTreeDictionaryT, ParentT, HashDictionaryT>::move_keys(ICo
             ++itr;
         }
     } else {
-        this->_hash_dict.move_keys(compactable, compacting_buffers, entry_ref_offset_bits);
+        this->_hash_dict.move_keys(compactable, compacting_buffers);
     }
 }
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h b/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h
index 4a8d72c8685..873af07a902 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h
@@ -3,6 +3,7 @@
 #pragma once
 
 #include "entryref.h"
+#include "entry_ref_filter.h"
 #include <vector>
 #include <vespa/vespalib/stllike/allocator.h>
 
@@ -18,11 +19,11 @@ public:
     using RefType = RefT;
 
 protected:
-    std::vector<bool> _compacting_buffer;
+    EntryRefFilter _compacting_buffer;
     std::vector<std::vector<EntryRef, allocator_large<EntryRef>>> _mapping;
 
 public:
     UniqueStoreRemapper()
-        : _compacting_buffer(),
+        : _compacting_buffer(RefT::numBuffers(), RefT::offset_bits),
           _mapping()
     {
     }
@@ -30,11 +31,11 @@ public:
 
     EntryRef remap(EntryRef ref) const {
         if (ref.valid()) {
-            RefType internal_ref(ref);
-            if (!_compacting_buffer[internal_ref.bufferId()]) {
+            if (!_compacting_buffer.has(ref)) {
                 // No remapping for references to buffers not being compacted
                 return ref;
             } else {
+                RefType internal_ref(ref);
                 auto &inner_mapping = _mapping[internal_ref.bufferId()];
                 assert(internal_ref.unscaled_offset() < inner_mapping.size());
                 EntryRef mapped_ref = inner_mapping[internal_ref.unscaled_offset()];
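For a sense of what the one-filter test cases above pin down: with only buffer 3 added to the filter, values in buffers 0-2 keep their pre-normalization offset (i + 200) while values in buffer 3 are bumped by the callback (to i + 500). A standalone sketch of that arithmetic, with select_buffer() copied from sharded_hash_map_test.cpp and a plain std::vector<bool> standing in for EntryRefFilter:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// select_buffer() copied from the test: spreads entries over four buffers.
uint32_t select_buffer(uint32_t i) {
    if ((i % 2) == 0) return 0;
    if ((i % 3) == 0) return 1;
    if ((i % 5) == 0) return 2;
    return 3;
}

int main() {
    std::vector<bool> filter(4, false);
    filter[3] = true;  // like an EntryRefFilter with only buffer 3 added
    for (uint32_t i = 0; i < 1200; ++i) {  // large_population in the test
        uint32_t buffer_id = select_buffer(i);
        uint32_t offset = i + 200;         // value stored by populate_sample_values()
        if (filter[buffer_id]) {
            offset += 300;                 // the normalize callback bumps the offset
        }
        // Matches the ASSERT_EQ branches in test_normalize_values().
        assert(offset == ((buffer_id == 3) ? i + 500 : i + 200));
    }
    return 0;
}
```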