author    Geir Storli <geirst@yahooinc.com>    2021-12-06 17:21:34 +0100
committer GitHub <noreply@github.com>          2021-12-06 17:21:34 +0100
commit    091a90a1b5a4db8ade3369c7c416a4e02491bbb9 (patch)
tree      3b584c71a9816fcb341d98a3318e6b80c6085994 /vespalib
parent    ffdbd053a2b57383b2d463e8050394776b14abdf (diff)
parent    1330d2c3d3b8647b6053ac37e95503cd0278e2e3 (diff)
Merge pull request #20356 from vespa-engine/toregge/filter-early-on-buffer-id-for-normalize-values-and-foreach-values
Filter early on buffer id and pass vector of entries in normalize_values
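In short: dictionary compaction used to test every EntryRef against a raw std::vector<bool> plus a separate offset-bits argument, and visited values one at a time; after this change the buffer test is wrapped in an EntryRefFilter and matching values are handed to callbacks in batches. A minimal usage sketch of the new batched API (not part of the diff; `dict`, `buffer_id` and `remap` are hypothetical, `RefT` is an EntryRefT type as in the tests below):

    using vespalib::datastore::EntryRef;
    using vespalib::datastore::EntryRefFilter;

    EntryRefFilter filter(RefT::numBuffers(), RefT::offset_bits);
    filter.add_buffer(buffer_id);  // visit only refs into this buffer
    dict.normalize_values([](std::vector<EntryRef>& refs) {
        for (auto& ref : refs) {
            ref = remap(ref);      // hypothetical: compute the post-move ref
        }
    }, filter);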
Diffstat (limited to 'vespalib')
-rw-r--r--  vespalib/src/tests/btree/btree_store/btree_store_test.cpp               |  91
-rw-r--r--  vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp | 186
-rw-r--r--  vespalib/src/vespa/vespalib/btree/btreeiterator.h                        |   6
-rw-r--r--  vespalib/src/vespa/vespalib/btree/btreenode.h                            |   5
-rw-r--r--  vespalib/src/vespa/vespalib/btree/btreestore.h                           |   7
-rw-r--r--  vespalib/src/vespa/vespalib/btree/btreestore.hpp                         |  44
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/CMakeLists.txt                     |   1
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/entry_ref_filter.cpp              |  28
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/entry_ref_filter.h                |  35
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp           | 106
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h             |  21
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/i_unique_store_dictionary.h       |   3
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp              |  29
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h                |   5
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/unique_store.hpp                  |   5
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.h         |   2
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.hpp       |   8
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h           |   9
18 files changed, 498 insertions, 93 deletions
diff --git a/vespalib/src/tests/btree/btree_store/btree_store_test.cpp b/vespalib/src/tests/btree/btree_store/btree_store_test.cpp
index e7d923d0e87..77cb8e519e4 100644
--- a/vespalib/src/tests/btree/btree_store/btree_store_test.cpp
+++ b/vespalib/src/tests/btree/btree_store/btree_store_test.cpp
@@ -73,61 +73,112 @@ BTreeStoreTest::~BTreeStoreTest()
inc_generation();
}
+namespace {
+
+class ChangeWriter {
+ std::vector<EntryRef*> _old_refs;
+public:
+ ChangeWriter(uint32_t capacity);
+ ~ChangeWriter();
+ void write(const std::vector<EntryRef>& refs);
+ void emplace_back(EntryRef& ref) { _old_refs.emplace_back(&ref); }
+};
+
+ChangeWriter::ChangeWriter(uint32_t capacity)
+ : _old_refs()
+{
+ _old_refs.reserve(capacity);
+}
+
+ChangeWriter::~ChangeWriter() = default;
+
+void
+ChangeWriter::write(const std::vector<EntryRef> &refs)
+{
+ assert(refs.size() == _old_refs.size());
+ auto old_ref_itr = _old_refs.begin();
+ for (auto ref : refs) {
+ **old_ref_itr = ref;
+ ++old_ref_itr;
+ }
+ assert(old_ref_itr == _old_refs.end());
+ _old_refs.clear();
+}
+
+}
+
void
BTreeStoreTest::test_compact_sequence(uint32_t sequence_length)
{
auto &store = _store;
+ uint32_t entry_ref_offset_bits = TreeStore::RefType::offset_bits;
EntryRef ref1 = add_sequence(4, 4 + sequence_length);
EntryRef ref2 = add_sequence(5, 5 + sequence_length);
- EntryRef old_ref1 = ref1;
- EntryRef old_ref2 = ref2;
std::vector<EntryRef> refs;
+ refs.reserve(2);
+ refs.emplace_back(ref1);
+ refs.emplace_back(ref2);
+ std::vector<EntryRef> temp_refs;
for (int i = 0; i < 1000; ++i) {
- refs.emplace_back(add_sequence(i + 6, i + 6 + sequence_length));
+ temp_refs.emplace_back(add_sequence(i + 6, i + 6 + sequence_length));
}
- for (auto& ref : refs) {
+ for (auto& ref : temp_refs) {
store.clear(ref);
}
inc_generation();
+ ChangeWriter change_writer(refs.size());
+ std::vector<EntryRef> move_refs;
+ move_refs.reserve(refs.size());
auto usage_before = store.getMemoryUsage();
for (uint32_t pass = 0; pass < 15; ++pass) {
auto to_hold = store.start_compact_worst_buffers();
- ref1 = store.move(ref1);
- ref2 = store.move(ref2);
+ std::vector<bool> filter(TreeStore::RefType::numBuffers());
+ for (auto buffer_id : to_hold) {
+ filter[buffer_id] = true;
+ }
+ for (auto& ref : refs) {
+ if (ref.valid() && filter[ref.buffer_id(entry_ref_offset_bits)]) {
+ move_refs.emplace_back(ref);
+ change_writer.emplace_back(ref);
+ }
+ }
+ store.move(move_refs);
+ change_writer.write(move_refs);
+ move_refs.clear();
store.finishCompact(to_hold);
inc_generation();
}
- EXPECT_NE(old_ref1, ref1);
- EXPECT_NE(old_ref2, ref2);
- EXPECT_EQ(make_exp_sequence(4, 4 + sequence_length), get_sequence(ref1));
- EXPECT_EQ(make_exp_sequence(5, 5 + sequence_length), get_sequence(ref2));
+ EXPECT_NE(ref1, refs[0]);
+ EXPECT_NE(ref2, refs[1]);
+ EXPECT_EQ(make_exp_sequence(4, 4 + sequence_length), get_sequence(refs[0]));
+ EXPECT_EQ(make_exp_sequence(5, 5 + sequence_length), get_sequence(refs[1]));
auto usage_after = store.getMemoryUsage();
EXPECT_GT(usage_before.deadBytes(), usage_after.deadBytes());
- store.clear(ref1);
- store.clear(ref2);
+ store.clear(refs[0]);
+ store.clear(refs[1]);
}
TEST_F(BTreeStoreTest, require_that_nodes_for_multiple_btrees_are_compacted)
{
auto &store = this->_store;
- EntryRef ref1 = add_sequence(4, 40);
- EntryRef ref2 = add_sequence(100, 130);
+ std::vector<EntryRef> refs;
+ refs.emplace_back(add_sequence(4, 40));
+ refs.emplace_back(add_sequence(100, 130));
store.clear(add_sequence(1000, 20000));
inc_generation();
auto usage_before = store.getMemoryUsage();
for (uint32_t pass = 0; pass < 15; ++pass) {
auto to_hold = store.start_compact_worst_btree_nodes();
- store.move_btree_nodes(ref1);
- store.move_btree_nodes(ref2);
+ store.move_btree_nodes(refs);
store.finish_compact_worst_btree_nodes(to_hold);
inc_generation();
}
- EXPECT_EQ(make_exp_sequence(4, 40), get_sequence(ref1));
- EXPECT_EQ(make_exp_sequence(100, 130), get_sequence(ref2));
+ EXPECT_EQ(make_exp_sequence(4, 40), get_sequence(refs[0]));
+ EXPECT_EQ(make_exp_sequence(100, 130), get_sequence(refs[1]));
auto usage_after = store.getMemoryUsage();
EXPECT_GT(usage_before.deadBytes(), usage_after.deadBytes());
- store.clear(ref1);
- store.clear(ref2);
+ store.clear(refs[0]);
+ store.clear(refs[1]);
}
TEST_F(BTreeStoreTest, require_that_short_arrays_are_compacted)
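The test-local ChangeWriter above captures the pattern this pull request uses throughout: record where each old ref is stored, move the whole batch, then patch the originals in order. Condensed from the test loop above into a sketch:

    std::vector<EntryRef> move_refs;          // refs to relocate, in order
    ChangeWriter change_writer(refs.size());  // remembers each ref's address
    for (auto& ref : refs) {
        if (ref.valid() && filter[ref.buffer_id(entry_ref_offset_bits)]) {
            move_refs.emplace_back(ref);      // value to move
            change_writer.emplace_back(ref);  // slot to patch afterwards
        }
    }
    store.move(move_refs);                    // rewrites move_refs in place
    change_writer.write(move_refs);           // writes new refs back to the slots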
diff --git a/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp b/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
index 6e984f286c1..796e19a97d1 100644
--- a/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
+++ b/vespalib/src/tests/datastore/sharded_hash_map/sharded_hash_map_test.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/datastore/sharded_hash_map.h>
+#include <vespa/vespalib/datastore/entry_ref_filter.h>
#include <vespa/vespalib/datastore/i_compactable.h>
#include <vespa/vespalib/datastore/unique_store_allocator.h>
#include <vespa/vespalib/datastore/unique_store_comparator.h>
@@ -12,12 +13,14 @@
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/datastore/unique_store_allocator.hpp>
+#include <iostream>
#include <thread>
#include <vespa/log/log.h>
LOG_SETUP("vespalib_datastore_shared_hash_test");
using vespalib::datastore::EntryRef;
+using vespalib::datastore::EntryRefFilter;
using vespalib::datastore::ICompactable;
using RefT = vespalib::datastore::EntryRefT<22>;
using MyAllocator = vespalib::datastore::UniqueStoreAllocator<uint32_t, RefT>;
@@ -27,6 +30,26 @@ using MyHashMap = vespalib::datastore::ShardedHashMap;
using GenerationHandler = vespalib::GenerationHandler;
using vespalib::makeLambdaTask;
+constexpr uint32_t small_population = 50;
+/*
+ * large_population should trigger multiple callbacks from normalize_values
+ * and foreach_value
+ */
+constexpr uint32_t large_population = 1200;
+
+namespace vespalib::datastore {
+
+/*
+ * Print EntryRef as RefT which is used by test_normalize_values and
+ * test_foreach_value to differentiate between buffers
+ */
+void PrintTo(const EntryRef &ref, std::ostream* os) {
+ RefT iref(ref);
+ *os << "RefT(" << iref.offset() << "," << iref.bufferId() << ")";
+}
+
+}
+
namespace {
void consider_yield(uint32_t i)
@@ -58,6 +81,19 @@ public:
}
};
+uint32_t select_buffer(uint32_t i) {
+ if ((i % 2) == 0) {
+ return 0;
+ }
+ if ((i % 3) == 0) {
+ return 1;
+ }
+ if ((i % 5) == 0) {
+ return 2;
+ }
+ return 3;
+}
+
}
struct DataStoreShardedHashTest : public ::testing::Test
@@ -86,7 +122,11 @@ struct DataStoreShardedHashTest : public ::testing::Test
void read_work(uint32_t cnt);
void read_work();
void write_work(uint32_t cnt);
- void populate_sample_data();
+ void populate_sample_data(uint32_t cnt);
+ void populate_sample_values(uint32_t cnt);
+ void clear_sample_values(uint32_t cnt);
+ void test_normalize_values(bool use_filter, bool one_filter);
+ void test_foreach_value(bool one_filter);
};
@@ -213,13 +253,94 @@ DataStoreShardedHashTest::write_work(uint32_t cnt)
}
void
-DataStoreShardedHashTest::populate_sample_data()
+DataStoreShardedHashTest::populate_sample_data(uint32_t cnt)
{
- for (uint32_t i = 0; i < 50; ++i) {
+ for (uint32_t i = 0; i < cnt; ++i) {
insert(i);
}
}
+void
+DataStoreShardedHashTest::populate_sample_values(uint32_t cnt)
+{
+ for (uint32_t i = 0; i < cnt; ++i) {
+ MyCompare comp(_store, i);
+ auto result = _hash_map.find(comp, EntryRef());
+ ASSERT_NE(result, nullptr);
+ EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
+ result->second.store_relaxed(RefT(i + 200, select_buffer(i)));
+ }
+}
+
+void
+DataStoreShardedHashTest::clear_sample_values(uint32_t cnt)
+{
+ for (uint32_t i = 0; i < cnt; ++i) {
+ MyCompare comp(_store, i);
+ auto result = _hash_map.find(comp, EntryRef());
+ ASSERT_NE(result, nullptr);
+ EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
+ result->second.store_relaxed(EntryRef());
+ }
+}
+
+namespace {
+
+template <typename RefT>
+EntryRefFilter
+make_entry_ref_filter(bool one_filter)
+{
+ if (one_filter) {
+ EntryRefFilter filter(RefT::numBuffers(), RefT::offset_bits);
+ filter.add_buffer(3);
+ return filter;
+ }
+ return EntryRefFilter::create_all_filter(RefT::numBuffers(), RefT::offset_bits);
+}
+
+}
+
+void
+DataStoreShardedHashTest::test_normalize_values(bool use_filter, bool one_filter)
+{
+ populate_sample_data(large_population);
+ populate_sample_values(large_population);
+ if (use_filter) {
+ auto filter = make_entry_ref_filter<RefT>(one_filter);
+ EXPECT_TRUE(_hash_map.normalize_values([](std::vector<EntryRef> &refs) noexcept { for (auto &ref : refs) { RefT iref(ref); ref = RefT(iref.offset() + 300, iref.bufferId()); } }, filter));
+ } else {
+ EXPECT_TRUE(_hash_map.normalize_values([](EntryRef ref) noexcept { RefT iref(ref); return RefT(iref.offset() + 300, iref.bufferId()); }));
+ }
+ for (uint32_t i = 0; i < large_population; ++i) {
+ MyCompare comp(_store, i);
+ auto result = _hash_map.find(comp, EntryRef());
+ ASSERT_NE(result, nullptr);
+ EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
+ ASSERT_EQ(select_buffer(i), RefT(result->second.load_relaxed()).bufferId());
+ if (use_filter && one_filter && select_buffer(i) != 3) {
+ ASSERT_EQ(i + 200, RefT(result->second.load_relaxed()).offset());
+ } else {
+ ASSERT_EQ(i + 500, RefT(result->second.load_relaxed()).offset());
+ }
+ result->second.store_relaxed(EntryRef());
+ }
+}
+
+void
+DataStoreShardedHashTest::test_foreach_value(bool one_filter)
+{
+ populate_sample_data(large_population);
+ populate_sample_values(large_population);
+
+ auto filter = make_entry_ref_filter<RefT>(one_filter);
+ std::vector<EntryRef> exp_refs;
+ EXPECT_FALSE(_hash_map.normalize_values([&exp_refs](std::vector<EntryRef>& refs) { exp_refs.insert(exp_refs.end(), refs.begin(), refs.end()); }, filter));
+ std::vector<EntryRef> act_refs;
+ _hash_map.foreach_value([&act_refs](const std::vector<EntryRef> &refs) { act_refs.insert(act_refs.end(), refs.begin(), refs.end()); }, filter);
+ EXPECT_EQ(exp_refs, act_refs);
+ clear_sample_values(large_population);
+}
+
TEST_F(DataStoreShardedHashTest, single_threaded_reader_without_updates)
{
_report_work = true;
@@ -254,7 +375,7 @@ TEST_F(DataStoreShardedHashTest, memory_usage_is_reported)
EXPECT_EQ(0, initial_usage.deadBytes());
EXPECT_EQ(0, initial_usage.allocatedBytesOnHold());
auto guard = _generationHandler.takeGuard();
- for (uint32_t i = 0; i < 50; ++i) {
+ for (uint32_t i = 0; i < small_population; ++i) {
insert(i);
}
auto usage = _hash_map.get_memory_usage();
@@ -264,30 +385,31 @@ TEST_F(DataStoreShardedHashTest, memory_usage_is_reported)
TEST_F(DataStoreShardedHashTest, foreach_key_works)
{
- populate_sample_data();
+ populate_sample_data(small_population);
std::vector<uint32_t> keys;
_hash_map.foreach_key([this, &keys](EntryRef ref) { keys.emplace_back(_allocator.get_wrapped(ref).value()); });
std::sort(keys.begin(), keys.end());
- EXPECT_EQ(50, keys.size());
- for (uint32_t i = 0; i < 50; ++i) {
+ EXPECT_EQ(small_population, keys.size());
+ for (uint32_t i = 0; i < small_population; ++i) {
EXPECT_EQ(i, keys[i]);
}
}
TEST_F(DataStoreShardedHashTest, move_keys_works)
{
- populate_sample_data();
+ populate_sample_data(small_population);
std::vector<EntryRef> refs;
_hash_map.foreach_key([&refs](EntryRef ref) { refs.emplace_back(ref); });
std::vector<EntryRef> new_refs;
MyCompactable my_compactable(_allocator, new_refs);
- _hash_map.move_keys(my_compactable, std::vector<bool>(RefT::numBuffers(), true), RefT::offset_bits);
+ auto filter = make_entry_ref_filter<RefT>(false);
+ _hash_map.move_keys(my_compactable, filter);
std::vector<EntryRef> verify_new_refs;
_hash_map.foreach_key([&verify_new_refs](EntryRef ref) { verify_new_refs.emplace_back(ref); });
- EXPECT_EQ(50u, refs.size());
+ EXPECT_EQ(small_population, refs.size());
EXPECT_NE(refs, new_refs);
EXPECT_EQ(new_refs, verify_new_refs);
- for (uint32_t i = 0; i < 50; ++i) {
+ for (uint32_t i = 0; i < small_population; ++i) {
EXPECT_NE(refs[i], new_refs[i]);
auto value = _allocator.get_wrapped(refs[i]).value();
auto new_value = _allocator.get_wrapped(new_refs[i]).value();
@@ -297,29 +419,33 @@ TEST_F(DataStoreShardedHashTest, move_keys_works)
TEST_F(DataStoreShardedHashTest, normalize_values_works)
{
- populate_sample_data();
- for (uint32_t i = 0; i < 50; ++i) {
- MyCompare comp(_store, i);
- auto result = _hash_map.find(comp, EntryRef());
- ASSERT_NE(result, nullptr);
- EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
- result->second.store_relaxed(EntryRef(i + 200));
- }
- _hash_map.normalize_values([](EntryRef ref) noexcept { return EntryRef(ref.ref() + 300); });
- for (uint32_t i = 0; i < 50; ++i) {
- MyCompare comp(_store, i);
- auto result = _hash_map.find(comp, EntryRef());
- ASSERT_NE(result, nullptr);
- EXPECT_EQ(i, _allocator.get_wrapped(result->first.load_relaxed()).value());
- ASSERT_EQ(i + 500, result->second.load_relaxed().ref());
- result->second.store_relaxed(EntryRef());
- }
+ test_normalize_values(false, false);
+}
+
+TEST_F(DataStoreShardedHashTest, normalize_values_all_filter_works)
+{
+ test_normalize_values(true, false);
+}
+
+TEST_F(DataStoreShardedHashTest, normalize_values_one_filter_works)
+{
+ test_normalize_values(true, true);
+}
+
+TEST_F(DataStoreShardedHashTest, foreach_value_all_filter_works)
+{
+ test_foreach_value(false);
+}
+
+TEST_F(DataStoreShardedHashTest, foreach_value_one_filter_works)
+{
+ test_foreach_value(true);
}
TEST_F(DataStoreShardedHashTest, compact_worst_shard_works)
{
- populate_sample_data();
- for (uint32_t i = 10; i < 50; ++i) {
+ populate_sample_data(small_population);
+ for (uint32_t i = 10; i < small_population; ++i) {
remove(i);
}
commit();
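One detail that is easy to misread in select_buffer: the divisibility checks run in order, so even values of i always map to buffer 0, even when they are also divisible by 3 or 5. For example, i = 6 gives buffer 0, i = 9 gives buffer 1, i = 25 gives buffer 2, and i = 7 gives buffer 3. This uneven spread across four buffers is what lets the one-filter tests tell filtered entries (offset still i + 200) apart from normalized ones (offset i + 500).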
diff --git a/vespalib/src/vespa/vespalib/btree/btreeiterator.h b/vespalib/src/vespa/vespalib/btree/btreeiterator.h
index 325ce0e0e47..30123b1946e 100644
--- a/vespalib/src/vespa/vespalib/btree/btreeiterator.h
+++ b/vespalib/src/vespa/vespalib/btree/btreeiterator.h
@@ -113,6 +113,9 @@ public:
return _node->getData(_idx);
}
+ // Only use during compaction when changing reference to moved value
+ DataType &getWData() { return getWNode()->getWData(_idx); }
+
bool
valid() const
{
@@ -881,6 +884,9 @@ public:
_leaf.getWNode()->writeData(_leaf.getIdx(), data);
}
+ // Only use during compaction when changing reference to moved value
+ DataType &getWData() { return _leaf.getWData(); }
+
/**
* Set a new key for the current iterator position.
* The new key must have the same semantic meaning as the old key.
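The new getWData() accessors exist so that compaction can patch data in place through an iterator instead of erasing and re-inserting. A hedged sketch of that intended use (assuming a tree whose DataType is EntryRef and a hypothetical `mover` implementing ICompactable):

    // Walk the tree during compaction and patch each data ref in place.
    for (auto itr = tree.begin(); itr.valid(); ++itr) {
        EntryRef& data = itr.getWData();  // writable slot, compaction only
        data = mover.move(data);          // store the entry's new location
    }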
diff --git a/vespalib/src/vespa/vespalib/btree/btreenode.h b/vespalib/src/vespa/vespalib/btree/btreenode.h
index d8752d77f0b..468f17fcd1a 100644
--- a/vespalib/src/vespa/vespalib/btree/btreenode.h
+++ b/vespalib/src/vespa/vespalib/btree/btreenode.h
@@ -99,6 +99,8 @@ public:
}
const DataT &getData(uint32_t idx) const { return _data[idx]; }
+ // Only use during compaction when changing reference to moved value
+ DataT &getWData(uint32_t idx) { return _data[idx]; }
void setData(uint32_t idx, const DataT &data) { _data[idx] = data; }
static bool hasData() { return true; }
};
@@ -120,6 +122,9 @@ public:
return BTreeNoLeafData::_instance;
}
+ // Only use during compaction when changing reference to moved value
+ BTreeNoLeafData &getWData(uint32_t) const { return BTreeNoLeafData::_instance; }
+
void setData(uint32_t idx, const BTreeNoLeafData &data) {
(void) idx;
(void) data;
diff --git a/vespalib/src/vespa/vespalib/btree/btreestore.h b/vespalib/src/vespa/vespalib/btree/btreestore.h
index 82913987e44..b4238757e46 100644
--- a/vespalib/src/vespa/vespalib/btree/btreestore.h
+++ b/vespalib/src/vespa/vespalib/btree/btreestore.h
@@ -298,6 +298,9 @@ public:
bool
isSmallArray(const EntryRef ref) const;
+ static bool isBTree(uint32_t typeId) { return typeId == BUFFERTYPE_BTREE; }
+ bool isBTree(RefType ref) const { return isBTree(getTypeId(ref)); }
+
/**
* Returns the cluster size for the type id.
* Cluster size == 0 means we have a tree for the given reference.
@@ -391,10 +394,10 @@ public:
std::vector<uint32_t> start_compact_worst_btree_nodes();
void finish_compact_worst_btree_nodes(const std::vector<uint32_t>& to_hold);
- void move_btree_nodes(EntryRef ref);
+ void move_btree_nodes(const std::vector<EntryRef>& refs);
std::vector<uint32_t> start_compact_worst_buffers();
- EntryRef move(EntryRef ref);
+ void move(std::vector<EntryRef>& refs);
private:
static constexpr size_t MIN_BUFFER_ARRAYS = 128u;
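Note how the contract of move() tightens: the old form silently skipped invalid refs and refs outside the compacting buffers and returned the new ref, while the batched form (see btreestore.hpp below) asserts that every entry is valid and in a buffer being compacted, and rewrites the vector in place. A sketch of a compliant caller (`collect_compacting_refs` and the write-back step are hypothetical):

    std::vector<EntryRef> refs = collect_compacting_refs();  // pre-filtered, all valid
    store.move(refs);  // each element now holds its entry's new location
    // ...write the updated refs back to wherever the old ones were stored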
diff --git a/vespalib/src/vespa/vespalib/btree/btreestore.hpp b/vespalib/src/vespa/vespalib/btree/btreestore.hpp
index 15c546a0368..795e526f927 100644
--- a/vespalib/src/vespa/vespalib/btree/btreestore.hpp
+++ b/vespalib/src/vespa/vespalib/btree/btreestore.hpp
@@ -991,15 +991,15 @@ template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
typename TraitsT, typename AggrCalcT>
void
BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::
-move_btree_nodes(EntryRef ref)
+move_btree_nodes(const std::vector<EntryRef>& refs)
{
- if (ref.valid()) {
+ for (auto& ref : refs) {
RefType iRef(ref);
- uint32_t clusterSize = getClusterSize(iRef);
- if (clusterSize == 0) {
- BTreeType *tree = getWTreeEntry(iRef);
- tree->move_nodes(_allocator);
- }
+ assert(iRef.valid());
+ uint32_t typeId = getTypeId(iRef);
+ assert(isBTree(typeId));
+ BTreeType *tree = getWTreeEntry(iRef);
+ tree->move_nodes(_allocator);
}
}
@@ -1015,23 +1015,25 @@ start_compact_worst_buffers()
template <typename KeyT, typename DataT, typename AggrT, typename CompareT,
typename TraitsT, typename AggrCalcT>
-typename BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::EntryRef
+void
BTreeStore<KeyT, DataT, AggrT, CompareT, TraitsT, AggrCalcT>::
-move(EntryRef ref)
+move(std::vector<EntryRef> &refs)
{
- if (!ref.valid() || !_store.getCompacting(ref)) {
- return ref;
- }
- RefType iRef(ref);
- uint32_t clusterSize = getClusterSize(iRef);
- if (clusterSize == 0) {
- BTreeType *tree = getWTreeEntry(iRef);
- auto ref_and_ptr = allocBTreeCopy(*tree);
- tree->prepare_hold();
- return ref_and_ptr.ref;
+ for (auto& ref : refs) {
+ RefType iRef(ref);
+ assert(iRef.valid());
+ assert(_store.getCompacting(iRef));
+ uint32_t clusterSize = getClusterSize(iRef);
+ if (clusterSize == 0) {
+ BTreeType *tree = getWTreeEntry(iRef);
+ auto ref_and_ptr = allocBTreeCopy(*tree);
+ tree->prepare_hold();
+ ref = ref_and_ptr.ref;
+ } else {
+ const KeyDataType *shortArray = getKeyDataEntry(iRef, clusterSize);
+ ref = allocKeyDataCopy(shortArray, clusterSize).ref;
+ }
}
- const KeyDataType *shortArray = getKeyDataEntry(iRef, clusterSize);
- return allocKeyDataCopy(shortArray, clusterSize).ref;
}
}
diff --git a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
index 6c6f5258555..9b796c62232 100644
--- a/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/datastore/CMakeLists.txt
@@ -8,6 +8,7 @@ vespa_add_library(vespalib_vespalib_datastore OBJECT
datastore.cpp
datastorebase.cpp
entryref.cpp
+ entry_ref_filter.cpp
fixed_size_hash_map.cpp
sharded_hash_map.cpp
unique_store.cpp
diff --git a/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.cpp b/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.cpp
new file mode 100644
index 00000000000..87c3c87636c
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.cpp
@@ -0,0 +1,28 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "entry_ref_filter.h"
+
+namespace vespalib::datastore {
+
+EntryRefFilter::EntryRefFilter(std::vector<bool> filter, uint32_t offset_bits)
+ : _filter(std::move(filter)),
+ _offset_bits(offset_bits)
+{
+}
+
+EntryRefFilter::EntryRefFilter(uint32_t num_buffers, uint32_t offset_bits)
+ : _filter(num_buffers),
+ _offset_bits(offset_bits)
+{
+}
+
+EntryRefFilter::~EntryRefFilter() = default;
+
+EntryRefFilter
+EntryRefFilter::create_all_filter(uint32_t num_buffers, uint32_t offset_bits)
+{
+ std::vector<bool> filter(num_buffers, true);
+ return EntryRefFilter(std::move(filter), offset_bits);
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.h b/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.h
new file mode 100644
index 00000000000..c06d843fbd0
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/datastore/entry_ref_filter.h
@@ -0,0 +1,35 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "entryref.h"
+#include <vector>
+
+namespace vespalib::datastore {
+
+/*
+ * Class to filter entry refs based on which buffer the entry is referencing.
+ *
+ * Buffers that are allowed have their corresponding bit set in _filter.
+ */
+class EntryRefFilter {
+ std::vector<bool> _filter;
+ uint32_t _offset_bits;
+ EntryRefFilter(std::vector<bool> filter, uint32_t offset_bits);
+public:
+ EntryRefFilter(uint32_t num_buffers, uint32_t offset_bits);
+ ~EntryRefFilter();
+ bool has(EntryRef ref) const {
+ uint32_t buffer_id = ref.buffer_id(_offset_bits);
+ return _filter[buffer_id];
+ }
+ void add_buffer(uint32_t buffer_id) { _filter[buffer_id] = true; }
+ void add_buffers(const std::vector<uint32_t>& ids) {
+ for (auto buffer_id : ids) {
+ _filter[buffer_id] = true;
+ }
+ }
+ static EntryRefFilter create_all_filter(uint32_t num_buffers, uint32_t offset_bits);
+};
+
+}
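A quick tour of the new class as a sketch (not from the diff; `store` and `ref` are hypothetical, `RefT` as in the tests):

    using vespalib::datastore::EntryRefFilter;

    // Pass refs from every buffer:
    auto all = EntryRefFilter::create_all_filter(RefT::numBuffers(), RefT::offset_bits);

    // Pass refs from selected buffers only (all bits start out false):
    EntryRefFilter some(RefT::numBuffers(), RefT::offset_bits);
    some.add_buffers(store.start_compact_worst_buffers());  // vector of buffer ids
    if (some.has(ref)) {
        // ref points into one of the selected buffers
    }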
diff --git a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp
index db9fee8ea70..6f001ce3c94 100644
--- a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.cpp
@@ -2,6 +2,7 @@
#include "fixed_size_hash_map.h"
#include "entry_comparator.h"
+#include "entry_ref_filter.h"
#include "i_compactable.h"
#include <vespa/vespalib/util/array.hpp>
#include <vespa/vespalib/util/memoryusage.h>
@@ -182,7 +183,7 @@ FixedSizeHashMap::foreach_key(const std::function<void(EntryRef)>& callback) con
}
void
-FixedSizeHashMap::move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits)
+FixedSizeHashMap::move_keys(ICompactable& compactable, const EntryRefFilter &compacting_buffers)
{
for (auto& chain_head : _chain_heads) {
uint32_t node_idx = chain_head.load_relaxed();
@@ -190,8 +191,7 @@ FixedSizeHashMap::move_keys(ICompactable& compactable, const std::vector<bool>&
auto& node = _nodes[node_idx];
EntryRef old_ref = node.get_kv().first.load_relaxed();
assert(old_ref.valid());
- uint32_t buffer_id = old_ref.buffer_id(entry_ref_offset_bits);
- if (compacting_buffers[buffer_id]) {
+ if (compacting_buffers.has(old_ref)) {
EntryRef new_ref = compactable.move(old_ref);
node.get_kv().first.store_release(new_ref);
}
@@ -220,4 +220,104 @@ FixedSizeHashMap::normalize_values(const std::function<EntryRef(EntryRef)>& norm
return changed;
}
+namespace {
+
+class ChangeWriter {
+ std::vector<AtomicEntryRef*> _atomic_refs;
+public:
+ ChangeWriter(uint32_t capacity);
+ ~ChangeWriter();
+ bool write(const std::vector<EntryRef> &refs);
+ void emplace_back(AtomicEntryRef &atomic_ref) { _atomic_refs.emplace_back(&atomic_ref); }
+};
+
+ChangeWriter::ChangeWriter(uint32_t capacity)
+ : _atomic_refs()
+{
+ _atomic_refs.reserve(capacity);
+}
+
+ChangeWriter::~ChangeWriter() = default;
+
+bool
+ChangeWriter::write(const std::vector<EntryRef> &refs)
+{
+ bool changed = false;
+ assert(refs.size() == _atomic_refs.size());
+ auto atomic_ref = _atomic_refs.begin();
+ for (auto ref : refs) {
+ EntryRef old_ref = (*atomic_ref)->load_relaxed();
+ if (ref != old_ref) {
+ (*atomic_ref)->store_release(ref);
+ changed = true;
+ }
+ ++atomic_ref;
+ }
+ assert(atomic_ref == _atomic_refs.end());
+ _atomic_refs.clear();
+ return changed;
+}
+
+}
+
+bool
+FixedSizeHashMap::normalize_values(const std::function<void(std::vector<EntryRef>&)>& normalize, const EntryRefFilter& filter)
+{
+ std::vector<EntryRef> refs;
+ refs.reserve(1024);
+ bool changed = false;
+ ChangeWriter change_writer(refs.capacity());
+ for (auto& chain_head : _chain_heads) {
+ uint32_t node_idx = chain_head.load_relaxed();
+ while (node_idx != no_node_idx) {
+ auto& node = _nodes[node_idx];
+ EntryRef ref = node.get_kv().second.load_relaxed();
+ if (ref.valid()) {
+ if (filter.has(ref)) {
+ refs.emplace_back(ref);
+ change_writer.emplace_back(node.get_kv().second);
+ if (refs.size() >= refs.capacity()) {
+ normalize(refs);
+ changed |= change_writer.write(refs);
+ refs.clear();
+ }
+ }
+ }
+ node_idx = node.get_next_node_idx().load(std::memory_order_relaxed);
+ }
+ }
+ if (!refs.empty()) {
+ normalize(refs);
+ changed |= change_writer.write(refs);
+ }
+ return changed;
+}
+
+void
+FixedSizeHashMap::foreach_value(const std::function<void(const std::vector<EntryRef>&)>& callback, const EntryRefFilter& filter)
+{
+ std::vector<EntryRef> refs;
+ refs.reserve(1024);
+ for (auto& chain_head : _chain_heads) {
+ uint32_t node_idx = chain_head.load_relaxed();
+ while (node_idx != no_node_idx) {
+ auto& node = _nodes[node_idx];
+ EntryRef ref = node.get_kv().second.load_relaxed();
+ if (ref.valid()) {
+ if (filter.has(ref)) {
+ refs.emplace_back(ref);
+ if (refs.size() >= refs.capacity()) {
+ callback(refs);
+ refs.clear();
+ }
+ }
+ }
+ node_idx = node.get_next_node_idx().load(std::memory_order_relaxed);
+ }
+ }
+ if (!refs.empty()) {
+ callback(refs);
+ }
+}
+
}
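Both new methods batch at the vector's reserved capacity: refs is reserved to 1024 entries and flushed to the callback each time it fills, plus once more for any remainder. A callback therefore has to cope with several variable-sized invocations per scan, as in this sketch (`map` and `filter` assumed to exist):

    // Count entries across however many batches the scan produces.
    size_t seen = 0;
    map.foreach_value([&seen](const std::vector<EntryRef>& refs) {
        seen += refs.size();  // at most 1024 refs per invocation
    }, filter);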
diff --git a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h
index 035cd84dbee..c522bcc3c33 100644
--- a/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h
+++ b/vespalib/src/vespa/vespalib/datastore/fixed_size_hash_map.h
@@ -18,6 +18,7 @@ class MemoryUsage;
}
namespace vespalib::datastore {
+class EntryRefFilter;
struct ICompactable;
class ShardedHashComparator {
@@ -158,8 +159,26 @@ public:
size_t size() const noexcept { return _count; }
MemoryUsage get_memory_usage() const;
void foreach_key(const std::function<void(EntryRef)>& callback) const;
- void move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits);
+ void move_keys(ICompactable& compactable, const EntryRefFilter &compacting_buffers);
+ /*
+ * Scan the dictionary and call the normalize function for each value. If
+ * the returned value is different, write the modified value back to
+ * the dictionary. Used when clearing all posting lists.
+ */
bool normalize_values(const std::function<EntryRef(EntryRef)>& normalize);
+ /*
+ * Scan dictionary and call normalize function for batches of values
+ * that pass the filter. Write back modified values to the dictionary.
+ * Used by compaction of posting lists when moving short arrays,
+ * bitvectors or btree roots.
+ */
+ bool normalize_values(const std::function<void(std::vector<EntryRef>&)>& normalize, const EntryRefFilter& filter);
+ /*
+ * Scan dictionary and call callback function for batches of values
+ * that pass the filter. Used by compaction of posting lists when
+ * moving btree nodes.
+ */
+ void foreach_value(const std::function<void(const std::vector<EntryRef>&)>& callback, const EntryRefFilter& filter);
};
}
diff --git a/vespalib/src/vespa/vespalib/datastore/i_unique_store_dictionary.h b/vespalib/src/vespa/vespalib/datastore/i_unique_store_dictionary.h
index 886ec095dcd..cf848167070 100644
--- a/vespalib/src/vespa/vespalib/datastore/i_unique_store_dictionary.h
+++ b/vespalib/src/vespa/vespalib/datastore/i_unique_store_dictionary.h
@@ -11,6 +11,7 @@
namespace vespalib::datastore {
class EntryComparator;
+class EntryRefFilter;
struct ICompactable;
class IUniqueStoreDictionaryReadSnapshot;
class UniqueStoreAddResult;
@@ -28,7 +29,7 @@ public:
virtual UniqueStoreAddResult add(const EntryComparator& comp, std::function<EntryRef(void)> insertEntry) = 0;
virtual EntryRef find(const EntryComparator& comp) = 0;
virtual void remove(const EntryComparator& comp, EntryRef ref) = 0;
- virtual void move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits) = 0;
+ virtual void move_keys(ICompactable& compactable, const EntryRefFilter& compacting_buffers) = 0;
virtual uint32_t get_num_uniques() const = 0;
virtual vespalib::MemoryUsage get_memory_usage() const = 0;
virtual void build(vespalib::ConstArrayRef<EntryRef>, vespalib::ConstArrayRef<uint32_t> ref_counts, std::function<void(EntryRef)> hold) = 0;
diff --git a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp
index da4db92a309..019b98a53dd 100644
--- a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.cpp
@@ -171,12 +171,12 @@ ShardedHashMap::foreach_key(std::function<void(EntryRef)> callback) const
}
void
-ShardedHashMap::move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits)
+ShardedHashMap::move_keys(ICompactable& compactable, const EntryRefFilter& compacting_buffers)
{
for (size_t i = 0; i < num_shards; ++i) {
auto map = _maps[i].load(std::memory_order_relaxed);
if (map != nullptr) {
- map->move_keys(compactable, compacting_buffers, entry_ref_offset_bits);
+ map->move_keys(compactable, compacting_buffers);
}
}
}
@@ -195,6 +195,31 @@ ShardedHashMap::normalize_values(std::function<EntryRef(EntryRef)> normalize)
}
bool
+ShardedHashMap::normalize_values(std::function<void(std::vector<EntryRef>&)> normalize, const EntryRefFilter& filter)
+{
+ bool changed = false;
+ for (size_t i = 0; i < num_shards; ++i) {
+ auto map = _maps[i].load(std::memory_order_relaxed);
+ if (map != nullptr) {
+ changed |= map->normalize_values(normalize, filter);
+ }
+ }
+ return changed;
+}
+
+void
+ShardedHashMap::foreach_value(std::function<void(const std::vector<EntryRef>&)> callback, const EntryRefFilter& filter)
+{
+ for (size_t i = 0; i < num_shards; ++i) {
+ auto map = _maps[i].load(std::memory_order_relaxed);
+ if (map != nullptr) {
+ map->foreach_value(callback, filter);
+ }
+ }
+}
+
+
+bool
ShardedHashMap::has_held_buffers() const
{
return _gen_holder.getHeldBytes() != 0;
diff --git a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h
index df07f7a1990..e0ba9488351 100644
--- a/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h
+++ b/vespalib/src/vespa/vespalib/datastore/sharded_hash_map.h
@@ -11,6 +11,7 @@ namespace vespalib { class MemoryUsage; }
namespace vespalib::datastore {
class EntryComparator;
+class EntryRefFilter;
class FixedSizeHashMap;
struct ICompactable;
@@ -57,8 +58,10 @@ public:
const EntryComparator &get_default_comparator() const noexcept { return *_comp; }
MemoryUsage get_memory_usage() const;
void foreach_key(std::function<void(EntryRef)> callback) const;
- void move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits);
+ void move_keys(ICompactable& compactable, const EntryRefFilter& compacting_buffers);
bool normalize_values(std::function<EntryRef(EntryRef)> normalize);
+ bool normalize_values(std::function<void(std::vector<EntryRef>&)> normalize, const EntryRefFilter& filter);
+ void foreach_value(std::function<void(const std::vector<EntryRef>&)> callback, const EntryRefFilter& filter);
bool has_held_buffers() const;
void compact_worst_shard();
};
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
index d375dbae149..b02a2e52185 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store.hpp
@@ -102,11 +102,9 @@ private:
std::vector<uint32_t> _bufferIdsToCompact;
void allocMapping() {
- _compacting_buffer.resize(RefT::numBuffers());
_mapping.resize(RefT::numBuffers());
for (const auto bufferId : _bufferIdsToCompact) {
BufferState &state = _dataStore.getBufferState(bufferId);
- _compacting_buffer[bufferId] = true;
_mapping[bufferId].resize(state.get_used_arrays());
}
}
@@ -124,7 +122,7 @@ private:
}
void fillMapping() {
- _dict.move_keys(*this, _compacting_buffer, RefT::offset_bits);
+ _dict.move_keys(*this, _compacting_buffer);
}
public:
@@ -140,6 +138,7 @@ public:
_bufferIdsToCompact(std::move(bufferIdsToCompact))
{
if (!_bufferIdsToCompact.empty()) {
+ _compacting_buffer.add_buffers(_bufferIdsToCompact);
allocMapping();
fillMapping();
}
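The division of labor is worth spelling out: after this change the UniqueStoreRemapper base constructor (see unique_store_remapper.h below) sizes the filter for all buffers up front, so CompactionContext only flips on the buffers actually being compacted via add_buffers, replacing the old resize-and-flag loop in allocMapping().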
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.h b/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.h
index 3b0169b5a34..54d541853c7 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.h
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.h
@@ -79,7 +79,7 @@ public:
UniqueStoreAddResult add(const EntryComparator& comp, std::function<EntryRef(void)> insertEntry) override;
EntryRef find(const EntryComparator& comp) override;
void remove(const EntryComparator& comp, EntryRef ref) override;
- void move_keys(ICompactable& compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits) override;
+ void move_keys(ICompactable& compactable, const EntryRefFilter& compacting_buffers) override;
uint32_t get_num_uniques() const override;
vespalib::MemoryUsage get_memory_usage() const override;
void build(vespalib::ConstArrayRef<EntryRef>, vespalib::ConstArrayRef<uint32_t> ref_counts, std::function<void(EntryRef)> hold) override;
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.hpp b/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.hpp
index e88376be9fb..13ae0a317e0 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_dictionary.hpp
@@ -4,6 +4,7 @@
#include "datastore.hpp"
#include "entry_comparator_wrapper.h"
+#include "entry_ref_filter.h"
#include "i_compactable.h"
#include "unique_store_add_result.h"
#include "unique_store_dictionary.h"
@@ -139,15 +140,14 @@ UniqueStoreDictionary<BTreeDictionaryT, ParentT, HashDictionaryT>::remove(const
template <typename BTreeDictionaryT, typename ParentT, typename HashDictionaryT>
void
-UniqueStoreDictionary<BTreeDictionaryT, ParentT, HashDictionaryT>::move_keys(ICompactable &compactable, const std::vector<bool>& compacting_buffers, uint32_t entry_ref_offset_bits)
+UniqueStoreDictionary<BTreeDictionaryT, ParentT, HashDictionaryT>::move_keys(ICompactable &compactable, const EntryRefFilter& compacting_buffers)
{
if constexpr (has_btree_dictionary) {
auto itr = this->_btree_dict.begin();
while (itr.valid()) {
EntryRef oldRef(itr.getKey());
assert(oldRef.valid());
- uint32_t buffer_id = oldRef.buffer_id(entry_ref_offset_bits);
- if (compacting_buffers[buffer_id]) {
+ if (compacting_buffers.has(oldRef)) {
EntryRef newRef(compactable.move(oldRef));
this->_btree_dict.thaw(itr);
itr.writeKey(newRef);
@@ -160,7 +160,7 @@ UniqueStoreDictionary<BTreeDictionaryT, ParentT, HashDictionaryT>::move_keys(ICo
++itr;
}
} else {
- this->_hash_dict.move_keys(compactable, compacting_buffers, entry_ref_offset_bits);
+ this->_hash_dict.move_keys(compactable, compacting_buffers);
}
}
diff --git a/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h b/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h
index 4a8d72c8685..873af07a902 100644
--- a/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h
+++ b/vespalib/src/vespa/vespalib/datastore/unique_store_remapper.h
@@ -3,6 +3,7 @@
#pragma once
#include "entryref.h"
+#include "entry_ref_filter.h"
#include <vector>
#include <vespa/vespalib/stllike/allocator.h>
@@ -18,11 +19,11 @@ public:
using RefType = RefT;
protected:
- std::vector<bool> _compacting_buffer;
+ EntryRefFilter _compacting_buffer;
std::vector<std::vector<EntryRef, allocator_large<EntryRef>>> _mapping;
public:
UniqueStoreRemapper()
- : _compacting_buffer(),
+ : _compacting_buffer(RefT::numBuffers(), RefT::offset_bits),
_mapping()
{
}
@@ -30,11 +31,11 @@ public:
EntryRef remap(EntryRef ref) const {
if (ref.valid()) {
- RefType internal_ref(ref);
- if (!_compacting_buffer[internal_ref.bufferId()]) {
+ if (!_compacting_buffer.has(ref)) {
// No remapping for references to buffers not being compacted
return ref;
} else {
+ RefType internal_ref(ref);
auto &inner_mapping = _mapping[internal_ref.bufferId()];
assert(internal_ref.unscaled_offset() < inner_mapping.size());
EntryRef mapped_ref = inner_mapping[internal_ref.unscaled_offset()];
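With the remapper holding an EntryRefFilter, remap() no longer needs an offset-bits argument at each call site, and the RefType is now only constructed on the remapping path. A closing sketch of typical remapper use (`posting_refs` is a hypothetical container of EntryRef):

    for (auto& ref : posting_refs) {
        ref = remapper.remap(ref);  // unchanged unless its buffer was compacted
    }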