diff options
Diffstat (limited to 'vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp')
-rw-r--r-- | vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp | 106 |
1 files changed, 103 insertions, 3 deletions
diff --git a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp index db9fee8ea70..6f001ce3c94 100644 --- a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp +++ b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp @@ -2,6 +2,7 @@ #include "fixed_size_hash_map.h" #include "entry_comparator.h" +#include "entry_ref_filter.h" #include "i_compactable.h" #include <vespa/vespalib/util/array.hpp> #include <vespa/vespalib/util/memoryusage.h> @@ -182,7 +183,7 @@ FixedSizeHashMap::foreach_key(const std::function<void(EntryRef)>& callback) con } void -FixedSizeHashMap::move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits) +FixedSizeHashMap::move_keys(ICompactable& compactable, const EntryRefFilter &compacting_buffers) { for (auto& chain_head : _chain_heads) { uint32_t node_idx = chain_head.load_relaxed(); @@ -190,8 +191,7 @@ FixedSizeHashMap::move_keys(ICompactable& compactable, const std::vector<bool>& auto& node = _nodes[node_idx]; EntryRef old_ref = node.get_kv().first.load_relaxed(); assert(old_ref.valid()); - uint32_t buffer_id = old_ref.buffer_id(entry_ref_offset_bits); - if (compacting_buffers[buffer_id]) { + if (compacting_buffers.has(old_ref)) { EntryRef new_ref = compactable.move(old_ref); node.get_kv().first.store_release(new_ref); } @@ -220,4 +220,104 @@ FixedSizeHashMap::normalize_values(const std::function<EntryRef(EntryRef)>& norm return changed; } +namespace { + +class ChangeWriter { + std::vector<AtomicEntryRef*> _atomic_refs; +public: + ChangeWriter(uint32_t capacity); + ~ChangeWriter(); + bool write(const std::vector<EntryRef> &refs); + void emplace_back(AtomicEntryRef &atomic_ref) { _atomic_refs.emplace_back(&atomic_ref); } +}; + +ChangeWriter::ChangeWriter(uint32_t capacity) + : _atomic_refs() +{ + _atomic_refs.reserve(capacity); +} + +ChangeWriter::~ChangeWriter() = default; + +bool +ChangeWriter::write(const std::vector<EntryRef> &refs) +{ + bool changed = false; + assert(refs.size() == _atomic_refs.size()); + auto atomic_ref = _atomic_refs.begin(); + for (auto ref : refs) { + EntryRef old_ref = (*atomic_ref)->load_relaxed(); + if (ref != old_ref) { + (*atomic_ref)->store_release(ref); + changed = true; + } + ++atomic_ref; + } + assert(atomic_ref == _atomic_refs.end()); + _atomic_refs.clear(); + return changed; +} + +} + +bool +FixedSizeHashMap::normalize_values(const std::function<void(std::vector<EntryRef>&)>& normalize, const EntryRefFilter& filter) +{ + std::vector<EntryRef> refs; + refs.reserve(1024); + bool changed = false; + ChangeWriter change_writer(refs.capacity()); + for (auto& chain_head : _chain_heads) { + uint32_t node_idx = chain_head.load_relaxed(); + while (node_idx != no_node_idx) { + auto& node = _nodes[node_idx]; + EntryRef ref = node.get_kv().second.load_relaxed(); + if (ref.valid()) { + if (filter.has(ref)) { + refs.emplace_back(ref); + change_writer.emplace_back(node.get_kv().second); + if (refs.size() >= refs.capacity()) { + normalize(refs); + changed |= change_writer.write(refs); + refs.clear(); + } + } + } + node_idx = node.get_next_node_idx().load(std::memory_order_relaxed); + } + } + if (!refs.empty()) { + normalize(refs); + changed |= change_writer.write(refs); + } + return changed; +} + +void +FixedSizeHashMap::foreach_value(const std::function<void(const std::vector<EntryRef>&)>& callback, const EntryRefFilter& filter) +{ + std::vector<EntryRef> refs; + refs.reserve(1024); + for (auto& chain_head : _chain_heads) { + uint32_t node_idx = chain_head.load_relaxed(); + while (node_idx != no_node_idx) { + auto& node = _nodes[node_idx]; + EntryRef ref = node.get_kv().second.load_relaxed(); + if (ref.valid()) { + if (filter.has(ref)) { + refs.emplace_back(ref); + if (refs.size() >= refs.capacity()) { + callback(refs); + refs.clear(); + } + } + } + node_idx = node.get_next_node_idx().load(std::memory_order_relaxed); + } + } + if (!refs.empty()) { + callback(refs); + } +} + } |