diff options
23 files changed, 2274 insertions, 695 deletions
diff --git a/storage/src/tests/bucketdb/initializertest.cpp b/storage/src/tests/bucketdb/initializertest.cpp index 63f990f7cc1..7d3d2c185da 100644 --- a/storage/src/tests/bucketdb/initializertest.cpp +++ b/storage/src/tests/bucketdb/initializertest.cpp @@ -139,11 +139,11 @@ typedef std::map<document::BucketId, BucketData> DiskData; struct BucketInfoLogger { std::map<PartitionId, DiskData>& map; - BucketInfoLogger(std::map<PartitionId, DiskData>& m) + explicit BucketInfoLogger(std::map<PartitionId, DiskData>& m) : map(m) {} StorBucketDatabase::Decision operator()( - uint64_t revBucket, StorBucketDatabase::Entry& entry) + uint64_t revBucket, const StorBucketDatabase::Entry& entry) { document::BucketId bucket( document::BucketId::keyToBucketId(revBucket)); @@ -152,14 +152,14 @@ struct BucketInfoLogger { DiskData& ddata(map[entry.disk]); BucketData& bdata(ddata[bucket]); bdata.info = entry.getBucketInfo(); - return StorBucketDatabase::CONTINUE; + return StorBucketDatabase::Decision::CONTINUE; } }; std::map<PartitionId, DiskData> createMapFromBucketDatabase(StorBucketDatabase& db) { std::map<PartitionId, DiskData> result; BucketInfoLogger infoLogger(result); - db.all(infoLogger, "createmap"); + db.for_each(std::ref(infoLogger), "createmap"); return result; } // Create data we want to have in this test @@ -551,8 +551,8 @@ struct DatabaseInsertCallback : MessageCallback BucketData d; StorBucketDatabase::WrappedEntry entry( _database.get(bid, "DatabaseInsertCallback::onMessage", - StorBucketDatabase::LOCK_IF_NONEXISTING_AND_NOT_CREATING)); - if (entry.exist()) { + StorBucketDatabase::CREATE_IF_NONEXISTING)); + if (entry.preExisted()) { _errors << "db entry for " << bid << " already existed"; } if (i < 5) { diff --git a/storage/src/tests/bucketdb/lockablemaptest.cpp b/storage/src/tests/bucketdb/lockablemaptest.cpp index 101b9d014fa..50cde580f55 100644 --- a/storage/src/tests/bucketdb/lockablemaptest.cpp +++ b/storage/src/tests/bucketdb/lockablemaptest.cpp @@ -4,8 +4,9 @@ 
#include <vespa/storage/bucketdb/judymultimap.h> #include <vespa/storage/bucketdb/judymultimap.hpp> #include <vespa/storage/bucketdb/lockablemap.hpp> +#include <vespa/storage/bucketdb/btree_lockable_map.hpp> #include <vespa/vespalib/gtest/gtest.h> -#include <boost/operators.hpp> +#include <gmock/gmock.h> #include <vespa/log/log.h> LOG_SETUP(".lockable_map_test"); @@ -13,41 +14,62 @@ LOG_SETUP(".lockable_map_test"); // FIXME these old tests may have the least obvious semantics and worst naming in the entire storage module using namespace ::testing; +using document::BucketId; namespace storage { namespace { - struct A : public boost::operators<A> { - int _val1; - int _val2; - int _val3; - A() : _val1(0), _val2(0), _val3(0) {} - A(int val1, int val2, int val3) - : _val1(val1), _val2(val2), _val3(val3) {} +struct A { + int _val1; + int _val2; + int _val3; - static bool mayContain(const A&) { return true; } + A() : _val1(0), _val2(0), _val3(0) {} + A(int val1, int val2, int val3) + : _val1(val1), _val2(val2), _val3(val3) {} - bool operator==(const A& a) const { - return (_val1 == a._val1 && _val2 == a._val2 && _val3 == a._val3); - } - bool operator<(const A& a) const { - if (_val1 != a._val1) return (_val1 < a._val1); - if (_val2 != a._val2) return (_val2 < a._val2); - return (_val3 < a._val3); - } - }; + static bool mayContain(const A&) { return true; } + // Make this type smell more like a proper bucket DB value type. 
+ constexpr bool verifyLegal() const noexcept { return true; } + constexpr bool valid() const noexcept { return true; } - std::ostream& operator<<(std::ostream& out, const A& a) { - return out << "A(" << a._val1 << ", " << a._val2 << ", " << a._val3 << ")"; + bool operator==(const A& a) const noexcept { + return (_val1 == a._val1 && _val2 == a._val2 && _val3 == a._val3); + } + bool operator!=(const A& a) const noexcept { + return !(*this == a); } + bool operator<(const A& a) const noexcept { + if (_val1 != a._val1) return (_val1 < a._val1); + if (_val2 != a._val2) return (_val2 < a._val2); + return (_val3 < a._val3); + } +}; + +std::ostream& operator<<(std::ostream& out, const A& a) { + return out << "A(" << a._val1 << ", " << a._val2 << ", " << a._val3 << ")"; +} - typedef LockableMap<JudyMultiMap<A> > Map; } -TEST(LockableMapTest, simple_usage) { +template <typename MapT> +struct LockableMapTest : ::testing::Test { + using Map = MapT; +}; + +using MapTypes = ::testing::Types<LockableMap<JudyMultiMap<A>>, bucketdb::BTreeLockableMap<A>>; +VESPA_GTEST_TYPED_TEST_SUITE(LockableMapTest, MapTypes); + +// Disable warnings emitted by gtest generated files when using typed tests +#pragma GCC diagnostic push +#ifndef __clang__ +#pragma GCC diagnostic ignored "-Wsuggest-override" +#endif + +TYPED_TEST(LockableMapTest, simple_usage) { // Tests insert, erase, size, empty, operator[] - Map map; + TypeParam map; // Do some insertions EXPECT_TRUE(map.empty()); bool preExisted; @@ -57,11 +79,11 @@ TEST(LockableMapTest, simple_usage) { EXPECT_EQ(false, preExisted); map.insert(14, A(42, 0, 0), "foo", preExisted); EXPECT_EQ(false, preExisted); - EXPECT_EQ((Map::size_type) 3, map.size()) << map.toString(); + EXPECT_THAT(map, SizeIs(3)); map.insert(11, A(4, 7, 0), "foo", preExisted); EXPECT_EQ(true, preExisted); - EXPECT_EQ((Map::size_type) 3, map.size()); + EXPECT_THAT(map, SizeIs(3)); EXPECT_FALSE(map.empty()); // Access some elements @@ -71,20 +93,20 @@ TEST(LockableMapTest, 
simple_usage) { // Do removes EXPECT_EQ(map.erase(12, "foo"), 0); - EXPECT_EQ((Map::size_type) 3, map.size()); + EXPECT_THAT(map, SizeIs(3)); EXPECT_EQ(map.erase(14, "foo"), 1); - EXPECT_EQ((Map::size_type) 2, map.size()); + EXPECT_THAT(map, SizeIs(2)); EXPECT_EQ(map.erase(11, "foo"), 1); EXPECT_EQ(map.erase(16, "foo"), 1); - EXPECT_EQ((Map::size_type) 0, map.size()); + EXPECT_THAT(map, SizeIs(0)); EXPECT_TRUE(map.empty()); } -TEST(LockableMapTest, comparison) { - Map map1; - Map map2; +TYPED_TEST(LockableMapTest, comparison) { + TypeParam map1; + TypeParam map2; bool preExisted; // Check empty state is correct @@ -123,154 +145,194 @@ TEST(LockableMapTest, comparison) { } namespace { - struct NonConstProcessor { - Map::Decision operator()(int key, A& a) { - (void) key; - ++a._val2; - return Map::UPDATE; + +template <typename Map> +struct NonConstProcessor { + typename Map::Decision operator()(int key, A& a) { + (void) key; + ++a._val2; + return Map::UPDATE; + } +}; + +template <typename Map> +struct EntryProcessor { + mutable uint32_t count; + mutable std::vector<std::string> log; + mutable std::vector<typename Map::Decision> behaviour; + + EntryProcessor(); + explicit EntryProcessor(const std::vector<typename Map::Decision>& decisions); + ~EntryProcessor(); + + typename Map::Decision operator()(uint64_t key, A& a) const { + std::ostringstream ost; + ost << key << " - " << a; + log.push_back(ost.str()); + typename Map::Decision d = Map::CONTINUE; + if (behaviour.size() > count) { + d = behaviour[count++]; } - }; - struct EntryProcessor { - mutable uint32_t count; - mutable std::vector<std::string> log; - mutable std::vector<Map::Decision> behaviour; - - EntryProcessor(); - EntryProcessor(const std::vector<Map::Decision>& decisions); - ~EntryProcessor(); - - Map::Decision operator()(uint64_t key, A& a) const { - std::ostringstream ost; - ost << key << " - " << a; - log.push_back(ost.str()); - Map::Decision d = Map::CONTINUE; - if (behaviour.size() > count) { - d = 
behaviour[count++]; - } - if (d == Map::UPDATE) { - ++a._val3; - } - return d; + if (d == Map::UPDATE) { + ++a._val3; } + return d; + } - std::string toString() { - std::ostringstream ost; - for (uint32_t i=0; i<log.size(); ++i) { - ost << log[i] << "\n"; - } - return ost.str(); + std::string toString() { + std::ostringstream ost; + for (uint32_t i=0; i<log.size(); ++i) { + ost << log[i] << "\n"; } - }; -} + return ost.str(); + } +}; + +template <typename Map> +EntryProcessor<Map>::EntryProcessor() + : count(0), log(), behaviour() {} + +template <typename Map> +EntryProcessor<Map>::EntryProcessor(const std::vector<typename Map::Decision>& decisions) + : count(0), log(), behaviour(decisions) {} + +template <typename Map> +EntryProcessor<Map>::~EntryProcessor() = default; + +template <typename Map> +struct ConstProcessor { + mutable std::vector<std::string> log; + + typename Map::Decision operator()(uint64_t key, const A& a) const { + std::ostringstream ost; + ost << key << " - " << a; + log.push_back(ost.str()); + return Map::CONTINUE; + } -EntryProcessor::EntryProcessor() : count(0), log(), behaviour() {} -EntryProcessor::EntryProcessor(const std::vector<Map::Decision>& decisions) - : count(0), log(), behaviour(decisions) {} -EntryProcessor::~EntryProcessor() = default; + std::string toString() { + std::ostringstream ost; + for (const auto& entry : log) { + ost << entry << "\n"; + } + return ost.str(); + } +}; + +} -TEST(LockableMapTest, iterating) { - Map map; +TYPED_TEST(LockableMapTest, iterating) { + TypeParam map; bool preExisted; map.insert(16, A(1, 2, 3), "foo", preExisted); map.insert(11, A(4, 6, 0), "foo", preExisted); map.insert(14, A(42, 0, 0), "foo", preExisted); - // Test that we can use functor with non-const function + // Test that we can use functor with non-const function { - NonConstProcessor ncproc; - map.each(ncproc, "foo"); // Locking both for each element + NonConstProcessor<TypeParam> ncproc; + map.for_each_mutable(std::ref(ncproc), "foo"); 
// First round of mutating functor for `all` EXPECT_EQ(A(4, 7, 0), *map.get(11, "foo")); EXPECT_EQ(A(42,1, 0), *map.get(14, "foo")); EXPECT_EQ(A(1, 3, 3), *map.get(16, "foo")); - map.all(ncproc, "foo"); // And for all + map.for_each_mutable(std::ref(ncproc), "foo"); // Once more, with feeling. EXPECT_EQ(A(4, 8, 0), *map.get(11, "foo")); EXPECT_EQ(A(42,2, 0), *map.get(14, "foo")); EXPECT_EQ(A(1, 4, 3), *map.get(16, "foo")); } - // Test that we can use const functors directly.. - map.each(EntryProcessor(), "foo"); - // Test iterator bounds { - EntryProcessor proc; - map.each(proc, "foo", 11, 16); + ConstProcessor<TypeParam> cproc; + map.for_each(std::ref(cproc), "foo"); + std::string expected("11 - A(4, 8, 0)\n" + "14 - A(42, 2, 0)\n" + "16 - A(1, 4, 3)\n"); + EXPECT_EQ(expected, cproc.toString()); + } + // Test that we can use const functors directly.. + map.for_each(ConstProcessor<TypeParam>(), "foo"); + + // Test iterator bounds + { + EntryProcessor<TypeParam> proc; + map.for_each_mutable(std::ref(proc), "foo", 11, 16); std::string expected("11 - A(4, 8, 0)\n" "14 - A(42, 2, 0)\n" "16 - A(1, 4, 3)\n"); EXPECT_EQ(expected, proc.toString()); - EntryProcessor proc2; - map.each(proc2, "foo", 12, 15); + EntryProcessor<TypeParam> proc2; + map.for_each_mutable(std::ref(proc2), "foo", 12, 15); expected = "14 - A(42, 2, 0)\n"; EXPECT_EQ(expected, proc2.toString()); } // Test that we can abort iterating { - std::vector<Map::Decision> decisions; - decisions.push_back(Map::CONTINUE); - decisions.push_back(Map::ABORT); - EntryProcessor proc(decisions); - map.each(proc, "foo"); + std::vector<typename TypeParam::Decision> decisions; + decisions.push_back(TypeParam::CONTINUE); + decisions.push_back(TypeParam::ABORT); + EntryProcessor<TypeParam> proc(decisions); + map.for_each_mutable(std::ref(proc), "foo"); std::string expected("11 - A(4, 8, 0)\n" "14 - A(42, 2, 0)\n"); EXPECT_EQ(expected, proc.toString()); } - // Test that we can remove during iteration + // Test that we can 
remove during iteration { - std::vector<Map::Decision> decisions; - decisions.push_back(Map::CONTINUE); - decisions.push_back(Map::REMOVE); - EntryProcessor proc(decisions); - map.each(proc, "foo"); + std::vector<typename TypeParam::Decision> decisions; + decisions.push_back(TypeParam::CONTINUE); + decisions.push_back(TypeParam::REMOVE); // TODO consider removing; not used + EntryProcessor<TypeParam> proc(decisions); + map.for_each_mutable(std::ref(proc), "foo"); std::string expected("11 - A(4, 8, 0)\n" "14 - A(42, 2, 0)\n" "16 - A(1, 4, 3)\n"); EXPECT_EQ(expected, proc.toString()); - EXPECT_EQ((Map::size_type) 2, map.size()) << map.toString(); + EXPECT_EQ(2u, map.size()); EXPECT_EQ(A(4, 8, 0), *map.get(11, "foo")); EXPECT_EQ(A(1, 4, 3), *map.get(16, "foo")); - Map::WrappedEntry entry = map.get(14, "foo"); + auto entry = map.get(14, "foo"); EXPECT_FALSE(entry.exist()); } } -TEST(LockableMapTest, chunked_iteration_is_transparent_across_chunk_sizes) { - Map map; +TYPED_TEST(LockableMapTest, chunked_iteration_is_transparent_across_chunk_sizes) { + TypeParam map; bool preExisted; map.insert(16, A(1, 2, 3), "foo", preExisted); map.insert(11, A(4, 6, 0), "foo", preExisted); map.insert(14, A(42, 0, 0), "foo", preExisted); - NonConstProcessor ncproc; // Increments 2nd value in all entries. - // chunkedAll with chunk size of 1 - map.chunkedAll(ncproc, "foo", 1us, 1); + NonConstProcessor<TypeParam> ncproc; // Increments 2nd value in all entries. 
+ // for_each_chunked with chunk size of 1 + map.for_each_chunked(std::ref(ncproc), "foo", 1us, 1); EXPECT_EQ(A(4, 7, 0), *map.get(11, "foo")); EXPECT_EQ(A(42, 1, 0), *map.get(14, "foo")); EXPECT_EQ(A(1, 3, 3), *map.get(16, "foo")); - // chunkedAll with chunk size larger than db size - map.chunkedAll(ncproc, "foo", 1us, 100); + // for_each_chunked with chunk size larger than db size + map.for_each_chunked(std::ref(ncproc), "foo", 1us, 100); EXPECT_EQ(A(4, 8, 0), *map.get(11, "foo")); EXPECT_EQ(A(42, 2, 0), *map.get(14, "foo")); EXPECT_EQ(A(1, 4, 3), *map.get(16, "foo")); } -TEST(LockableMapTest, can_abort_during_chunked_iteration) { - Map map; +TYPED_TEST(LockableMapTest, can_abort_during_chunked_iteration) { + TypeParam map; bool preExisted; map.insert(16, A(1, 2, 3), "foo", preExisted); map.insert(11, A(4, 6, 0), "foo", preExisted); map.insert(14, A(42, 0, 0), "foo", preExisted); - std::vector<Map::Decision> decisions; - decisions.push_back(Map::CONTINUE); - decisions.push_back(Map::ABORT); - EntryProcessor proc(decisions); - map.chunkedAll(proc, "foo", 1us, 100); + std::vector<typename TypeParam::Decision> decisions; + decisions.push_back(TypeParam::CONTINUE); + decisions.push_back(TypeParam::ABORT); + EntryProcessor<TypeParam> proc(decisions); + map.for_each_chunked(std::ref(proc), "foo", 1us, 100); std::string expected("11 - A(4, 6, 0)\n" "14 - A(42, 0, 0)\n"); EXPECT_EQ(expected, proc.toString()); } -TEST(LockableMapTest, find_buckets_simple) { - Map map; +TYPED_TEST(LockableMapTest, find_buckets_simple) { + TypeParam map; document::BucketId id1(17, 0x0ffff); id1 = id1.stripUnused(); @@ -293,8 +355,8 @@ TEST(LockableMapTest, find_buckets_simple) { EXPECT_EQ(A(3,4,5), *results[id3]); } -TEST(LockableMapTest, find_buckets) { - Map map; +TYPED_TEST(LockableMapTest, find_buckets) { + TypeParam map; document::BucketId id1(16, 0x0ffff); document::BucketId id2(17, 0x0ffff); @@ -317,8 +379,8 @@ TEST(LockableMapTest, find_buckets) { EXPECT_EQ(A(3,4,5), 
*results[id3.stripUnused()]); } -TEST(LockableMapTest, find_buckets_2) { // ticket 3121525 - Map map; +TYPED_TEST(LockableMapTest, find_buckets_2) { // ticket 3121525 + TypeParam map; document::BucketId id1(16, 0x0ffff); document::BucketId id2(17, 0x0ffff); @@ -341,8 +403,8 @@ TEST(LockableMapTest, find_buckets_2) { // ticket 3121525 EXPECT_EQ(A(3,4,5), *results[id3.stripUnused()]); } -TEST(LockableMapTest, find_buckets_3) { // ticket 3121525 - Map map; +TYPED_TEST(LockableMapTest, find_buckets_3) { // ticket 3121525 + TypeParam map; document::BucketId id1(16, 0x0ffff); document::BucketId id2(17, 0x0ffff); @@ -359,8 +421,8 @@ TEST(LockableMapTest, find_buckets_3) { // ticket 3121525 EXPECT_EQ(A(1,2,3), *results[id1.stripUnused()]); } -TEST(LockableMapTest, find_buckets_4) { // ticket 3121525 - Map map; +TYPED_TEST(LockableMapTest, find_buckets_4) { // ticket 3121525 + TypeParam map; document::BucketId id1(16, 0x0ffff); document::BucketId id2(17, 0x0ffff); @@ -379,8 +441,8 @@ TEST(LockableMapTest, find_buckets_4) { // ticket 3121525 EXPECT_EQ(A(1,2,3), *results[id1.stripUnused()]); } -TEST(LockableMapTest, find_buckets_5) { // ticket 3121525 - Map map; +TYPED_TEST(LockableMapTest, find_buckets_5) { // ticket 3121525 + TypeParam map; document::BucketId id1(16, 0x0ffff); document::BucketId id2(17, 0x0ffff); @@ -399,8 +461,8 @@ TEST(LockableMapTest, find_buckets_5) { // ticket 3121525 EXPECT_EQ(A(1,2,3), *results[id1.stripUnused()]); } -TEST(LockableMapTest, find_no_buckets) { - Map map; +TYPED_TEST(LockableMapTest, find_no_buckets) { + TypeParam map; document::BucketId id(16, 0x0ffff); auto results = map.getAll(id, "foo"); @@ -408,8 +470,8 @@ TEST(LockableMapTest, find_no_buckets) { EXPECT_EQ(0, results.size()); } -TEST(LockableMapTest, find_all) { - Map map; +TYPED_TEST(LockableMapTest, find_all) { + TypeParam map; document::BucketId id1(16, 0x0aaaa); // contains id2-id7 document::BucketId id2(17, 0x0aaaa); // contains id3-id4 @@ -450,8 +512,8 @@ 
TEST(LockableMapTest, find_all) { EXPECT_EQ(A(9,10,11), *results[id9.stripUnused()]); // sub bucket } -TEST(LockableMapTest, find_all_2) { // Ticket 3121525 - Map map; +TYPED_TEST(LockableMapTest, find_all_2) { // Ticket 3121525 + TypeParam map; document::BucketId id1(17, 0x00001); document::BucketId id2(17, 0x10001); @@ -469,8 +531,8 @@ TEST(LockableMapTest, find_all_2) { // Ticket 3121525 EXPECT_EQ(A(2,3,4), *results[id2.stripUnused()]); // sub bucket } -TEST(LockableMapTest, find_all_unused_bit_is_set) { // ticket 2938896 - Map map; +TYPED_TEST(LockableMapTest, find_all_unused_bit_is_set) { // ticket 2938896 + TypeParam map; document::BucketId id1(24, 0x000dc7089); document::BucketId id2(33, 0x0053c7089); @@ -493,8 +555,8 @@ TEST(LockableMapTest, find_all_unused_bit_is_set) { // ticket 2938896 EXPECT_EQ(A(3,4,5), *results[id3.stripUnused()]); // sub bucket } -TEST(LockableMapTest, find_all_inconsistently_split) { // Ticket 2938896 - Map map; +TYPED_TEST(LockableMapTest, find_all_inconsistently_split) { // Ticket 2938896 + TypeParam map; document::BucketId id1(16, 0x00001); // contains id2-id3 document::BucketId id2(17, 0x00001); @@ -515,8 +577,8 @@ TEST(LockableMapTest, find_all_inconsistently_split) { // Ticket 2938896 EXPECT_EQ(A(3,4,5), *results[id3.stripUnused()]); // sub bucket } -TEST(LockableMapTest, find_all_inconsistently_split_2) { // ticket 3121525 - Map map; +TYPED_TEST(LockableMapTest, find_all_inconsistently_split_2) { // ticket 3121525 + TypeParam map; document::BucketId id1(17, 0x10000); document::BucketId id2(27, 0x007228034); // contains id3 @@ -538,8 +600,8 @@ TEST(LockableMapTest, find_all_inconsistently_split_2) { // ticket 3121525 EXPECT_EQ(A(3,4,5), *results[id3.stripUnused()]); // most specific match (super bucket) } -TEST(LockableMapTest, find_all_inconsistently_split_3) { // ticket 3121525 - Map map; +TYPED_TEST(LockableMapTest, find_all_inconsistently_split_3) { // ticket 3121525 + TypeParam map; document::BucketId id1(16, 0x0ffff); // 
contains id2 document::BucketId id2(17, 0x0ffff); @@ -556,8 +618,8 @@ TEST(LockableMapTest, find_all_inconsistently_split_3) { // ticket 3121525 EXPECT_EQ(A(1,2,3), *results[id1.stripUnused()]); // super bucket } -TEST(LockableMapTest, find_all_inconsistently_split_4) { // ticket 3121525 - Map map; +TYPED_TEST(LockableMapTest, find_all_inconsistently_split_4) { // ticket 3121525 + TypeParam map; document::BucketId id1(16, 0x0ffff); // contains id2-id3 document::BucketId id2(17, 0x0ffff); @@ -577,8 +639,8 @@ TEST(LockableMapTest, find_all_inconsistently_split_4) { // ticket 3121525 EXPECT_EQ(A(3,4,5), *results[id3.stripUnused()]); // sub bucket } -TEST(LockableMapTest, find_all_inconsistently_split_5) { // ticket 3121525 - Map map; +TYPED_TEST(LockableMapTest, find_all_inconsistently_split_5) { // ticket 3121525 + TypeParam map; document::BucketId id1(16, 0x0ffff); // contains id2-id3 document::BucketId id2(17, 0x0ffff); @@ -598,8 +660,8 @@ TEST(LockableMapTest, find_all_inconsistently_split_5) { // ticket 3121525 EXPECT_EQ(A(3,4,5), *results[id3.stripUnused()]); // sub bucket } -TEST(LockableMapTest, find_all_inconsistently_split_6) { - Map map; +TYPED_TEST(LockableMapTest, find_all_inconsistently_split_6) { + TypeParam map; document::BucketId id1(16, 0x0ffff); // contains id2-id3 document::BucketId id2(18, 0x1ffff); @@ -619,8 +681,8 @@ TEST(LockableMapTest, find_all_inconsistently_split_6) { EXPECT_EQ(A(3,4,5), *results[id3.stripUnused()]); // sub bucket } -TEST(LockableMapTest, find_all_inconsistent_below_16_bits) { - Map map; +TYPED_TEST(LockableMapTest, find_all_inconsistent_below_16_bits) { + TypeParam map; document::BucketId id1(1, 0x1); // contains id2-id3 document::BucketId id2(3, 0x1); @@ -641,24 +703,64 @@ TEST(LockableMapTest, find_all_inconsistent_below_16_bits) { EXPECT_EQ(A(3,4,5), *results[id3.stripUnused()]); // sub bucket } -TEST(LockableMapTest, is_consistent) { - Map map; +TYPED_TEST(LockableMapTest, is_consistent) { + TypeParam map; 
document::BucketId id1(16, 0x00001); // contains id2-id3 document::BucketId id2(17, 0x00001); bool preExisted; map.insert(id1.stripUnused().toKey(), A(1,2,3), "foo", preExisted); { - Map::WrappedEntry entry( - map.get(id1.stripUnused().toKey(), "foo", true)); + auto entry = map.get(id1.stripUnused().toKey(), "foo", true); EXPECT_TRUE(map.isConsistent(entry)); } map.insert(id2.stripUnused().toKey(), A(1,2,3), "foo", preExisted); { - Map::WrappedEntry entry( - map.get(id1.stripUnused().toKey(), "foo", true)); + auto entry = map.get(id1.stripUnused().toKey(), "foo", true); EXPECT_FALSE(map.isConsistent(entry)); } } +TYPED_TEST(LockableMapTest, get_without_auto_create_does_implicitly_not_lock_bucket) { + TypeParam map; + BucketId id(16, 0x00001); + + auto entry = map.get(id.toKey(), "foo", false); + EXPECT_FALSE(entry.exist()); + EXPECT_FALSE(entry.preExisted()); + EXPECT_FALSE(entry.locked()); +} + +TYPED_TEST(LockableMapTest, get_with_auto_create_returns_default_constructed_entry_if_missing) { + TypeParam map; + BucketId id(16, 0x00001); + + auto entry = map.get(id.toKey(), "foo", true); + EXPECT_TRUE(entry.exist()); + EXPECT_FALSE(entry.preExisted()); + EXPECT_TRUE(entry.locked()); + EXPECT_EQ(*entry, A()); + *entry = A(1, 2, 3); + entry.write(); // Implicit unlock (!) 
+ + // Should now exist + entry = map.get(id.toKey(), "foo", true); + EXPECT_TRUE(entry.exist()); + EXPECT_TRUE(entry.preExisted()); + EXPECT_TRUE(entry.locked()); + EXPECT_EQ(*entry, A(1, 2, 3)); +} + +TYPED_TEST(LockableMapTest, entry_changes_not_visible_if_write_not_invoked_on_guard) { + TypeParam map; + BucketId id(16, 0x00001); + auto entry = map.get(id.toKey(), "foo", true); + *entry = A(1, 2, 3); + // No write() call on guard + entry.unlock(); + + entry = map.get(id.toKey(), "foo", true); + EXPECT_EQ(*entry, A()); +} + } // storage diff --git a/storage/src/vespa/storage/bucketdb/CMakeLists.txt b/storage/src/vespa/storage/bucketdb/CMakeLists.txt index a99d16d9f0f..5bd966ae0e1 100644 --- a/storage/src/vespa/storage/bucketdb/CMakeLists.txt +++ b/storage/src/vespa/storage/bucketdb/CMakeLists.txt @@ -2,11 +2,13 @@ vespa_add_library(storage_bucketdb OBJECT SOURCES btree_bucket_database.cpp + btree_lockable_map.cpp bucketcopy.cpp bucketdatabase.cpp bucketinfo.cpp bucketmanager.cpp bucketmanagermetrics.cpp + generic_btree_bucket_database.cpp judyarray.cpp lockablemap.cpp mapbucketdatabase.cpp diff --git a/storage/src/vespa/storage/bucketdb/abstract_bucket_map.h b/storage/src/vespa/storage/bucketdb/abstract_bucket_map.h new file mode 100644 index 00000000000..55b52a2cd38 --- /dev/null +++ b/storage/src/vespa/storage/bucketdb/abstract_bucket_map.h @@ -0,0 +1,244 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/document/bucket/bucketid.h> +#include <vespa/vespalib/stllike/hash_map.h> +#include <vespa/vespalib/stllike/hash_set.h> +#include <vespa/vespalib/util/time.h> +#include <cassert> +#include <functional> +#include <iosfwd> +#include <map> + +namespace storage::bucketdb { + +/* + * Interface for content node bucket database implementations. + * + * Allows for multiple divergent implementations to exist of the + * bucket database in a transition period. 
+ */ +template <typename ValueT> +class AbstractBucketMap { +public: + using key_type = uint64_t; // Always a raw u64 bucket key. + using mapped_type = ValueT; + using size_type = size_t; + using BucketId = document::BucketId; + struct WrappedEntry; + + // Responsible for releasing lock in map when out of scope. + class LockKeeper { + friend struct WrappedEntry; + AbstractBucketMap& _map; + key_type _key; + bool _locked; + + LockKeeper(AbstractBucketMap& map, key_type key) + : _map(map), _key(key), _locked(true) {} + void unlock() { _map.unlock(_key); _locked = false; } + public: + ~LockKeeper() { if (_locked) unlock(); } + }; + + struct WrappedEntry { + WrappedEntry() + : _exists(false), + _preExisted(false), + _lockKeeper(), + _value(), + _clientId(nullptr) + {} + WrappedEntry(AbstractBucketMap& map, + const key_type& key, const mapped_type& val, + const char* clientId, bool preExisted_) + : _exists(true), + _preExisted(preExisted_), + _lockKeeper(new LockKeeper(map, key)), + _value(val), + _clientId(clientId) {} + WrappedEntry(AbstractBucketMap& map, const key_type& key, + const char* clientId) + : _exists(false), + _preExisted(false), + _lockKeeper(new LockKeeper(map, key)), + _value(), + _clientId(clientId) {} + // TODO noexcept on these: + WrappedEntry(WrappedEntry&&) = default; + WrappedEntry& operator=(WrappedEntry&&) = default; + ~WrappedEntry(); + + mapped_type* operator->() { return &_value; } + const mapped_type* operator->() const { return &_value; } + mapped_type& operator*() { return _value; } + const mapped_type& operator*() const { return _value; } + + const mapped_type *get() const { return &_value; } + mapped_type *get() { return &_value; } + + void write(); + void remove(); + void unlock(); + [[nodiscard]] bool exist() const { return _exists; } // TODO rename to exists() + [[nodiscard]] bool preExisted() const { return _preExisted; } + [[nodiscard]] bool locked() const { return _lockKeeper.get(); } + const key_type& getKey() const { return 
_lockKeeper->_key; }; + + BucketId getBucketId() const { + return BucketId(BucketId::keyToBucketId(getKey())); + } + protected: + bool _exists; + bool _preExisted; + std::unique_ptr<LockKeeper> _lockKeeper; + mapped_type _value; + const char* _clientId; + friend class AbstractLockableMap; + }; + + struct LockId { + key_type _key; + const char* _owner; + + LockId() : _key(0), _owner("none - empty token") {} + LockId(key_type key, const char* owner) + : _key(key), _owner(owner) + { + assert(_owner); + } + + size_t hash() const { return _key; } + size_t operator%(size_t val) const { return _key % val; } + bool operator==(const LockId& id) const { return (_key == id._key); } + operator key_type() const { return _key; } + }; + + using EntryMap = std::map<BucketId, WrappedEntry>; // TODO ordered std::vector instead? map interface needed? + + enum Decision { ABORT, UPDATE, REMOVE, CONTINUE, DECISION_COUNT }; + + AbstractBucketMap() = default; + virtual ~AbstractBucketMap() = default; + + virtual void insert(const key_type& key, const mapped_type& value, + const char* client_id, bool has_lock, + bool& pre_existed) = 0; + virtual bool erase(const key_type& key, const char* clientId, bool has_lock) = 0; + + virtual WrappedEntry get(const key_type& key, const char* clientId, bool createIfNonExisting) = 0; + WrappedEntry get(const key_type& key, const char* clientId) { + return get(key, clientId, false); + } + /** + * Returns all buckets in the bucket database that can contain the given + * bucket, and all buckets that that bucket contains. + */ + virtual EntryMap getAll(const BucketId& bucketId, const char* clientId) = 0; + /** + * Returns all buckets in the bucket database that can contain the given + * bucket. Usually, there should be only one such bucket, but in the case + * of inconsistent splitting, there may be more than one. 
+ */ + virtual EntryMap getContained(const BucketId& bucketId, const char* clientId) = 0; + /** + * Returns true iff bucket has no superbuckets or sub-buckets in the + * database. Usage assumption is that any operation that can cause the + * bucket to become inconsistent will require taking its lock, so by + * requiring the lock to be provided here we avoid race conditions. + */ + virtual bool isConsistent(const WrappedEntry& entry) = 0; // TODO const + + static constexpr uint32_t DEFAULT_CHUNK_SIZE = 1000; + + + /** + * Iterate over the entire database contents, holding the global database + * mutex for `chunkSize` processed entries at a time, yielding the current + * thread between each such such to allow other threads to get a chance + * at acquiring a bucket lock. + * + * TODO deprecate in favor of snapshotting once fully on B-tree DB + * + * Type erasure of functor needed due to virtual indirection. + */ + void for_each_chunked(std::function<Decision(uint64_t, ValueT&)> func, + const char* clientId, + vespalib::duration yieldTime = 10us, + uint32_t chunkSize = DEFAULT_CHUNK_SIZE) + { + do_for_each_chunked(std::move(func), clientId, yieldTime, chunkSize); + } + + void for_each_mutable(std::function<Decision(uint64_t, ValueT&)> func, + const char* clientId, + const key_type& first = 0, + const key_type& last = UINT64_MAX) + { + do_for_each_mutable(std::move(func), clientId, first, last); + } + + void for_each(std::function<Decision(uint64_t, const ValueT&)> func, + const char* clientId, + const key_type& first = 0, + const key_type& last = UINT64_MAX) + { + do_for_each(std::move(func), clientId, first, last); + } + + [[nodiscard]] virtual size_type size() const noexcept = 0; + [[nodiscard]] virtual size_type getMemoryUsage() const noexcept = 0; + [[nodiscard]] virtual bool empty() const noexcept = 0; + + virtual void showLockClients(vespalib::asciistream& out) const = 0; + + virtual void print(std::ostream& out, bool verbose, const std::string& indent) const = 
0; +private: + virtual void unlock(const key_type& key) = 0; // Only for bucket lock guards + virtual void do_for_each_chunked(std::function<Decision(uint64_t, ValueT&)> func, + const char* clientId, + vespalib::duration yieldTime, + uint32_t chunkSize) = 0; + virtual void do_for_each_mutable(std::function<Decision(uint64_t, ValueT&)> func, + const char* clientId, + const key_type& first, + const key_type& last) = 0; + virtual void do_for_each(std::function<Decision(uint64_t, const ValueT&)> func, + const char* clientId, + const key_type& first, + const key_type& last) = 0; +}; + +template <typename ValueT> +std::ostream& operator<<(std::ostream& os, const AbstractBucketMap<ValueT>& map) { + map.print(os, false, ""); + return os; +} + +template <typename ValueT> +AbstractBucketMap<ValueT>::WrappedEntry::~WrappedEntry() = default; + +template <typename ValueT> +void AbstractBucketMap<ValueT>::WrappedEntry::write() { + assert(_lockKeeper->_locked); + assert(_value.verifyLegal()); + bool b; + _lockKeeper->_map.insert(_lockKeeper->_key, _value, _clientId, true, b); + _lockKeeper->unlock(); +} + +template <typename ValueT> +void AbstractBucketMap<ValueT>::WrappedEntry::remove() { + assert(_lockKeeper->_locked); + assert(_exists); + _lockKeeper->_map.erase(_lockKeeper->_key, _clientId, true); + _lockKeeper->unlock(); +} + +template <typename ValueT> +void AbstractBucketMap<ValueT>::WrappedEntry::unlock() { + assert(_lockKeeper->_locked); + _lockKeeper->unlock(); +} + +} diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp index 30fa8bb7543..53fa5afc93c 100644 --- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp +++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp @@ -13,6 +13,10 @@ #include <vespa/vespalib/datastore/array_store.hpp> #include <iostream> +// TODO remove once this impl uses the generic bucket B-tree code! 
+#include "generic_btree_bucket_database.h" +#include <vespa/vespalib/datastore/datastore.h> + /* * Buckets in our tree are represented by their 64-bit numeric key, in what's known as * "reversed bit order with appended used-bits" form. I.e. a bucket ID (16, 0xcafe), which @@ -40,16 +44,8 @@ using vespalib::datastore::EntryRef; using vespalib::ConstArrayRef; using document::BucketId; -BTreeBucketDatabase::BTreeBucketDatabase() - : _tree(), - _store(make_default_array_store_config()), - _generation_handler() -{ -} - -BTreeBucketDatabase::~BTreeBucketDatabase() = default; - -vespalib::datastore::ArrayStoreConfig BTreeBucketDatabase::make_default_array_store_config() { +template <typename ReplicaStore> +vespalib::datastore::ArrayStoreConfig make_default_array_store_config() { return ReplicaStore::optimizedConfigForHugePage(1023, vespalib::alloc::MemoryAllocator::HUGEPAGE_SIZE, 4 * 1024, 8 * 1024, 0.2).enable_free_lists(true); } @@ -121,6 +117,15 @@ uint8_t next_parent_bit_seek_level(uint8_t minBits, const document::BucketId& a, } +BTreeBucketDatabase::BTreeBucketDatabase() + : _tree(), + _store(make_default_array_store_config<ReplicaStore>()), + _generation_handler() +{ +} + +BTreeBucketDatabase::~BTreeBucketDatabase() = default; + void BTreeBucketDatabase::commit_tree_changes() { // TODO break up and refactor // TODO verify semantics and usage @@ -574,4 +579,45 @@ uint64_t BTreeBucketDatabase::ReadGuardImpl::generation() const noexcept { return _guard.getGeneration(); } +// TODO replace existing distributor DB code with generic impl. +// This is to ensure the generic implementation compiles with an ArrayStore backing in +// the meantime. 
+struct BTreeBucketDatabase2 { + struct ReplicaValueTraits { + using ValueType = Entry; + using ConstValueRef = ConstEntryRef; + using DataStoreType = vespalib::datastore::ArrayStore<BucketCopy>; + + static ValueType make_invalid_value() { + return Entry::createInvalid(); + } + static uint64_t store_and_wrap_value(DataStoreType& store, const Entry& entry) noexcept { + auto replicas_ref = store.add(entry.getBucketInfo().getRawNodes()); + return value_from(entry.getBucketInfo().getLastGarbageCollectionTime(), replicas_ref); + } + static void remove_by_wrapped_value(DataStoreType& store, uint64_t value) noexcept { + store.remove(entry_ref_from_value(value)); + } + static ValueType unwrap_from_key_value(const DataStoreType& store, uint64_t key, uint64_t value) { + const auto replicas_ref = store.get(entry_ref_from_value(value)); + const auto bucket = BucketId(BucketId::keyToBucketId(key)); + return entry_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref); + } + static ConstValueRef unwrap_const_ref_from_key_value(const DataStoreType& store, uint64_t key, uint64_t value) { + const auto replicas_ref = store.get(entry_ref_from_value(value)); + const auto bucket = BucketId(BucketId::keyToBucketId(key)); + return const_entry_ref_from_replica_array_ref(bucket, gc_timestamp_from_value(value), replicas_ref); + } + }; + + using BTreeImpl = bucketdb::GenericBTreeBucketDatabase<ReplicaValueTraits>; + BTreeImpl _impl; + + BTreeBucketDatabase2() + : _impl(make_default_array_store_config<ReplicaValueTraits::DataStoreType>()) + {} +}; + +template class bucketdb::GenericBTreeBucketDatabase<BTreeBucketDatabase2::ReplicaValueTraits>; + } diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h index 35e248cfc76..0dfa2b07b8a 100644 --- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h +++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h @@ -84,8 +84,6 @@ private: 
const document::BucketId& bucket, std::vector<Entry>& entries) const; - static vespalib::datastore::ArrayStoreConfig make_default_array_store_config(); - class ReadGuardImpl : public ReadGuard { const BTreeBucketDatabase* _db; GenerationHandler::Guard _guard; diff --git a/storage/src/vespa/storage/bucketdb/btree_lockable_map.cpp b/storage/src/vespa/storage/bucketdb/btree_lockable_map.cpp new file mode 100644 index 00000000000..a76f50d41ab --- /dev/null +++ b/storage/src/vespa/storage/bucketdb/btree_lockable_map.cpp @@ -0,0 +1,8 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "btree_lockable_map.hpp" + +namespace storage::bucketdb { + +template class BTreeLockableMap<StorageBucketInfo>; // Forced instantiation. + +} diff --git a/storage/src/vespa/storage/bucketdb/btree_lockable_map.h b/storage/src/vespa/storage/bucketdb/btree_lockable_map.h new file mode 100644 index 00000000000..a31cad7bf35 --- /dev/null +++ b/storage/src/vespa/storage/bucketdb/btree_lockable_map.h @@ -0,0 +1,158 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include "abstract_bucket_map.h" +#include "storagebucketinfo.h" +#include <vespa/document/bucket/bucketid.h> +#include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/stllike/hash_map.h> +#include <vespa/vespalib/stllike/hash_set.h> +#include <map> +#include <memory> +#include <mutex> +#include <condition_variable> +#include <cassert> +#include <iosfwd> + +namespace storage::bucketdb { + +template <typename DataStoreTraitsT> class GenericBTreeBucketDatabase; + +template <typename T> +class BTreeLockableMap : public AbstractBucketMap<T> { + struct ValueTraits; +public: + using ParentType = AbstractBucketMap<T>; + using WrappedEntry = typename ParentType::WrappedEntry; + using key_type = typename ParentType::key_type; + using mapped_type = typename ParentType::mapped_type; + using LockId = typename ParentType::LockId; + using EntryMap = typename ParentType::EntryMap; + using Decision = typename ParentType::Decision; + using BucketId = document::BucketId; + + BTreeLockableMap(); + ~BTreeLockableMap(); + + bool operator==(const BTreeLockableMap& other) const; + bool operator!=(const BTreeLockableMap& other) const { + return ! 
(*this == other); + } + bool operator<(const BTreeLockableMap& other) const; + size_t size() const noexcept override; + size_t getMemoryUsage() const noexcept override; + bool empty() const noexcept override; + void swap(BTreeLockableMap&); + + WrappedEntry get(const key_type& key, const char* clientId, bool createIfNonExisting) override; + WrappedEntry get(const key_type& key, const char* clientId) { + return get(key, clientId, false); + } + bool erase(const key_type& key, const char* clientId, bool has_lock) override; + void insert(const key_type& key, const mapped_type& value, + const char* client_id, bool has_lock, bool& pre_existed) override; + + bool erase(const key_type& key, const char* client_id) { + return erase(key, client_id, false); + } + void insert(const key_type& key, const mapped_type& value, + const char* client_id, bool& pre_existed) { + return insert(key, value, client_id, false, pre_existed); + } + void clear(); + void print(std::ostream& out, bool verbose, const std::string& indent) const override; + EntryMap getContained(const BucketId& bucketId, const char* clientId) override; + EntryMap getAll(const BucketId& bucketId, const char* clientId) override; + bool isConsistent(const WrappedEntry& entry) override; + void showLockClients(vespalib::asciistream & out) const override; + +private: + struct hasher { + size_t operator () (const LockId & lid) const { return lid.hash(); } + }; + class LockIdSet : public vespalib::hash_set<LockId, hasher> { + typedef vespalib::hash_set<LockId, hasher> Hash; + public: + LockIdSet(); + ~LockIdSet(); + void print(std::ostream& out, bool verbose, const std::string& indent) const; + bool exists(const LockId & lid) const { return this->find(lid) != Hash::end(); } + size_t getMemoryUsage() const; + }; + + class LockWaiters { + typedef vespalib::hash_map<size_t, LockId> WaiterMap; + public: + typedef size_t Key; + typedef typename WaiterMap::const_iterator const_iterator; + LockWaiters(); + ~LockWaiters(); + Key 
insert(const LockId & lid); + void erase(Key id) { _map.erase(id); } + const_iterator begin() const { return _map.begin(); } + const_iterator end() const { return _map.end(); } + private: + Key _id; + WaiterMap _map; + }; + + mutable std::mutex _lock; + std::condition_variable _cond; + std::unique_ptr<GenericBTreeBucketDatabase<ValueTraits>> _impl; + LockIdSet _lockedKeys; + LockWaiters _lockWaiters; + + void unlock(const key_type& key) override; + bool findNextKey(key_type& key, mapped_type& val, const char* clientId, + std::unique_lock<std::mutex> &guard); + bool handleDecision(key_type& key, mapped_type& val, Decision decision); + void acquireKey(const LockId & lid, std::unique_lock<std::mutex> &guard); + + void do_for_each_mutable(std::function<Decision(uint64_t, mapped_type&)> func, + const char* clientId, + const key_type& first, + const key_type& last) override; + + void do_for_each(std::function<Decision(uint64_t, const mapped_type&)> func, + const char* clientId, + const key_type& first, + const key_type& last) override; + + void do_for_each_chunked(std::function<Decision(uint64_t, mapped_type&)> func, + const char* client_id, + vespalib::duration yield_time, + uint32_t chunk_size) override; + + /** + * Process up to `chunk_size` bucket database entries from--and possibly + * including--the bucket pointed to by `key`. + * + * Returns true if additional chunks may be processed after the call to + * this function has returned, false if iteration has completed or if + * `func` returned an abort-decision. + * + * Modifies `key` in-place to point to the next key to process for the next + * invocation of this function. + */ + bool processNextChunk(std::function<Decision(uint64_t, mapped_type&)>& func, + key_type& key, + const char* client_id, + uint32_t chunk_size); + + /** + * Returns the given bucket, its super buckets and its sub buckets. 
+ */ + void getAllWithoutLocking(const BucketId& bucket, + std::vector<BucketId::Type>& keys); + + /** + * Find the given list of keys in the map and add them to the map of + * results, locking them in the process. + */ + void addAndLockResults(const std::vector<BucketId::Type>& keys, + const char* clientId, + std::map<BucketId, WrappedEntry>& results, + std::unique_lock<std::mutex> &guard); +}; + +} diff --git a/storage/src/vespa/storage/bucketdb/btree_lockable_map.hpp b/storage/src/vespa/storage/bucketdb/btree_lockable_map.hpp new file mode 100644 index 00000000000..4d0db1e6979 --- /dev/null +++ b/storage/src/vespa/storage/bucketdb/btree_lockable_map.hpp @@ -0,0 +1,509 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "btree_lockable_map.h" +#include "generic_btree_bucket_database.hpp" +#include <vespa/vespalib/btree/btreebuilder.h> +#include <vespa/vespalib/btree/btreenodeallocator.hpp> +#include <vespa/vespalib/btree/btreenode.hpp> +#include <vespa/vespalib/btree/btreenodestore.hpp> +#include <vespa/vespalib/btree/btreeiterator.hpp> +#include <vespa/vespalib/btree/btreeroot.hpp> +#include <vespa/vespalib/btree/btreebuilder.hpp> +#include <vespa/vespalib/btree/btree.hpp> +#include <vespa/vespalib/btree/btreestore.hpp> +#include <vespa/vespalib/datastore/datastore.h> +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/stllike/hash_set.hpp> +#include <thread> +#include <sstream> + +// Major TODOs in the short term: +// - Introduce snapshotting for readers +// - Greatly improve performance for DB iteration for readers by avoiding +// requirement to lock individual buckets and perform O(n) lbound seeks +// just to do a sweep. 
+ +namespace storage::bucketdb { + +using vespalib::datastore::EntryRef; +using vespalib::ConstArrayRef; +using document::BucketId; + +template <typename T> +struct BTreeLockableMap<T>::ValueTraits { + using ValueType = T; + using ConstValueRef = const T&; + using DataStoreType = vespalib::datastore::DataStore<ValueType>; + + static EntryRef entry_ref_from_value(uint64_t value) { + return EntryRef(value & 0xffffffffULL); + } + static ValueType make_invalid_value() { + return ValueType(); + } + static uint64_t store_and_wrap_value(DataStoreType& store, const ValueType& value) noexcept { + return store.addEntry(value).ref(); + } + static void remove_by_wrapped_value(DataStoreType& store, uint64_t value) noexcept { + store.freeElem(entry_ref_from_value(value), 1); + } + static ValueType unwrap_from_key_value(const DataStoreType& store, [[maybe_unused]] uint64_t key, uint64_t value) { + return store.getEntry(entry_ref_from_value(value)); + } + static ConstValueRef unwrap_const_ref_from_key_value(const DataStoreType& store, [[maybe_unused]] uint64_t key, uint64_t value) { + return store.getEntry(entry_ref_from_value(value)); + } +}; + +template <typename T> +BTreeLockableMap<T>::BTreeLockableMap() + : _impl(std::make_unique<GenericBTreeBucketDatabase<ValueTraits>>()) +{} + +template <typename T> +BTreeLockableMap<T>::~BTreeLockableMap() = default; + +template <typename T> +BTreeLockableMap<T>::LockIdSet::LockIdSet() : Hash() {} + +template <typename T> +BTreeLockableMap<T>::LockIdSet::~LockIdSet() = default; + +template <typename T> +size_t BTreeLockableMap<T>::LockIdSet::getMemoryUsage() const { + return Hash::getMemoryConsumption(); +} + +template <typename T> +BTreeLockableMap<T>::LockWaiters::LockWaiters() : _id(0), _map() {} + +template <typename T> +BTreeLockableMap<T>::LockWaiters::~LockWaiters() = default; + +template <typename T> +size_t BTreeLockableMap<T>::LockWaiters::insert(const LockId & lid) { + Key id(_id++); + _map.insert(typename 
WaiterMap::value_type(id, lid)); + return id; +} + +template <typename T> +bool BTreeLockableMap<T>::operator==(const BTreeLockableMap& other) const { + std::lock_guard guard(_lock); + std::lock_guard guard2(other._lock); + if (_impl->size() != other._impl->size()) { + return false; + } + auto lhs = _impl->begin(); + auto rhs = other._impl->begin(); + for (; lhs.valid(); ++lhs, ++rhs) { + assert(rhs.valid()); + if (lhs.getKey() != rhs.getKey()) { + return false; + } + if (_impl->const_value_ref_from_valid_iterator(lhs) + != other._impl->const_value_ref_from_valid_iterator(rhs)) + { + return false; + } + } + return true; +} + +template <typename T> +bool BTreeLockableMap<T>::operator<(const BTreeLockableMap& other) const { + std::lock_guard guard(_lock); + std::lock_guard guard2(other._lock); + auto lhs = _impl->begin(); + auto rhs = other._impl->begin(); + for (; lhs.valid() && rhs.valid(); ++lhs, ++rhs) { + if (lhs.getKey() != rhs.getKey()) { + return (lhs.getKey() < rhs.getKey()); + } + if (_impl->const_value_ref_from_valid_iterator(lhs) + != other._impl->const_value_ref_from_valid_iterator(rhs)) + { + return (_impl->const_value_ref_from_valid_iterator(lhs) + < other._impl->const_value_ref_from_valid_iterator(rhs)); + } + } + if (lhs.valid() == rhs.valid()) { + return false; // All keys are equal in maps of equal size. + } + return rhs.valid(); // Rhs still valid, lhs is not; ergo lhs is "less". 
+} + +template <typename T> +size_t BTreeLockableMap<T>::size() const noexcept { + std::lock_guard guard(_lock); + return _impl->size(); +} + +template <typename T> +size_t BTreeLockableMap<T>::getMemoryUsage() const noexcept { + std::lock_guard guard(_lock); + const auto impl_usage = _impl->memory_usage(); + return (impl_usage.allocatedBytes() + impl_usage.usedBytes() + + impl_usage.deadBytes() + impl_usage.allocatedBytesOnHold() + + _lockedKeys.getMemoryUsage() + + sizeof(std::mutex) + sizeof(std::condition_variable)); +} + +template <typename T> +bool BTreeLockableMap<T>::empty() const noexcept { + std::lock_guard guard(_lock); + return _impl->empty(); +} + +template <typename T> +void BTreeLockableMap<T>::swap(BTreeLockableMap& other) { + std::lock_guard guard(_lock); + std::lock_guard guard2(other._lock); + _impl.swap(other._impl); +} + +template <typename T> +void BTreeLockableMap<T>::acquireKey(const LockId& lid, std::unique_lock<std::mutex>& guard) { + if (_lockedKeys.exists(lid)) { + auto waitId = _lockWaiters.insert(lid); + while (_lockedKeys.exists(lid)) { + _cond.wait(guard); + } + _lockWaiters.erase(waitId); + } +} + +template <typename T> +typename BTreeLockableMap<T>::WrappedEntry +BTreeLockableMap<T>::get(const key_type& key, const char* clientId, bool createIfNonExisting) { + LockId lid(key, clientId); + std::unique_lock guard(_lock); + acquireKey(lid, guard); + auto iter = _impl->find(key); + bool preExisted = iter.valid(); + + if (!preExisted && createIfNonExisting) { + _impl->update_by_raw_key(key, mapped_type()); + // TODO avoid double lookup, though this is in an unlikely branch so shouldn't matter much. 
+ iter = _impl->find(key); + assert(iter.valid()); + } + if (!iter.valid()) { + return WrappedEntry(); + } + _lockedKeys.insert(lid); + return WrappedEntry(*this, key, _impl->entry_from_iterator(iter), clientId, preExisted); +} + +template <typename T> +bool BTreeLockableMap<T>::erase(const key_type& key, const char* client_id, bool has_lock) { + LockId lid(key, client_id); + std::unique_lock guard(_lock); + if (!has_lock) { + acquireKey(lid, guard); + } + return _impl->remove_by_raw_key(key); +} + +template <typename T> +void BTreeLockableMap<T>::insert(const key_type& key, const mapped_type& value, + const char* clientId, bool has_lock, bool& pre_existed) +{ + LockId lid(key, clientId); + std::unique_lock guard(_lock); + if (!has_lock) { + acquireKey(lid, guard); + } + pre_existed = _impl->update_by_raw_key(key, value); +} + +template <typename T> +void BTreeLockableMap<T>::clear() { + std::lock_guard<std::mutex> guard(_lock); + _impl->clear(); +} + +template <typename T> +bool BTreeLockableMap<T>::findNextKey(key_type& key, mapped_type& val, + const char* clientId, + std::unique_lock<std::mutex> &guard) +{ + // Wait for next value to unlock. + auto it = _impl->lower_bound(key); + while (it.valid() && _lockedKeys.exists(LockId(it.getKey(), ""))) { + auto wait_id = _lockWaiters.insert(LockId(it.getKey(), clientId)); + _cond.wait(guard); + _lockWaiters.erase(wait_id); + it = _impl->lower_bound(key); + } + if (!it.valid()) { + return true; + } + key = it.getKey(); + val = _impl->entry_from_iterator(it); + return false; +} + +template <typename T> +bool BTreeLockableMap<T>::handleDecision(key_type& key, mapped_type& val, + Decision decision) +{ + switch (decision) { + case Decision::UPDATE: + _impl->update_by_raw_key(key, val); + break; + case Decision::REMOVE: + // Invalidating is fine, since the caller doesn't hold long-lived iterators. 
+ _impl->remove_by_raw_key(key); + break; + case Decision::ABORT: + return true; + case Decision::CONTINUE: + break; + default: + HDR_ABORT("should not be reached"); + } + return false; +} + +template <typename T> +void BTreeLockableMap<T>::do_for_each_mutable(std::function<Decision(uint64_t, mapped_type&)> func, + const char* clientId, + const key_type& first, + const key_type& last) +{ + key_type key = first; + mapped_type val; + std::unique_lock guard(_lock); + while (true) { + if (findNextKey(key, val, clientId, guard) || key > last) { + return; + } + Decision d(func(const_cast<const key_type&>(key), val)); + if (handleDecision(key, val, d)) { + return; + } + ++key; + } +} + +template <typename T> +void BTreeLockableMap<T>::do_for_each(std::function<Decision(uint64_t, const mapped_type&)> func, + const char* clientId, + const key_type& first, + const key_type& last) +{ + key_type key = first; + mapped_type val; + std::unique_lock guard(_lock); + while (true) { + if (findNextKey(key, val, clientId, guard) || key > last) { + return; + } + Decision d(func(const_cast<const key_type&>(key), val)); + assert(d == Decision::ABORT || d == Decision::CONTINUE); + if (handleDecision(key, val, d)) { + return; + } + ++key; + } +} + +template <typename T> +bool BTreeLockableMap<T>::processNextChunk(std::function<Decision(uint64_t, mapped_type&)>& func, + key_type& key, + const char* client_id, + const uint32_t chunk_size) +{ + mapped_type val; + std::unique_lock guard(_lock); + for (uint32_t processed = 0; processed < chunk_size; ++processed) { + if (findNextKey(key, val, client_id, guard)) { + return false; + } + Decision d(func(const_cast<const key_type&>(key), val)); + if (handleDecision(key, val, d)) { + return false; + } + ++key; + } + return true; +} + +template <typename T> +void BTreeLockableMap<T>::do_for_each_chunked(std::function<Decision(uint64_t, mapped_type&)> func, + const char* client_id, + vespalib::duration yield_time, + uint32_t chunk_size) +{ + key_type 
key{}; + while (processNextChunk(func, key, client_id, chunk_size)) { + // Rationale: delay iteration for as short a time as possible while + // allowing another thread blocked on the main DB mutex to acquire it + // in the meantime. Simply yielding the thread does not have the + // intended effect with the Linux scheduler. + // This is a pragmatic stop-gap solution; a more robust change requires + // the redesign of bucket DB locking and signalling semantics in the + // face of blocked point lookups. + std::this_thread::sleep_for(yield_time); + } +} + +template <typename T> +void BTreeLockableMap<T>::print(std::ostream& out, bool verbose, + const std::string& indent) const +{ + std::lock_guard guard(_lock); + out << "BTreeLockableMap {\n" << indent << " "; + + if (verbose) { + for (auto it = _impl->begin(); it.valid(); ++it) { + out << "Key: " << BucketId(BucketId::keyToBucketId(it.getKey())) + << " Value: " << _impl->entry_from_iterator(it) << "\n" << indent << " "; + } + out << "\n" << indent << " Locked keys: "; + _lockedKeys.print(out, verbose, indent + " "); + } + out << "} : "; +} + +template <typename T> +void BTreeLockableMap<T>::LockIdSet::print(std::ostream& out, bool verbose, + const std::string& indent) const +{ + out << "hash {"; + for (const auto& entry : *this) { + if (verbose) { + out << "\n" << indent << " "; + } else { + out << " "; + } + out << entry; + } + if (verbose) { + out << "\n" << indent; + } + out << " }"; +} + + + +template <typename T> +void BTreeLockableMap<T>::unlock(const key_type& key) { + std::lock_guard guard(_lock); + _lockedKeys.erase(LockId(key, "")); + _cond.notify_all(); +} + +template <typename T> +void BTreeLockableMap<T>::addAndLockResults( + const std::vector<BucketId::Type>& keys, + const char* clientId, + std::map<BucketId, WrappedEntry>& results, + std::unique_lock<std::mutex> &guard) +{ + // Wait until all buckets are free to be added, then add them all. 
+ while (true) { + bool allOk = true; + key_type waitingFor(0); + + for (const auto key : keys) { + if (_lockedKeys.exists(LockId(key, clientId))) { + waitingFor = key; + allOk = false; + break; + } + } + + if (!allOk) { + auto waitId = _lockWaiters.insert(LockId(waitingFor, clientId)); + _cond.wait(guard); + _lockWaiters.erase(waitId); + } else { + for (const auto key : keys) { + auto iter = _impl->find(key); + if (iter.valid()) { + _lockedKeys.insert(LockId(key, clientId)); + results[BucketId(BucketId::keyToBucketId(key))] = WrappedEntry( + *this, key, _impl->entry_from_iterator(iter), clientId, true); + } + } + break; + } + } +} + +template <typename T> +typename BTreeLockableMap<T>::EntryMap +BTreeLockableMap<T>::getContained(const BucketId& bucket, + const char* clientId) +{ + std::unique_lock guard(_lock); + std::map<BucketId, WrappedEntry> results; + + std::vector<BucketId::Type> keys; + _impl->find_parents_and_self(bucket, [&keys](uint64_t key, [[maybe_unused]]const auto& value){ + keys.emplace_back(key); + }); + + if (!keys.empty()) { + addAndLockResults(keys, clientId, results, guard); + } + + return results; +} + +template <typename T> +void BTreeLockableMap<T>::getAllWithoutLocking(const BucketId& bucket, + std::vector<BucketId::Type>& keys) +{ + _impl->find_parents_self_and_children(bucket, [&keys](uint64_t key, [[maybe_unused]]const auto& value){ + keys.emplace_back(key); + }); +} + +/** + * Returns the given bucket, its super buckets and its sub buckets. 
+ */ +template <typename T> +typename BTreeLockableMap<T>::EntryMap +BTreeLockableMap<T>::getAll(const BucketId& bucket, const char* clientId) { + std::unique_lock guard(_lock); + + std::map<BucketId, WrappedEntry> results; + std::vector<BucketId::Type> keys; + + getAllWithoutLocking(bucket, keys); + addAndLockResults(keys, clientId, results, guard); + + return results; +} + +template <typename T> +bool BTreeLockableMap<T>::isConsistent(const BTreeLockableMap::WrappedEntry& entry) { + std::lock_guard guard(_lock); + uint64_t n_buckets = 0; + _impl->find_parents_self_and_children(entry.getBucketId(), + [&n_buckets]([[maybe_unused]] uint64_t key, [[maybe_unused]] const auto& value) { + ++n_buckets; + }); + return (n_buckets == 1); +} + +template <typename T> +void BTreeLockableMap<T>::showLockClients(vespalib::asciistream& out) const { + std::lock_guard guard(_lock); + out << "Currently grabbed locks:"; + for (const auto& locked : _lockedKeys) { + out << "\n " + << BucketId(BucketId::keyToBucketId(locked._key)) + << " - " << locked._owner; + } + out << "\nClients waiting for keys:"; + for (const auto& waiter : _lockWaiters) { + out << "\n " + << BucketId(BucketId::keyToBucketId(waiter.second._key)) + << " - " << waiter.second._owner; + } +} + +} diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index c8cdaaaaa23..4dbeb5a9a22 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -116,12 +116,12 @@ namespace { : _state(*distribution, systemState), _result(result), _factory(factory), - _storageDistribution(distribution) + _storageDistribution(std::move(distribution)) { } StorBucketDatabase::Decision operator()(uint64_t bucketId, - StorBucketDatabase::Entry& data) + const StorBucketDatabase::Entry& data) { document::BucketId b(document::BucketId::keyToBucketId(bucketId)); try{ @@ -155,7 +155,7 @@ namespace { 
.getDistributionConfigHash().c_str(), _state.getClusterState().toString().c_str()); } - return StorBucketDatabase::CONTINUE; + return StorBucketDatabase::Decision::CONTINUE; } }; @@ -180,7 +180,7 @@ namespace { StorBucketDatabase::Decision operator()( document::BucketId::Type bucketId, - StorBucketDatabase::Entry& data) + const StorBucketDatabase::Entry& data) { document::BucketId bucket( document::BucketId::keyToBucketId(bucketId)); @@ -202,7 +202,7 @@ namespace { } } - return StorBucketDatabase::CONTINUE; + return StorBucketDatabase::Decision::CONTINUE; }; void add(const MetricsUpdater& rhs) { @@ -242,7 +242,7 @@ BucketManager::updateMetrics(bool updateDocCount) MetricsUpdater total(diskCount); for (auto& space : _component.getBucketSpaceRepo()) { MetricsUpdater m(diskCount); - space.second->bucketDatabase().chunkedAll(m, "BucketManager::updateMetrics"); + space.second->bucketDatabase().for_each_chunked(std::ref(m), "BucketManager::updateMetrics"); total.add(m); if (updateDocCount) { auto bm = _metrics->bucket_spaces.find(space.first); @@ -338,10 +338,10 @@ namespace { class BucketDBDumper { vespalib::XmlOutputStream& _xos; public: - BucketDBDumper(vespalib::XmlOutputStream& xos) : _xos(xos) {} + explicit BucketDBDumper(vespalib::XmlOutputStream& xos) : _xos(xos) {} StorBucketDatabase::Decision operator()( - uint64_t bucketId, StorBucketDatabase::Entry& info) + uint64_t bucketId, const StorBucketDatabase::Entry& info) { using namespace vespalib::xml; document::BucketId bucket( @@ -356,7 +356,7 @@ namespace { info.getBucketInfo().printXml(_xos); _xos << XmlAttribute("disk", info.disk); _xos << XmlEndTag(); - return StorBucketDatabase::CONTINUE; + return StorBucketDatabase::Decision::CONTINUE; }; }; } @@ -378,8 +378,8 @@ BucketManager::reportStatus(std::ostream& out, xmlReporter << XmlTag("bucket-space") << XmlAttribute("name", document::FixedBucketSpaces::to_string(space.first)); BucketDBDumper dumper(xmlReporter.getStream()); - 
_component.getBucketSpaceRepo().get(space.first).bucketDatabase().chunkedAll( - dumper, "BucketManager::reportStatus"); + _component.getBucketSpaceRepo().get(space.first).bucketDatabase().for_each_chunked( + std::ref(dumper), "BucketManager::reportStatus"); xmlReporter << XmlEndTag(); } xmlReporter << XmlEndTag(); @@ -655,12 +655,12 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac if (LOG_WOULD_LOG(spam)) { DistributorInfoGatherer<true> builder( *clusterState, result, idFac, distribution); - _component.getBucketDatabase(bucketSpace).chunkedAll(builder, + _component.getBucketDatabase(bucketSpace).for_each_chunked(std::ref(builder), "BucketManager::processRequestBucketInfoCommands-1"); } else { DistributorInfoGatherer<false> builder( *clusterState, result, idFac, distribution); - _component.getBucketDatabase(bucketSpace).chunkedAll(builder, + _component.getBucketDatabase(bucketSpace).for_each_chunked(std::ref(builder), "BucketManager::processRequestBucketInfoCommands-2"); } _metrics->fullBucketInfoLatency.addValue( diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.cpp b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.cpp new file mode 100644 index 00000000000..bcc471cc903 --- /dev/null +++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.cpp @@ -0,0 +1,36 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#include "generic_btree_bucket_database.h" + +namespace storage::bucketdb { + +using document::BucketId; + +// TODO dedupe and unify common code +uint8_t +getMinDiffBits(uint16_t minBits, const BucketId& a, const BucketId& b) { + for (uint32_t i = minBits; i <= std::min(a.getUsedBits(), b.getUsedBits()); i++) { + BucketId a1(i, a.getRawId()); + BucketId b1(i, b.getRawId()); + if (b1.getId() != a1.getId()) { + return i; + } + } + return minBits; +} + +uint8_t next_parent_bit_seek_level(uint8_t minBits, const BucketId& a, const BucketId& b) { + const uint8_t min_used = std::min(a.getUsedBits(), b.getUsedBits()); + assert(min_used >= minBits); // Always monotonically descending towards leaves + for (uint32_t i = minBits; i <= min_used; i++) { + BucketId a1(i, a.getRawId()); + BucketId b1(i, b.getRawId()); + if (b1.getId() != a1.getId()) { + return i; + } + } + // The bit prefix is equal, which means that one node is a parent of the other. In this + // case we have to force the seek to continue from the next level in the tree. + return std::max(min_used, minBits) + 1; +} + +} diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h new file mode 100644 index 00000000000..15de7f3525b --- /dev/null +++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.h @@ -0,0 +1,242 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/document/bucket/bucketid.h> +#include <vespa/vespalib/btree/btree.h> +#include <vespa/vespalib/btree/minmaxaggregated.h> +#include <vespa/vespalib/btree/minmaxaggrcalc.h> + +namespace storage::bucketdb { + +/** + * Database implementation-specific interface for appending entries + * during a merge() operation. 
+ */ +template <typename ValueT> +struct TrailingInserter { + virtual ~TrailingInserter() = default; + /** + * Insert a new database entry at the end of the current bucket space. + * + * Precondition: the bucket ID must sort after all entries that + * have already been iterated over or inserted via insert_at_end(). + */ + virtual void insert_at_end(const document::BucketId& bucket_id, const ValueT&) = 0; +}; + +/** + * Database implementation-specific interface for accessing bucket + * entries and prepending entries during a merge() operation. + */ +template <typename ValueT> +struct Merger { + virtual ~Merger() = default; + + // TODO this should ideally be separated into read/write functions, but this + // will suffice for now to avoid too many changes. + + /** + * Bucket key/ID of the currently iterated entry. Unless the information stored + * in the DB Entry is needed, using one of these methods should be preferred to + * getting the bucket ID via current_entry(). The underlying DB is expected to + * have cheap access to the ID but _may_ have expensive access to the entry itself. + */ + [[nodiscard]] virtual uint64_t bucket_key() const noexcept = 0; + [[nodiscard]] virtual document::BucketId bucket_id() const noexcept = 0; + /** + * Returns a mutable representation of the currently iterated database + * entry. If changes are made to this object, Result::Update must be + * returned from merge(). Otherwise, mutation visibility is undefined. + */ + [[nodiscard]] virtual ValueT& current_entry() = 0; + /** + * Insert a new entry into the bucket database that is ordered before the + * currently iterated entry. + * + * Preconditions: + * - The bucket ID must sort _before_ the currently iterated + * entry's bucket ID, in "reversed bits" bucket key order. + * - The bucket ID must sort _after_ any entries previously + * inserted with insert_before_current(). 
+ * - The bucket ID must not be the same as a bucket that was + * already iterated over as part of the DB merge() call or inserted + * via a previous call to insert_before_current(). + * Such buckets must be handled by explicitly updating the provided + * entry for the iterated bucket and returning Result::Update. + */ + virtual void insert_before_current(const document::BucketId& bucket_id, const ValueT&) = 0; +}; + +/** + * Interface to be implemented by callers that wish to receive callbacks + * during a bucket merge() operation. + */ +template <typename ValueT> +struct MergingProcessor { + // See merge() for semantics on enum values. + enum class Result { + Update, + KeepUnchanged, + Skip + }; + + virtual ~MergingProcessor() = default; + /** + * Invoked for each existing bucket in the database, in bucket key order. + * The provided Merge instance may be used to access the current entry + * and prepend entries to the DB. + * + * Return value semantics: + * - Result::Update: + * when merge() returns, the changes made to the current entry will + * become visible in the bucket database. + * - Result::KeepUnchanged: + * when merge() returns, the entry will remain in the same state as + * it was when merge() was originally called. + * - Result::Skip: + * when merge() returns, the entry will no longer be part of the DB. + * Any entries added via insert_before_current() _will_ be present. + * + */ + virtual Result merge(Merger<ValueT>&) = 0; + /** + * Invoked once after all existing buckets have been iterated over. + * The provided TrailingInserter instance may be used to append + * an arbitrary number of entries to the database. + * + * This is used to handle elements remaining at the end of a linear + * merge operation. + */ + virtual void insert_remaining_at_end(TrailingInserter<ValueT>&) {} +}; + +/* + * Bucket database implementation built around lock-free single-writer/multiple-readers B+tree. + * + * Key is always treated as a 64-bit uint bucket ID key. 
+ * Value is a 64-bit uint whose semantics are handled by the provided DataStoreTraitsT. + * All DataStore access and value type (un)marshalling is deferred to the traits type, + * allowing this class to be used for both fixed-sized and dynamic-sized value types. + * + * Buckets in our tree are represented by their 64-bit numeric key, in what's known as + * "reversed bit order with appended used-bits" form. I.e. a bucket ID (16, 0xcafe), which + * in its canonical representation has 16 (the used-bits) in its 6 MSBs and 0xcafe in its + * LSBs is transformed into 0x7f53000000000010. This key is logically comprised of two parts: + * - the reversed bucket ID itself (0xcafe - 0x7f53) with all trailing zeroes for unset bits + * - the _non-reversed_ used-bits appended as the LSBs + * + * This particular transformation gives us keys with the following invariants: + * - all distinct bucket IDs map to exactly 1 key + * - buckets with the same ID but different used-bits are ordered in such a way that buckets + * with higher used-bits sort after buckets with lower used-bits + * - the key ordering represents an implicit in-order traversal of the binary bucket tree + * - consequently, all parent buckets are ordered before their child buckets + * + * The in-order traversal invariant is fundamental to many of the algorithms that operate + * on the bucket tree. 
+ */ +template <typename DataStoreTraitsT> +class GenericBTreeBucketDatabase { +public: + using DataStoreType = typename DataStoreTraitsT::DataStoreType; + using ValueType = typename DataStoreTraitsT::ValueType; + using ConstValueRef = typename DataStoreTraitsT::ConstValueRef; + using GenerationHandler = vespalib::GenerationHandler; + + struct KeyUsedBitsMinMaxAggrCalc : vespalib::btree::MinMaxAggrCalc { + constexpr static bool aggregate_over_values() { return false; } + constexpr static int32_t getVal(uint64_t key) noexcept { + static_assert(document::BucketId::CountBits == 6u); + return static_cast<int32_t>(key & 0b11'1111U); // 6 LSB of key contains used-bits + } + }; + + using BTree = vespalib::btree::BTree<uint64_t, uint64_t, + vespalib::btree::MinMaxAggregated, + std::less<>, + vespalib::btree::BTreeDefaultTraits, + KeyUsedBitsMinMaxAggrCalc>; + using BTreeConstIterator = typename BTree::ConstIterator; + + BTree _tree; + DataStoreType _store; + GenerationHandler _generation_handler; + + template <typename... DataStoreArgs> + explicit GenericBTreeBucketDatabase(DataStoreArgs&&... data_store_args) + : _store(std::forward<DataStoreArgs>(data_store_args)...) + {} + + GenericBTreeBucketDatabase(const GenericBTreeBucketDatabase&) = delete; + GenericBTreeBucketDatabase& operator=(const GenericBTreeBucketDatabase&) = delete; + GenericBTreeBucketDatabase(GenericBTreeBucketDatabase&&) = delete; + GenericBTreeBucketDatabase& operator=(GenericBTreeBucketDatabase&&) = delete; + + // TODO move + struct EntryProcessor { + virtual ~EntryProcessor() = default; + /** Return false to stop iterating. 
*/ + virtual bool process(const typename DataStoreTraitsT::ConstValueRef& e) = 0; + }; + + ValueType entry_from_iterator(const BTreeConstIterator& iter) const; + ConstValueRef const_value_ref_from_valid_iterator(const BTreeConstIterator& iter) const; + + static document::BucketId bucket_from_valid_iterator(const BTreeConstIterator& iter); + + BTreeConstIterator find(uint64_t key) const noexcept; + BTreeConstIterator lower_bound(uint64_t key) const noexcept; + BTreeConstIterator begin() const noexcept; + + void clear() noexcept; + [[nodiscard]] size_t size() const noexcept; + [[nodiscard]] bool empty() const noexcept; + [[nodiscard]] vespalib::MemoryUsage memory_usage() const noexcept; + + ValueType get(const document::BucketId& bucket) const; + ValueType get_by_raw_key(uint64_t key) const; + // Return true if bucket existed in DB, false otherwise. + bool remove(const document::BucketId& bucket); + bool remove_by_raw_key(uint64_t key); + // Returns true if bucket pre-existed in the DB, false otherwise + bool update(const document::BucketId& bucket, const ValueType& new_entry); + bool update_by_raw_key(uint64_t bucket_key, const ValueType& new_entry); + + template <typename Func> + void find_parents_and_self(const document::BucketId& bucket, + Func func) const; + + template <typename Func> + void find_parents_self_and_children(const document::BucketId& bucket, + Func func) const; + + void for_each(EntryProcessor& proc, const document::BucketId& after) const; + + document::BucketId getAppropriateBucket(uint16_t minBits, const document::BucketId& bid) const; + + [[nodiscard]] uint32_t child_subtree_count(const document::BucketId& bucket) const; + + const DataStoreType& store() const noexcept { return _store; } + DataStoreType& store() noexcept { return _store; } + + void merge(MergingProcessor<ValueType>& proc); +private: + // Functor is called for each found element in key order, with raw u64 keys and values. 
+ template <typename Func> + BTreeConstIterator find_parents_internal(const typename BTree::FrozenView& frozen_view, + const document::BucketId& bucket, + Func func) const; + template <typename Func> + void find_parents_and_self_internal(const typename BTree::FrozenView& frozen_view, + const document::BucketId& bucket, + Func func) const; + void commit_tree_changes(); + + template <typename DataStoreTraitsT2> friend struct BTreeBuilderMerger; + template <typename DataStoreTraitsT2> friend struct BTreeTrailingInserter; +}; + +uint8_t getMinDiffBits(uint16_t minBits, const document::BucketId& a, const document::BucketId& b); +uint8_t next_parent_bit_seek_level(uint8_t minBits, const document::BucketId& a, const document::BucketId& b); + +} diff --git a/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp new file mode 100644 index 00000000000..6484ad70804 --- /dev/null +++ b/storage/src/vespa/storage/bucketdb/generic_btree_bucket_database.hpp @@ -0,0 +1,494 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+#pragma once + +#include "generic_btree_bucket_database.h" + +namespace storage::bucketdb { + +using document::BucketId; + +template <typename DataStoreTraitsT> +BucketId GenericBTreeBucketDatabase<DataStoreTraitsT>::bucket_from_valid_iterator(const BTreeConstIterator& iter) { + return BucketId(BucketId::keyToBucketId(iter.getKey())); +} + +template <typename DataStoreTraitsT> +void GenericBTreeBucketDatabase<DataStoreTraitsT>::commit_tree_changes() { + // TODO break up and refactor + // TODO verify semantics and usage + // TODO make BTree wrapping API which abstracts away all this stuff via reader/writer interfaces + _tree.getAllocator().freeze(); + + auto current_gen = _generation_handler.getCurrentGeneration(); + _store.transferHoldLists(current_gen); + _tree.getAllocator().transferHoldLists(current_gen); + + _generation_handler.incGeneration(); + + auto used_gen = _generation_handler.getFirstUsedGeneration(); + _store.trimHoldLists(used_gen); + _tree.getAllocator().trimHoldLists(used_gen); +} + +template <typename DataStoreTraitsT> +void GenericBTreeBucketDatabase<DataStoreTraitsT>::clear() noexcept { + _tree.clear(); + commit_tree_changes(); +} + +template <typename DataStoreTraitsT> +size_t GenericBTreeBucketDatabase<DataStoreTraitsT>::size() const noexcept { + return _tree.size(); +} + +template <typename DataStoreTraitsT> +bool GenericBTreeBucketDatabase<DataStoreTraitsT>::empty() const noexcept { + return !_tree.begin().valid(); +} + +template <typename DataStoreTraitsT> +vespalib::MemoryUsage GenericBTreeBucketDatabase<DataStoreTraitsT>::memory_usage() const noexcept { + auto mem_usage = _tree.getMemoryUsage(); + mem_usage.merge(_store.getMemoryUsage()); + return mem_usage; +} + +template <typename DataStoreTraitsT> +typename GenericBTreeBucketDatabase<DataStoreTraitsT>::ValueType +GenericBTreeBucketDatabase<DataStoreTraitsT>::entry_from_iterator(const BTreeConstIterator& iter) const { + if (!iter.valid()) { + return 
DataStoreTraitsT::make_invalid_value(); + } + const auto value = iter.getData(); + std::atomic_thread_fence(std::memory_order_acquire); + return DataStoreTraitsT::unwrap_from_key_value(_store, iter.getKey(), value); +} + +template <typename DataStoreTraitsT> +typename GenericBTreeBucketDatabase<DataStoreTraitsT>::ConstValueRef +GenericBTreeBucketDatabase<DataStoreTraitsT>::const_value_ref_from_valid_iterator(const BTreeConstIterator& iter) const { + const auto value = iter.getData(); + std::atomic_thread_fence(std::memory_order_acquire); + return DataStoreTraitsT::unwrap_const_ref_from_key_value(_store, iter.getKey(), value); +} + +template <typename DataStoreTraitsT> +typename GenericBTreeBucketDatabase<DataStoreTraitsT>::BTreeConstIterator +GenericBTreeBucketDatabase<DataStoreTraitsT>::lower_bound(uint64_t key) const noexcept { + return _tree.lowerBound(key); +} + +template <typename DataStoreTraitsT> +typename GenericBTreeBucketDatabase<DataStoreTraitsT>::BTreeConstIterator +GenericBTreeBucketDatabase<DataStoreTraitsT>::find(uint64_t key) const noexcept { + return _tree.find(key); +} + +template <typename DataStoreTraitsT> +typename GenericBTreeBucketDatabase<DataStoreTraitsT>::BTreeConstIterator +GenericBTreeBucketDatabase<DataStoreTraitsT>::begin() const noexcept { + return _tree.begin(); +} + +/* + * Finding the complete set of parents of a given bucket is not obvious how to + * do efficiently, as we only know that the parents are ordered before their + * children, but we do not a-priori know if any exist at all. The Judy DB impl + * does O(b) explicit point lookups (where b is the number of used bits in the + * bucket), starting at the leaf bit and working towards the root. To avoid + * having to re-create iterators and perform a full tree search every time, we + * turn this on its head and start from the root, progressing towards the leaf. + * This allows us to reuse a single iterator and to continue seeking forwards + * from its current position. 
+ *
+ * To speed up the process of converging on the target bucket without needing
+ * to check many unrelated subtrees, we let the underlying B-tree automatically
+ * aggregate the min/max range of the used-bits of all contained bucket keys.
+ * If we e.g. know that the minimum number of used bits in the DB is 16, we can
+ * immediately seek to this level in the tree instead of working our way down
+ * one bit at a time. By definition, no parents can exist above this level.
+ * This is a very important optimization, as bucket trees are usually very well
+ * balanced due to randomized distribution of data (combined with a cluster-wide
+ * minimum tree level imposed by distribution bits). It is common that the minimum
+ * number of used bits == max number of used bits, i.e. a totally even split.
+ * This means that for a system without inconsistently split buckets (i.e. no
+ * parents) we're highly likely to converge on the target bucket in a single seek.
+ *
+ * Algorithm:
+ *
+ * Core invariant: every subsequent iterator seek performed in this algorithm
+ * is for a key that is strictly higher than the one the iterator is currently at.
+ *
+ * 1. Lbound seek to the lowest key that is known to exclude all already visited
+ * parents. On the first iteration we use a bit count equal to the minimum number
+ * of key used-bits in the entire DB, allowing us to potentially skip most subtrees.
+ * 2. If the current node's key is greater than that of the requested bucket's key,
+ * we've either descended to--or beyond--it in its own subtree or we've entered
+ * a disjoint subtree. Since we know that all parents must sort before any given
+ * child bucket, no more parents may be found at this point. Algorithm terminates.
+ * 3. As the main body of the loop is entered, we know one of the following must hold:
+ * 3.1 The current node is an explicitly present parent of our bucket. 
+ * 3.2 The current node is contained in a left subtree branch of a parent that
+ * does not have a bucket explicitly present in the tree. It cannot be in
+ * a right subtree of any parent, as that would imply the node is ordered
+ * _after_ our own bucket in an in-order traversal, which would contradict
+ * the check in step 2 above.
+ * 4. If the current node contains the requested bucket, we're at a parent
+ * node of the bucket; add it to the result set.
+ * If this is _not_ the case, we're in a different subtree. Example: the
+ * requested bucket has a key whose MSB is 1 but the first bucket in the
+ * tree has a key with an MSB of 0. Either way we need to update our search
+ * key to home in on the target subtree where more parents may be found;
+ * 5. Update the seek key to find the next possible parent. To ensure this key is
+ * strictly greater than the iterator's current key we find the largest shared
+ * prefix of bits in common between the current node's key and the requested
+ * bucket's key. The prefix length + 1 is then the depth in the tree at which the
+ * two subtrees branch off and diverge.
+ * The new key is then the MSB prefix length + 1 requested bucket's key with a
+ * matching number of used-bits set. Forward lbound-seek the iterator to this key.
+ * `--> TODO elaborate on prefix semantics when they are equal wrt. min used bits
+ * 6. Iff iterator is still valid, go to step 2
+ *
+ * This algorithm is able to skip through large parts of the tree in a sparsely populated
+ * tree, but the number of seeks will trend towards O(b - min_bits) as with the legacy
+ * implementation when a tree is densely populated (where `b` is the used-bits count of the
+ * most specific node in the tree for the target bucket, and min_bits is the minimum number
+ * of used-bits for any key in the database). This is because all logical inner nodes in the tree
+ * will have subtrees under them. 
Even in the worst case we should be more efficient than the + * legacy Judy-based implementation since we've cut any dense search space in half for each + * invocation of seek() on the iterator. + */ +template <typename DataStoreTraitsT> +template <typename Func> +typename GenericBTreeBucketDatabase<DataStoreTraitsT>::BTreeConstIterator +GenericBTreeBucketDatabase<DataStoreTraitsT>::find_parents_internal( + const typename BTree::FrozenView& frozen_view, + const BucketId& bucket, + Func func) const +{ + const uint64_t bucket_key = bucket.toKey(); + if (frozen_view.empty()) { + return frozen_view.begin(); // Will be invalid. + } + const auto min_db_bits = frozen_view.getAggregated().getMin(); + assert(min_db_bits >= static_cast<int32_t>(BucketId::minNumBits)); + assert(min_db_bits <= static_cast<int32_t>(BucketId::maxNumBits)); + // Start at the lowest possible tree level no parents can exist above, + // descending towards the bucket itself. + // Note: important to use getId() rather than getRawId(), as min_db_bits may be + // greater than the used bits of the queried bucket. If we used the raw ID, we'd + // end up looking at undefined bits. + const auto first_key = BucketId(min_db_bits, bucket.getId()).toKey(); + auto iter = frozen_view.lowerBound(first_key); + // Try skipping as many levels of the tree as possible as we go. 
+ uint32_t bits = min_db_bits; + while (iter.valid() && (iter.getKey() < bucket_key)) { + auto candidate = BucketId(BucketId::keyToBucketId(iter.getKey())); + if (candidate.contains(bucket)) { + assert(candidate.getUsedBits() >= bits); + func(iter.getKey(), entry_from_iterator(iter)); + } + bits = next_parent_bit_seek_level(bits, candidate, bucket); + const auto parent_key = BucketId(bits, bucket.getRawId()).toKey(); + assert(parent_key > iter.getKey()); + iter.seek(parent_key); + } + return iter; +} + +template <typename DataStoreTraitsT> +template <typename Func> +void GenericBTreeBucketDatabase<DataStoreTraitsT>::find_parents_and_self_internal( + const typename BTree::FrozenView& frozen_view, + const BucketId& bucket, + Func func) const +{ + auto iter = find_parents_internal(frozen_view, bucket, func); + if (iter.valid() && iter.getKey() == bucket.toKey()) { + func(iter.getKey(), entry_from_iterator(iter)); + } +} + +template <typename DataStoreTraitsT> +template <typename Func> +void GenericBTreeBucketDatabase<DataStoreTraitsT>::find_parents_and_self( + const document::BucketId& bucket, + Func func) const +{ + auto view = _tree.getFrozenView(); + find_parents_and_self_internal(view, bucket, std::move(func)); +} + +template <typename DataStoreTraitsT> +template <typename Func> +void GenericBTreeBucketDatabase<DataStoreTraitsT>::find_parents_self_and_children( + const BucketId& bucket, + Func func) const +{ + auto view = _tree.getFrozenView(); + auto iter = find_parents_internal(view, bucket, func); + // `iter` is already pointing at, or beyond, one of the bucket's subtrees. 
+ for (; iter.valid(); ++iter) { + auto candidate = BucketId(BucketId::keyToBucketId(iter.getKey())); + if (bucket.contains(candidate)) { + func(iter.getKey(), entry_from_iterator(iter)); + } else { + break; + } + } +} + +template <typename DataStoreTraitsT> +typename GenericBTreeBucketDatabase<DataStoreTraitsT>::ValueType +GenericBTreeBucketDatabase<DataStoreTraitsT>::get(const BucketId& bucket) const { + return entry_from_iterator(_tree.find(bucket.toKey())); +} + +template <typename DataStoreTraitsT> +typename GenericBTreeBucketDatabase<DataStoreTraitsT>::ValueType +GenericBTreeBucketDatabase<DataStoreTraitsT>::get_by_raw_key(uint64_t key) const { + return entry_from_iterator(_tree.find(key)); +} + +template <typename DataStoreTraitsT> +bool GenericBTreeBucketDatabase<DataStoreTraitsT>::remove_by_raw_key(uint64_t key) { + auto iter = _tree.find(key); + if (!iter.valid()) { + return false; + } + const auto value = iter.getData(); + DataStoreTraitsT::remove_by_wrapped_value(_store, value); + _tree.remove(iter); + commit_tree_changes(); + return true; +} + +template <typename DataStoreTraitsT> +bool GenericBTreeBucketDatabase<DataStoreTraitsT>::remove(const BucketId& bucket) { + return remove_by_raw_key(bucket.toKey()); +} + +template <typename DataStoreTraitsT> +bool GenericBTreeBucketDatabase<DataStoreTraitsT>::update_by_raw_key(uint64_t bucket_key, + const ValueType& new_entry) +{ + const auto new_value = DataStoreTraitsT::store_and_wrap_value(_store, new_entry); + auto iter = _tree.lowerBound(bucket_key); + const bool pre_existed = (iter.valid() && (iter.getKey() == bucket_key)); + if (pre_existed) { + DataStoreTraitsT::remove_by_wrapped_value(_store, iter.getData()); + // In-place update of value; does not require tree structure modification + std::atomic_thread_fence(std::memory_order_release); // Must ensure visibility when new array ref is observed + iter.writeData(new_value); + } else { + _tree.insert(iter, bucket_key, new_value); + } + 
commit_tree_changes(); // TODO does publishing a new root imply an implicit memory fence? + return pre_existed; +} + +template <typename DataStoreTraitsT> +bool GenericBTreeBucketDatabase<DataStoreTraitsT>::update(const BucketId& bucket, + const ValueType& new_entry) +{ + return update_by_raw_key(bucket.toKey(), new_entry); +} + +// TODO need snapshot read with guarding +// FIXME semantics of for-each in judy and bit tree DBs differ, former expects lbound, latter ubound..! +// FIXME but bit-tree code says "lowerBound" in impl and "after" in declaration??? +template <typename DataStoreTraitsT> +void GenericBTreeBucketDatabase<DataStoreTraitsT>::for_each(EntryProcessor& proc, const BucketId& after) const { + for (auto iter = _tree.upperBound(after.toKey()); iter.valid(); ++iter) { + // TODO memory fencing once we use snapshots! + if (!proc.process(DataStoreTraitsT::unwrap_const_ref_from_key_value(_store, iter.getKey(), iter.getData()))) { + break; + } + } +} + +/* + * Returns the bucket ID which, based on the buckets already existing in the DB, + * is the most specific location in the tree in which it should reside. This may + * or may not be a bucket that already exists. + * + * Example: if there is a single bucket (1, 1) in the tree, a query for (1, 1) or + * (1, 3) will return (1, 1) as that is the most specific leaf in that subtree. + * A query for (1, 0) will return (1, 0) even though this doesn't currently exist, + * as there is no existing bucket that can contain the queried bucket. It is up to + * the caller to create this bucket according to its needs. + * + * Usually this function will be called with an ID whose used-bits is at max (58), in + * order to find a leaf bucket to route an incoming document operation to. + * + * TODO rename this function, it's very much _not_ obvious what an "appropriate" bucket is..! 
+ * TODO this should be possible to do concurrently + */ +template <typename DataStoreTraitsT> +BucketId GenericBTreeBucketDatabase<DataStoreTraitsT>::getAppropriateBucket(uint16_t minBits, const BucketId& bid) const { + // The bucket tree is ordered in such a way that it represents a + // natural in-order traversal of all buckets, with inner nodes being + // visited before leaf nodes. This means that a lower bound seek will + // never return a parent of a seeked bucket. The iterator will be pointing + // to a bucket that is either the actual bucket given as the argument to + // lowerBound() or the next in-order bucket (or end() if none exists). + // TODO snapshot + auto iter = _tree.lowerBound(bid.toKey()); + if (iter.valid()) { + // Find the first level in the tree where the paths through the bucket tree + // diverge for the target bucket and the current bucket. + minBits = getMinDiffBits(minBits, bucket_from_valid_iterator(iter), bid); + } + // TODO is it better to copy original iterator and do begin() on the copy? + auto first_iter = _tree.begin(); + // Original iterator might be in a different subtree than that of our + // target bucket. If possible, rewind one node to discover any parent or + // leftmost sibling of our node. If there's no such node, we'll still + // discover the greatest equal bit prefix. + if (iter != first_iter) { + --iter; + minBits = getMinDiffBits(minBits, bucket_from_valid_iterator(iter), bid); + } + return BucketId(minBits, bid.getRawId()); +} + +/* + * Enumerate the number of child subtrees under `bucket`. The value returned is in the + * range [0, 2] regardless of how many subtrees are present further down in the tree. + * + * Finding this number is reasonably straight forward; we construct two buckets that + * represent the key ranges for the left and right subtrees under `bucket` and check + * if there are any ranges in the tree's keyspace that are contained in these. 
+ */ +template <typename DataStoreTraitsT> +uint32_t GenericBTreeBucketDatabase<DataStoreTraitsT>::child_subtree_count(const BucketId& bucket) const { + assert(bucket.getUsedBits() < BucketId::maxNumBits); + BucketId lhs_bucket(bucket.getUsedBits() + 1, bucket.getId()); + BucketId rhs_bucket(bucket.getUsedBits() + 1, (1ULL << bucket.getUsedBits()) | bucket.getId()); + + auto iter = _tree.lowerBound(lhs_bucket.toKey()); + if (!iter.valid()) { + return 0; + } + if (lhs_bucket.contains(bucket_from_valid_iterator(iter))) { + iter.seek(rhs_bucket.toKey()); + if (!iter.valid()) { + return 1; // lhs subtree only + } + return (rhs_bucket.contains(bucket_from_valid_iterator(iter)) ? 2 : 1); + } else if (rhs_bucket.contains(bucket_from_valid_iterator(iter))) { + return 1; // rhs subtree only + } + return 0; +} + +template <typename DataStoreTraitsT> +struct BTreeBuilderMerger final : Merger<typename DataStoreTraitsT::ValueType> { + using DBType = GenericBTreeBucketDatabase<DataStoreTraitsT>; + using ValueType = typename DataStoreTraitsT::ValueType; + using BTreeBuilderType = typename DBType::BTree::Builder; + + DBType& _db; + BTreeBuilderType& _builder; + uint64_t _current_key; + uint64_t _current_value; + ValueType _cached_value; + bool _valid_cached_value; + + BTreeBuilderMerger(DBType& db, BTreeBuilderType& builder) + : _db(db), + _builder(builder), + _current_key(0), + _current_value(0), + _cached_value(), + _valid_cached_value(false) + {} + ~BTreeBuilderMerger() override = default; + + uint64_t bucket_key() const noexcept override { + return _current_key; + } + BucketId bucket_id() const noexcept override { + return BucketId(BucketId::keyToBucketId(_current_key)); + } + ValueType& current_entry() override { + if (!_valid_cached_value) { + _cached_value = DataStoreTraitsT::unwrap_from_key_value(_db.store(), _current_key, _current_value); + _valid_cached_value = true; + } + return _cached_value; + } + void insert_before_current(const BucketId& bucket_id, const ValueType& 
e) override { + const uint64_t bucket_key = bucket_id.toKey(); + assert(bucket_key < _current_key); + const auto new_value = DataStoreTraitsT::store_and_wrap_value(_db.store(), e); + _builder.insert(bucket_key, new_value); + } + + void update_iteration_state(uint64_t key, uint64_t value) { + _current_key = key; + _current_value = value; + _valid_cached_value = false; + } +}; + +template <typename DataStoreTraitsT> +struct BTreeTrailingInserter final : TrailingInserter<typename DataStoreTraitsT::ValueType> { + using DBType = GenericBTreeBucketDatabase<DataStoreTraitsT>; + using ValueType = typename DataStoreTraitsT::ValueType; + using BTreeBuilderType = typename DBType::BTree::Builder; + + DBType& _db; + BTreeBuilderType& _builder; + + BTreeTrailingInserter(DBType& db, BTreeBuilderType& builder) + : _db(db), + _builder(builder) + {} + + ~BTreeTrailingInserter() override = default; + + void insert_at_end(const BucketId& bucket_id, const ValueType& e) override { + const uint64_t bucket_key = bucket_id.toKey(); + const auto new_value = DataStoreTraitsT::store_and_wrap_value(_db.store(), e); + _builder.insert(bucket_key, new_value); + } +}; + +// TODO lbound arg? +template <typename DataStoreTraitsT> +void GenericBTreeBucketDatabase<DataStoreTraitsT>::merge(MergingProcessor<ValueType>& proc) { + typename BTree::Builder builder(_tree.getAllocator()); + BTreeBuilderMerger<DataStoreTraitsT> merger(*this, builder); + + // TODO for_each instead? 
+ for (auto iter = _tree.begin(); iter.valid(); ++iter) { + const uint64_t key = iter.getKey(); + const uint64_t value = iter.getData(); + merger.update_iteration_state(key, value); + + auto result = proc.merge(merger); + + if (result == MergingProcessor<ValueType>::Result::KeepUnchanged) { + builder.insert(key, value); // Reuse array store ref with no changes + } else if (result == MergingProcessor<ValueType>::Result::Update) { + assert(merger._valid_cached_value); // Must actually have been touched + assert(merger._cached_value.valid()); + DataStoreTraitsT::remove_by_wrapped_value(_store, value); + const auto new_value = DataStoreTraitsT::store_and_wrap_value(_store, merger._cached_value); + builder.insert(key, new_value); + } else if (result == MergingProcessor<ValueType>::Result::Skip) { + DataStoreTraitsT::remove_by_wrapped_value(_store, value); + } else { + abort(); + } + } + BTreeTrailingInserter<DataStoreTraitsT> inserter(*this, builder); + proc.insert_remaining_at_end(inserter); + + _tree.assign(builder); + commit_tree_changes(); +} + + +} diff --git a/storage/src/vespa/storage/bucketdb/lockablemap.h b/storage/src/vespa/storage/bucketdb/lockablemap.h index 8b4e403b899..83ecac0a94f 100644 --- a/storage/src/vespa/storage/bucketdb/lockablemap.h +++ b/storage/src/vespa/storage/bucketdb/lockablemap.h @@ -14,6 +14,7 @@ */ #pragma once +#include "abstract_bucket_map.h" #include <map> #include <vespa/vespalib/util/printable.h> #include <vespa/vespalib/stllike/hash_map.h> @@ -26,98 +27,19 @@ namespace storage { -template<typename Map> -class LockableMap : public vespalib::Printable +template <typename Map> +class LockableMap + : public bucketdb::AbstractBucketMap<typename Map::mapped_type> { public: - typedef typename Map::key_type key_type; - typedef typename Map::mapped_type mapped_type; - typedef typename Map::value_type value_type; - typedef typename Map::size_type size_type; - using BucketId = document::BucketId; - struct WrappedEntry; - - /** Responsible for 
releasing lock in map when out of scope. */ - class LockKeeper { - friend struct WrappedEntry; - LockableMap<Map>& _map; - key_type _key; - bool _locked; - - LockKeeper(LockableMap<Map>& map, key_type key) - : _map(map), _key(key), _locked(true) {} - void unlock() { _map.unlock(_key); _locked = false;} - public: - ~LockKeeper() { if (_locked) unlock(); } - }; - - struct WrappedEntry { - WrappedEntry() : _exists(false), _lockKeeper(), _value() {} - WrappedEntry(WrappedEntry &&) = default; - WrappedEntry & operator = (WrappedEntry &&) = default; - ~WrappedEntry() { } - - mapped_type* operator->() { return &_value; } - const mapped_type* operator->() const { return &_value; } - mapped_type& operator*() { return _value; } - const mapped_type& operator*() const { return _value; } - - const mapped_type *get() const { return &_value; } - mapped_type *get() { return &_value; } - - void write(); - void remove(); - void unlock(); - bool exist() const { return _exists; } - bool preExisted() const { return _preExisted; } - bool locked() const { return _lockKeeper.get(); } - const key_type& getKey() const { return _lockKeeper->_key; }; - - BucketId getBucketId() const { - return BucketId(BucketId::keyToBucketId(getKey())); - } - - protected: - WrappedEntry(LockableMap<Map>& map, - const key_type& key, const mapped_type& val, - const char* clientId, bool preExisted_) - : _exists(true), - _preExisted(preExisted_), - _lockKeeper(new LockKeeper(map, key)), - _value(val), - _clientId(clientId) {} - WrappedEntry(LockableMap<Map>& map, const key_type& key, - const char* clientId) - : _exists(false), - _preExisted(false), - _lockKeeper(new LockKeeper(map, key)), - _value(), - _clientId(clientId) {} - - bool _exists; - bool _preExisted; - std::unique_ptr<LockKeeper> _lockKeeper; - mapped_type _value; - const char* _clientId; - friend class LockableMap<Map>; - }; - - struct LockId { - key_type _key; - const char* _owner; - - LockId() : _key(0), _owner("none - empty token") {} - 
LockId(key_type key, const char* owner) - : _key(key), _owner(owner) - { - assert(_owner != 0); - } - - size_t hash() const { return _key; } - size_t operator%(size_t val) const { return _key % val; } - bool operator==(const LockId& id) const { return (_key == id._key); } - operator key_type() const { return _key; } - }; + using ParentType = bucketdb::AbstractBucketMap<typename Map::mapped_type>; + using WrappedEntry = typename ParentType::WrappedEntry; + using key_type = typename ParentType::key_type; + using mapped_type = typename ParentType::mapped_type; + using LockId = typename ParentType::LockId; + using EntryMap = typename ParentType::EntryMap; + using Decision = typename ParentType::Decision; + using BucketId = document::BucketId; LockableMap(); ~LockableMap(); @@ -126,14 +48,20 @@ public: return ! (*this == other); } bool operator<(const LockableMap& other) const; - typename Map::size_type size() const; - size_type getMemoryUsage() const; - bool empty() const; + size_t size() const noexcept override; + size_t getMemoryUsage() const noexcept override; + bool empty() const noexcept override; void swap(LockableMap&); - WrappedEntry get(const key_type& key, const char* clientId, - bool createIfNonExisting = false, - bool lockIfNonExistingAndNotCreating = false); + WrappedEntry get(const key_type& key, const char* clientId, bool createIfNonExisting) override; + WrappedEntry get(const key_type& key, const char* clientId) { + return get(key, clientId, false); + } + + bool erase(const key_type& key, const char* clientId, bool haslock) override; + void insert(const key_type& key, const mapped_type& value, + const char* clientId, bool haslock, bool& preExisted) override; + bool erase(const key_type& key, const char* clientId) { return erase(key, clientId, false); } void insert(const key_type& key, const mapped_type& value, @@ -141,42 +69,6 @@ public: { return insert(key, value, clientId, false, preExisted); } void clear(); - enum Decision { ABORT, UPDATE, REMOVE, 
CONTINUE, DECISION_COUNT }; - - template<typename Functor> - void each(Functor& functor, const char* clientId, - const key_type& first = key_type(), - const key_type& last = key_type() - 1 ); - - template<typename Functor> - void each(const Functor& functor, const char* clientId, - const key_type& first = key_type(), - const key_type& last = key_type() - 1 ); - - template<typename Functor> - void all(Functor& functor, const char* clientId, - const key_type& first = key_type(), - const key_type& last = key_type()-1); - - template<typename Functor> - void all(const Functor& functor, const char* clientId, - const key_type& first = key_type(), - const key_type& last = key_type() - 1 ); - - static constexpr uint32_t DEFAULT_CHUNK_SIZE = 1000; - - /** - * Iterate over the entire database contents, holding the global database - * mutex for `chunkSize` processed entries at a time, yielding the current - * thread between each such such to allow other threads to get a chance - * at acquiring a bucket lock. - */ - template <typename Functor> - void chunkedAll(Functor& functor, - const char* clientId, - vespalib::duration yieldTime = 10us, - uint32_t chunkSize = DEFAULT_CHUNK_SIZE); - void print(std::ostream& out, bool verbose, const std::string& indent) const override; /** @@ -184,18 +76,13 @@ public: * bucket. Usually, there should be only one such bucket, but in the case * of inconsistent splitting, there may be more than one. */ - std::map<BucketId, WrappedEntry> - getContained(const BucketId& bucketId, const char* clientId); - - typedef std::map<BucketId, WrappedEntry> EntryMap; + EntryMap getContained(const BucketId& bucketId, const char* clientId) override; /** * Returns all buckets in the bucket database that can contain the given * bucket, and all buckets that that bucket contains. - * - * If sibling is != 0, also fetch that bucket if possible. 
*/ - EntryMap getAll(const BucketId& bucketId, const char* clientId, const BucketId& sibling = BucketId(0)); + EntryMap getAll(const BucketId& bucketId, const char* clientId) override; /** * Returns true iff bucket has no superbuckets or sub-buckets in the @@ -203,9 +90,9 @@ public: * bucket to become inconsistent will require taking its lock, so by * requiring the lock to be provided here we avoid race conditions. */ - bool isConsistent(const WrappedEntry& entry); + bool isConsistent(const WrappedEntry& entry) override; - void showLockClients(vespalib::asciistream & out) const; + void showLockClients(vespalib::asciistream & out) const override; private: struct hasher { @@ -217,7 +104,7 @@ private: LockIdSet(); ~LockIdSet(); void print(std::ostream& out, bool verbose, const std::string& indent) const; - bool exist(const LockId & lid) const { return this->find(lid) != Hash::end(); } + bool exist(const LockId& lid) const { return this->find(lid) != Hash::end(); } size_t getMemoryUsage() const; }; @@ -243,15 +130,27 @@ private: LockIdSet _lockedKeys; LockWaiters _lockWaiters; - bool erase(const key_type& key, const char* clientId, bool haslock); - void insert(const key_type& key, const mapped_type& value, - const char* clientId, bool haslock, bool& preExisted); - void unlock(const key_type& key); + void unlock(const key_type& key) override; bool findNextKey(key_type& key, mapped_type& val, const char* clientId, std::unique_lock<std::mutex> &guard); bool handleDecision(key_type& key, mapped_type& val, Decision decision); void acquireKey(const LockId & lid, std::unique_lock<std::mutex> &guard); + void do_for_each_mutable(std::function<Decision(uint64_t, mapped_type&)> func, + const char* clientId, + const key_type& first, + const key_type& last) override; + + void do_for_each(std::function<Decision(uint64_t, const mapped_type&)> func, + const char* clientId, + const key_type& first, + const key_type& last) override; + + void 
do_for_each_chunked(std::function<Decision(uint64_t, mapped_type&)> func, + const char* clientId, + vespalib::duration yieldTime, + uint32_t chunkSize) override; + /** * Process up to `chunkSize` bucket database entries from--and possibly * including--the bucket pointed to by `key`. @@ -263,17 +162,15 @@ private: * Modifies `key` in-place to point to the next key to process for the next * invocation of this function. */ - template <typename Functor> - bool processNextChunk(Functor& functor, + bool processNextChunk(std::function<Decision(uint64_t, mapped_type&)>& func, key_type& key, const char* clientId, - const uint32_t chunkSize); + uint32_t chunkSize); /** * Returns the given bucket, its super buckets and its sub buckets. */ void getAllWithoutLocking(const BucketId& bucket, - const BucketId& sibling, std::vector<BucketId::Type>& keys); /** diff --git a/storage/src/vespa/storage/bucketdb/lockablemap.hpp b/storage/src/vespa/storage/bucketdb/lockablemap.hpp index 2ca2183ae26..fcdde0a810c 100644 --- a/storage/src/vespa/storage/bucketdb/lockablemap.hpp +++ b/storage/src/vespa/storage/bucketdb/lockablemap.hpp @@ -16,7 +16,7 @@ template<typename Map> LockableMap<Map>::LockIdSet::LockIdSet() : Hash() { } template<typename Map> -LockableMap<Map>::LockIdSet::~LockIdSet() { } +LockableMap<Map>::LockIdSet::~LockIdSet() = default; template<typename Map> size_t @@ -28,7 +28,7 @@ template<typename Map> LockableMap<Map>::LockWaiters::LockWaiters() : _id(0), _map() { } template<typename Map> -LockableMap<Map>::LockWaiters::~LockWaiters() { } +LockableMap<Map>::LockWaiters::~LockWaiters() = default; template<typename Map> size_t @@ -39,35 +39,6 @@ LockableMap<Map>::LockWaiters::insert(const LockId & lid) { } template<typename Map> -void -LockableMap<Map>::WrappedEntry::write() -{ - assert(_lockKeeper->_locked); - assert(_value.verifyLegal()); - bool b; - _lockKeeper->_map.insert(_lockKeeper->_key, _value, _clientId, true, b); - _lockKeeper->unlock(); -} - -template<typename Map> 
-void -LockableMap<Map>::WrappedEntry::remove() -{ - assert(_lockKeeper->_locked); - assert(_exists); - _lockKeeper->_map.erase(_lockKeeper->_key, _clientId, true); - _lockKeeper->unlock(); -} - -template<typename Map> -void -LockableMap<Map>::WrappedEntry::unlock() -{ - assert(_lockKeeper->_locked); - _lockKeeper->unlock(); -} - -template<typename Map> LockableMap<Map>::LockableMap() : _map(), _lock(), @@ -77,7 +48,7 @@ LockableMap<Map>::LockableMap() {} template<typename Map> -LockableMap<Map>::~LockableMap() {} +LockableMap<Map>::~LockableMap() = default; template<typename Map> bool @@ -98,16 +69,16 @@ LockableMap<Map>::operator<(const LockableMap<Map>& other) const } template<typename Map> -typename Map::size_type -LockableMap<Map>::size() const +size_t +LockableMap<Map>::size() const noexcept { std::lock_guard<std::mutex> guard(_lock); return _map.size(); } template<typename Map> -typename Map::size_type -LockableMap<Map>::getMemoryUsage() const +size_t +LockableMap<Map>::getMemoryUsage() const noexcept { std::lock_guard<std::mutex> guard(_lock); return _map.getMemoryUsage() + _lockedKeys.getMemoryUsage() + @@ -116,7 +87,7 @@ LockableMap<Map>::getMemoryUsage() const template<typename Map> bool -LockableMap<Map>::empty() const +LockableMap<Map>::empty() const noexcept { std::lock_guard<std::mutex> guard(_lock); return _map.empty(); @@ -145,9 +116,7 @@ void LockableMap<Map>::acquireKey(const LockId & lid, std::unique_lock<std::mute template<typename Map> typename LockableMap<Map>::WrappedEntry -LockableMap<Map>::get(const key_type& key, const char* clientId, - bool createIfNonExisting, - bool lockIfNonExistingAndNotCreating) +LockableMap<Map>::get(const key_type& key, const char* clientId, bool createIfNonExisting) { LockId lid(key, clientId); std::unique_lock<std::mutex> guard(_lock); @@ -157,71 +126,34 @@ LockableMap<Map>::get(const key_type& key, const char* clientId, _map.find(key, createIfNonExisting, preExisted); if (it == _map.end()) { - if 
(lockIfNonExistingAndNotCreating) { - return WrappedEntry(*this, key, clientId); - } else { - return WrappedEntry(); - } + return WrappedEntry(); } _lockedKeys.insert(lid); return WrappedEntry(*this, key, it->second, clientId, preExisted); } -#ifdef ENABLE_BUCKET_OPERATION_LOGGING - -namespace bucketdb { -struct StorageBucketInfo; -struct BucketInfo; -} - -namespace debug { - -template <typename T> struct TypeTag {}; -// Storage -void logBucketDbInsert(uint64_t key, const bucketdb::StorageBucketInfo& entry); -void logBucketDbErase(uint64_t key, const TypeTag<bucketdb::StorageBucketInfo>&); - -// Distributor -void logBucketDbInsert(uint64_t key, const bucketdb::BucketInfo& entry); -void logBucketDbErase(uint64_t key, const TypeTag<bucketdb::BucketInfo>&); - -template <typename DummyValue> -inline void logBucketDbErase(uint64_t, const TypeTag<DummyValue>&) {} -template <typename DummyKey, typename DummyValue> -inline void logBucketDbInsert(const DummyKey&, const DummyValue&) {} - -} - -#endif // ENABLE_BUCKET_OPERATION_LOGGING - template<typename Map> bool -LockableMap<Map>::erase(const key_type& key, const char* clientId, bool haslock) +LockableMap<Map>::erase(const key_type& key, const char* client_id, bool has_lock) { - LockId lid(key, clientId); + LockId lid(key, client_id); std::unique_lock<std::mutex> guard(_lock); - if (!haslock) { + if (!has_lock) { acquireKey(lid, guard); } -#ifdef ENABLE_BUCKET_OPERATION_LOGGING - debug::logBucketDbErase(key, debug::TypeTag<mapped_type>()); -#endif return _map.erase(key); } template<typename Map> void LockableMap<Map>::insert(const key_type& key, const mapped_type& value, - const char* clientId, bool haslock, bool& preExisted) + const char* client_id, bool has_lock, bool& preExisted) { - LockId lid(key, clientId); + LockId lid(key, client_id); std::unique_lock<std::mutex> guard(_lock); - if (!haslock) { + if (!has_lock) { acquireKey(lid, guard); } -#ifdef ENABLE_BUCKET_OPERATION_LOGGING - debug::logBucketDbInsert(key, 
value); -#endif _map.insert(key, value, preExisted); } @@ -242,12 +174,14 @@ LockableMap<Map>::findNextKey(key_type& key, mapped_type& val, // Wait for next value to unlock. typename Map::iterator it(_map.lower_bound(key)); while (it != _map.end() && _lockedKeys.exist(LockId(it->first, ""))) { - typename LockWaiters::Key waitId(_lockWaiters.insert(LockId(it->first, clientId))); + auto wait_id = _lockWaiters.insert(LockId(it->first, clientId)); _cond.wait(guard); - _lockWaiters.erase(waitId); + _lockWaiters.erase(wait_id); it = _map.lower_bound(key); } - if (it == _map.end()) return true; + if (it == _map.end()) { + return true; + } key = it->first; val = it->second; return false; @@ -260,127 +194,60 @@ LockableMap<Map>::handleDecision(key_type& key, mapped_type& val, { bool b; switch (decision) { - case UPDATE: _map.insert(key, val, b); - break; - case REMOVE: _map.erase(key); - break; - case ABORT: return true; - case CONTINUE: break; - default: - HDR_ABORT("should not be reached"); + case Decision::UPDATE: + _map.insert(key, val, b); + break; + case Decision::REMOVE: + _map.erase(key); + break; + case Decision::ABORT: + return true; + case Decision::CONTINUE: + break; + default: + HDR_ABORT("should not be reached"); } return false; } template<typename Map> -template<typename Functor> -void -LockableMap<Map>::each(Functor& functor, const char* clientId, - const key_type& first, const key_type& last) -{ - key_type key = first; - mapped_type val; - Decision decision; - { - std::unique_lock<std::mutex> guard(_lock); - if (findNextKey(key, val, clientId, guard) || key > last) return; - _lockedKeys.insert(LockId(key, clientId)); - } - try{ - while (true) { - decision = functor(const_cast<const key_type&>(key), val); - std::unique_lock<std::mutex> guard(_lock); - _lockedKeys.erase(LockId(key, clientId)); - _cond.notify_all(); - if (handleDecision(key, val, decision)) return; - ++key; - if (findNextKey(key, val, clientId, guard) || key > last) return; - 
_lockedKeys.insert(LockId(key, clientId)); - } - } catch (...) { - // Assuming only the functor call can throw exceptions, we need - // to unlock the current key before exiting - std::lock_guard<std::mutex> guard(_lock); - _lockedKeys.erase(LockId(key, clientId)); - _cond.notify_all(); - throw; - } -} - -template<typename Map> -template<typename Functor> -void -LockableMap<Map>::each(const Functor& functor, const char* clientId, - const key_type& first, const key_type& last) -{ - key_type key = first; - mapped_type val; - Decision decision; - { - std::unique_lock<std::mutex> guard(_lock); - if (findNextKey(key, val, clientId, guard) || key > last) return; - _lockedKeys.insert(LockId(key, clientId)); - } - try{ - while (true) { - decision = functor(const_cast<const key_type&>(key), val); - std::unique_lock<std::mutex> guard(_lock); - _lockedKeys.erase(LockId(key, clientId)); - _cond.notify_all(); - if (handleDecision(key, val, decision)) return; - ++key; - if (findNextKey(key, val, clientId, guard) || key > last) return; - _lockedKeys.insert(LockId(key, clientId)); - } - } catch (...) 
{ - // Assuming only the functor call can throw exceptions, we need - // to unlock the current key before exiting - std::lock_guard<std::mutex> guard(_lock); - _lockedKeys.erase(LockId(key, clientId)); - _cond.notify_all(); - throw; - } -} - -template<typename Map> -template<typename Functor> -void -LockableMap<Map>::all(Functor& functor, const char* clientId, - const key_type& first, const key_type& last) +void LockableMap<Map>::do_for_each_mutable(std::function<Decision(uint64_t, mapped_type&)> func, + const char* clientId, + const key_type& first, + const key_type& last) { key_type key = first; mapped_type val; std::unique_lock<std::mutex> guard(_lock); while (true) { if (findNextKey(key, val, clientId, guard) || key > last) return; - Decision d(functor(const_cast<const key_type&>(key), val)); + Decision d(func(const_cast<const key_type&>(key), val)); if (handleDecision(key, val, d)) return; ++key; } } template<typename Map> -template<typename Functor> -void -LockableMap<Map>::all(const Functor& functor, const char* clientId, - const key_type& first, const key_type& last) +void LockableMap<Map>::do_for_each(std::function<Decision(uint64_t, const mapped_type&)> func, + const char* clientId, + const key_type& first, + const key_type& last) { key_type key = first; mapped_type val; std::unique_lock<std::mutex> guard(_lock); while (true) { if (findNextKey(key, val, clientId, guard) || key > last) return; - Decision d(functor(const_cast<const key_type&>(key), val)); - assert(d == ABORT || d == CONTINUE); + Decision d(func(const_cast<const key_type&>(key), val)); + assert(d == Decision::ABORT || d == Decision::CONTINUE); if (handleDecision(key, val, d)) return; ++key; } } template <typename Map> -template <typename Functor> bool -LockableMap<Map>::processNextChunk(Functor& functor, +LockableMap<Map>::processNextChunk(std::function<Decision(uint64_t, mapped_type&)>& func, key_type& key, const char* clientId, const uint32_t chunkSize) @@ -391,7 +258,7 @@ 
LockableMap<Map>::processNextChunk(Functor& functor, if (findNextKey(key, val, clientId, guard)) { return false; } - Decision d(functor(const_cast<const key_type&>(key), val)); + Decision d(func(const_cast<const key_type&>(key), val)); if (handleDecision(key, val, d)) { return false; } @@ -401,15 +268,13 @@ LockableMap<Map>::processNextChunk(Functor& functor, } template <typename Map> -template <typename Functor> -void -LockableMap<Map>::chunkedAll(Functor& functor, - const char* clientId, - vespalib::duration yieldTime, - uint32_t chunkSize) +void LockableMap<Map>::do_for_each_chunked(std::function<Decision(uint64_t, mapped_type&)> func, + const char* clientId, + vespalib::duration yieldTime, + uint32_t chunkSize) { key_type key{}; - while (processNextChunk(functor, key, clientId, chunkSize)) { + while (processNextChunk(func, key, clientId, chunkSize)) { // Rationale: delay iteration for as short a time as possible while // allowing another thread blocked on the main DB mutex to acquire it // in the meantime. Simply yielding the thread does not have the @@ -591,7 +456,7 @@ LockableMap<Map>::addAndLockResults( uint8_t getMinDiffBits(uint16_t minBits, const document::BucketId& a, const document::BucketId& b); template<typename Map> -std::map<document::BucketId, typename LockableMap<Map>::WrappedEntry> +typename LockableMap<Map>::EntryMap LockableMap<Map>::getContained(const BucketId& bucket, const char* clientId) { @@ -626,7 +491,6 @@ LockableMap<Map>::getContained(const BucketId& bucket, template<typename Map> void LockableMap<Map>::getAllWithoutLocking(const BucketId& bucket, - const BucketId& sibling, std::vector<BucketId::Type>& keys) { BucketId result; @@ -674,26 +538,21 @@ LockableMap<Map>::getAllWithoutLocking(const BucketId& bucket, break; } } - - if (sibling.getRawId() != 0) { - keys.push_back(sibling.toKey()); - } } /** * Returns the given bucket, its super buckets and its sub buckets. 
*/ template<typename Map> -std::map<document::BucketId, typename LockableMap<Map>::WrappedEntry> -LockableMap<Map>::getAll(const BucketId& bucket, const char* clientId, - const BucketId& sibling) +typename LockableMap<Map>::EntryMap +LockableMap<Map>::getAll(const BucketId& bucket, const char* clientId) { std::unique_lock<std::mutex> guard(_lock); std::map<BucketId, WrappedEntry> results; std::vector<BucketId::Type> keys; - getAllWithoutLocking(bucket, sibling, keys); + getAllWithoutLocking(bucket, keys); addAndLockResults(keys, clientId, results, guard); @@ -706,10 +565,9 @@ LockableMap<Map>::isConsistent(const typename LockableMap<Map>::WrappedEntry& en { std::lock_guard<std::mutex> guard(_lock); - BucketId sibling(0); std::vector<BucketId::Type> keys; - getAllWithoutLocking(entry.getBucketId(), sibling, keys); + getAllWithoutLocking(entry.getBucketId(), keys); assert(keys.size() >= 1); assert(keys.size() != 1 || keys[0] == entry.getKey()); diff --git a/storage/src/vespa/storage/bucketdb/stdmapwrapper.h b/storage/src/vespa/storage/bucketdb/stdmapwrapper.h deleted file mode 100644 index 889227f1747..00000000000 --- a/storage/src/vespa/storage/bucketdb/stdmapwrapper.h +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * @class StdMapWrapper - * @ingroup bucketdb - * - * @brief Wrapper for std::map to add functionality in JudyMultiMap. 
- * - * To remove the need for partial template specialization in lockablemap - */ - -#pragma once - -#include <map> -#include <vespa/vespalib/util/printable.h> -#include <ostream> - -namespace storage { - -template<typename Key, typename Value> -class StdMapWrapper : public std::map<Key, Value>, - public vespalib::Printable -{ -public: - StdMapWrapper() {} - - virtual void print(std::ostream& out, bool verbose, - const std::string& indent) const; - - typename std::map<Key, Value>::iterator find(Key key); - - typename std::map<Key, Value>::iterator find(Key key, bool insert, bool&); - - void insert(Key key, const Value& val, bool&); - - uint32_t getMemoryUsage() const; -}; - -template<class Key, class Value> -uint32_t -StdMapWrapper<Key, Value>::getMemoryUsage() const -{ - Value val; - - return (32 + sizeof(val)) * this->size(); -} - -template<class Key, class Value> -void -StdMapWrapper<Key, Value>::print(std::ostream& out, - bool, - const std::string& indent) const -{ - out << "StdMapWrapper("; - for (typename std::map<Key, Value>::const_iterator i = this->begin(); - i != this->end(); ++i) - { - out << "\n" << indent << " " << "Key: " << i->first << ", Value: " - << i->second; - } - out << ")"; -} - -template<class Key, class Value> -inline typename std::map<Key, Value>::iterator -StdMapWrapper<Key, Value>:: -find(Key key) -{ - bool tmp; - return find(key, false, tmp); -} - -template<class Key, class Value> -inline typename std::map<Key, Value>::iterator -StdMapWrapper<Key, Value>:: -find(Key key, bool insertIfNonExisting, bool&) -{ - if (insertIfNonExisting) { - std::pair<typename std::map<Key, Value>::iterator, bool> result - = std::map<Key, Value>::insert(std::pair<Key, Value>(key, Value())); - return result.first; - } else { - return std::map<Key, Value>::find(key); - } -} - -template<class Key, class Value> -void -StdMapWrapper<Key, Value>:: -insert(Key key, const Value& val, bool&) -{ - this->operator[](key) = val; -} - -} - diff --git 
a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp index 84352df5ec9..ed83ad268e5 100644 --- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp +++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp @@ -462,13 +462,13 @@ namespace { _next(), _alreadySet(0) {} StorBucketDatabase::Decision operator()( - uint64_t revBucket, StorBucketDatabase::Entry& entry) + uint64_t revBucket, const StorBucketDatabase::Entry& entry) { BucketId bucket(BucketId::keyToBucketId(revBucket)); if (bucket == _iterator) { //LOG(spam, "Ignoring bucket %s as it has value of current " // "iterator", bucket.toString().c_str()); - return StorBucketDatabase::CONTINUE; + return StorBucketDatabase::Decision::CONTINUE; } _iterator = bucket; if (entry.disk != _disk) { @@ -487,10 +487,10 @@ namespace { LOG(spam, "Aborting iterating for disk %u as we have " "enough results. Leaving iterator at %s", uint32_t(_disk), _iterator.toString().c_str()); - return StorBucketDatabase::ABORT; + return StorBucketDatabase::Decision::ABORT; } } - return StorBucketDatabase::CONTINUE; + return StorBucketDatabase::Decision::CONTINUE; } }; } @@ -513,9 +513,10 @@ StorageBucketDBInitializer::sendReadBucketInfo(spi::PartitionId disk, document:: NextBucketOnDiskFinder finder(disk, state._databaseIterator, count); LOG(spam, "Iterating bucket db further. 
Starting at iterator %s", state._databaseIterator.toString().c_str()); - _system.getBucketDatabase(bucketSpace).all(finder, - "StorageBucketDBInitializer::readBucketInfo", - state._databaseIterator.stripUnused().toKey()); + _system.getBucketDatabase(bucketSpace).for_each( + std::ref(finder), + "StorageBucketDBInitializer::readBucketInfo", + state._databaseIterator.stripUnused().toKey()); if (finder._alreadySet > 0) { _metrics._infoSetByLoad.inc(finder._alreadySet); _state._infoSetByLoad += finder._alreadySet; diff --git a/storage/src/vespa/storage/bucketdb/storbucketdb.cpp b/storage/src/vespa/storage/bucketdb/storbucketdb.cpp index df9e8e4e5b5..351fac8a69f 100644 --- a/storage/src/vespa/storage/bucketdb/storbucketdb.cpp +++ b/storage/src/vespa/storage/bucketdb/storbucketdb.cpp @@ -1,11 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "storbucketdb.h" +#include "btree_lockable_map.h" #include "judymultimap.hpp" +#include "lockablemap.h" #include <vespa/log/log.h> LOG_SETUP(".storage.bucketdb.stor_bucket_db"); +using document::BucketId; + namespace storage { namespace bucketdb { @@ -16,15 +20,15 @@ print(std::ostream& out, bool, const std::string&) const out << info << ", disk " << disk; } -bool StorageBucketInfo::operator == (const StorageBucketInfo & b) const { +bool StorageBucketInfo::operator==(const StorageBucketInfo& b) const { return disk == b.disk; } -bool StorageBucketInfo::operator != (const StorageBucketInfo & b) const { +bool StorageBucketInfo::operator!=(const StorageBucketInfo& b) const { return !(*this == b); } -bool StorageBucketInfo::operator < (const StorageBucketInfo & b) const { +bool StorageBucketInfo::operator<(const StorageBucketInfo& b) const { return disk < b.disk; } @@ -34,8 +38,17 @@ operator<<(std::ostream& out, const StorageBucketInfo& info) { return out; } +std::unique_ptr<AbstractBucketMap<StorageBucketInfo>> make_default_db() { + return 
std::make_unique<BTreeLockableMap<StorageBucketInfo>>(); + //return std::make_unique<LockableMap<JudyMultiMap<StorageBucketInfo>>>(); +} + } // bucketdb +StorBucketDatabase::StorBucketDatabase() + : _impl(bucketdb::make_default_db()) +{} + void StorBucketDatabase::insert(const document::BucketId& bucket, const bucketdb::StorageBucketInfo& entry, @@ -43,26 +56,14 @@ StorBucketDatabase::insert(const document::BucketId& bucket, { assert(entry.disk != 0xff); bool preExisted; -#if __WORDSIZE == 64 - return LockableMap<JudyMultiMap<Entry> >::insert( - bucket.toKey(), entry, clientId, preExisted); -#else - return LockableMap<StdMapWrapper<document::BucketId::Type, Entry> >::insert( - bucket.toKey(), entry, clientId, preExisted); -#endif + return _impl->insert(bucket.toKey(), entry, clientId, false, preExisted); } bool StorBucketDatabase::erase(const document::BucketId& bucket, const char* clientId) { -#if __WORDSIZE == 64 - return LockableMap<JudyMultiMap<Entry> >::erase( - bucket.stripUnused().toKey(), clientId); -#else - return LockableMap<StdMapWrapper<document::BucketId::Type, Entry> >::erase( - bucket.stripUnused().toKey(), clientId); -#endif + return _impl->erase(bucket.stripUnused().toKey(), clientId, false); } StorBucketDatabase::WrappedEntry @@ -71,16 +72,60 @@ StorBucketDatabase::get(const document::BucketId& bucket, Flag flags) { bool createIfNonExisting = (flags & CREATE_IF_NONEXISTING); - bool lockIfNonExisting = (flags & LOCK_IF_NONEXISTING_AND_NOT_CREATING); -#if __WORDSIZE == 64 - return LockableMap<JudyMultiMap<Entry> >::get( - bucket.stripUnused().toKey(), clientId, createIfNonExisting, - lockIfNonExisting); -#else - return LockableMap<StdMapWrapper<document::BucketId::Type, Entry> >::get( - bucket.stripUnused().toKey(), clientId, - createIfNonExisting, lockIfNonExisting); -#endif + return _impl->get(bucket.stripUnused().toKey(), clientId, createIfNonExisting); +} + +size_t StorBucketDatabase::size() const { + return _impl->size(); +} + +size_t 
StorBucketDatabase::getMemoryUsage() const { + return _impl->getMemoryUsage(); +} + +void StorBucketDatabase::showLockClients(vespalib::asciistream& out) const { + _impl->showLockClients(out); +} + +StorBucketDatabase::EntryMap +StorBucketDatabase::getAll(const BucketId& bucketId, const char* clientId) { + return _impl->getAll(bucketId, clientId); +} + +StorBucketDatabase::EntryMap +StorBucketDatabase::getContained(const BucketId& bucketId, const char* clientId) { + return _impl->getContained(bucketId, clientId); +} + +bool StorBucketDatabase::isConsistent(const WrappedEntry& entry) { + return _impl->isConsistent(entry); +} + +void StorBucketDatabase::for_each_chunked( + std::function<Decision(uint64_t, const bucketdb::StorageBucketInfo&)> func, + const char* clientId, + vespalib::duration yieldTime, + uint32_t chunkSize) +{ + _impl->for_each_chunked(std::move(func), clientId, yieldTime, chunkSize); +} + +void StorBucketDatabase::for_each_mutable( + std::function<Decision(uint64_t, bucketdb::StorageBucketInfo&)> func, + const char* clientId, + const key_type& first, + const key_type& last) +{ + _impl->for_each_mutable(std::move(func), clientId, first, last); +} + +void StorBucketDatabase::for_each( + std::function<Decision(uint64_t, const bucketdb::StorageBucketInfo&)> func, + const char* clientId, + const key_type& first, + const key_type& last) +{ + _impl->for_each(std::move(func), clientId, first, last); } template class JudyMultiMap<bucketdb::StorageBucketInfo>; diff --git a/storage/src/vespa/storage/bucketdb/storbucketdb.h b/storage/src/vespa/storage/bucketdb/storbucketdb.h index 15d1004a00d..49349efbaaa 100644 --- a/storage/src/vespa/storage/bucketdb/storbucketdb.h +++ b/storage/src/vespa/storage/bucketdb/storbucketdb.h @@ -1,51 +1,84 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
-/** - * \class StorageBucketInfo - * \ingroup bucketdb - * - * \brief An entry in the storage bucket database. - * - * \class StorBucketDatabase - * \ingroup bucketdb - * - * \brief The storage bucket database. - */ #pragma once -#include "judymultimap.h" -#include "lockablemap.h" -#include "stdmapwrapper.h" +#include "abstract_bucket_map.h" #include "storagebucketinfo.h" #include <vespa/storageapi/defs.h> +#include <memory> namespace storage { - -class StorBucketDatabase -#if __WORDSIZE == 64 - : public LockableMap<JudyMultiMap<bucketdb::StorageBucketInfo> > -#else -# warning Bucket database cannot use Judy on non-64 bit platforms - : public LockableMap<StdMapWrapper<document::BucketId::Type, bucketdb::StorageBucketInfo> > -#endif -{ +class StorBucketDatabase { + std::unique_ptr<bucketdb::AbstractBucketMap<bucketdb::StorageBucketInfo>> _impl; public: + using Entry = bucketdb::StorageBucketInfo; + using key_type = bucketdb::AbstractBucketMap<Entry>::key_type; + using Decision = bucketdb::AbstractBucketMap<Entry>::Decision; + using WrappedEntry = bucketdb::AbstractBucketMap<Entry>::WrappedEntry; + using EntryMap = bucketdb::AbstractBucketMap<Entry>::EntryMap; + using BucketId = document::BucketId; + enum Flag { NONE = 0, - CREATE_IF_NONEXISTING = 1, - LOCK_IF_NONEXISTING_AND_NOT_CREATING = 2 + CREATE_IF_NONEXISTING = 1 }; - typedef bucketdb::StorageBucketInfo Entry; - StorBucketDatabase() {}; + StorBucketDatabase(); void insert(const document::BucketId&, const bucketdb::StorageBucketInfo&, const char* clientId); bool erase(const document::BucketId&, const char* clientId); - WrappedEntry get(const document::BucketId& bucket, const char* clientId, - Flag flags = NONE); + WrappedEntry get(const document::BucketId& bucket, const char* clientId, Flag flags = NONE); + + size_t size() const; + + /** + * Returns all buckets in the bucket database that can contain the given + * bucket, and all buckets that that bucket contains. 
+ */ + EntryMap getAll(const BucketId& bucketId, const char* clientId); + + /** + * Returns all buckets in the bucket database that can contain the given + * bucket. Usually, there should be only one such bucket, but in the case + * of inconsistent splitting, there may be more than one. + */ + EntryMap getContained(const BucketId& bucketId, const char* clientId); + + /** + * Iterate over the entire database contents, holding the global database + * mutex for `chunkSize` processed entries at a time, yielding the current + * thread between each such such to allow other threads to get a chance + * at acquiring a bucket lock. + */ + void for_each_chunked(std::function<Decision(uint64_t, const bucketdb::StorageBucketInfo&)> func, + const char* clientId, + vespalib::duration yieldTime = 10us, + uint32_t chunkSize = bucketdb::AbstractBucketMap<bucketdb::StorageBucketInfo>::DEFAULT_CHUNK_SIZE); + + void for_each_mutable(std::function<Decision(uint64_t, bucketdb::StorageBucketInfo&)> func, + const char* clientId, + const key_type& first = key_type(), + const key_type& last = key_type() - 1); + + void for_each(std::function<Decision(uint64_t, const bucketdb::StorageBucketInfo&)> func, + const char* clientId, + const key_type& first = key_type(), + const key_type& last = key_type() - 1); + + /** + * Returns true iff bucket has no superbuckets or sub-buckets in the + * database. Usage assumption is that any operation that can cause the + * bucket to become inconsistent will require taking its lock, so by + * requiring the lock to be provided here we avoid race conditions. 
+ */ + bool isConsistent(const WrappedEntry& entry); + + size_t getMemoryUsage() const; + void showLockClients(vespalib::asciistream & out) const; + }; } // storage diff --git a/storage/src/vespa/storage/common/content_bucket_space_repo.h b/storage/src/vespa/storage/common/content_bucket_space_repo.h index 0d4ddb86bcf..3fe99e831bc 100644 --- a/storage/src/vespa/storage/common/content_bucket_space_repo.h +++ b/storage/src/vespa/storage/common/content_bucket_space_repo.h @@ -31,7 +31,7 @@ public: void forEachBucket(Functor &functor, const char *clientId) const { for (const auto &elem : _map) { - elem.second->bucketDatabase().all(functor, clientId); + elem.second->bucketDatabase().for_each(std::ref(functor), clientId); } } @@ -39,7 +39,7 @@ public: void forEachBucketChunked(Functor &functor, const char *clientId) const { for (const auto &elem : _map) { - elem.second->bucketDatabase().chunkedAll(functor, clientId); + elem.second->bucketDatabase().for_each_chunked(std::ref(functor), clientId); } } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index eec8ac3e327..021c94464df 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -848,7 +848,7 @@ namespace { StorBucketDatabase::Decision operator()(document::BucketId::Type, StorBucketDatabase::Entry& data) { data.info.setActive(false); - return StorBucketDatabase::UPDATE; + return StorBucketDatabase::Decision::UPDATE; } }; } @@ -870,7 +870,8 @@ FileStorManager::updateState() if (contentBucketSpace.getNodeUpInLastNodeStateSeenByProvider() && !nodeUp) { LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database for bucket space %s", bucketSpace.toString().c_str()); Deactivator deactivator; - contentBucketSpace.bucketDatabase().all(deactivator, "FileStorManager::updateState"); 
+ contentBucketSpace.bucketDatabase().for_each_mutable( + std::ref(deactivator), "FileStorManager::updateState"); } contentBucketSpace.setNodeUpInLastNodeStateSeenByProvider(nodeUp); spi::ClusterState spiState(*derivedClusterState, _component.getIndex(), *contentBucketSpace.getDistribution()); diff --git a/storageserver/src/vespa/storageserver/app/distributorprocess.h b/storageserver/src/vespa/storageserver/app/distributorprocess.h index 48fa331ba54..e416f285268 100644 --- a/storageserver/src/vespa/storageserver/app/distributorprocess.h +++ b/storageserver/src/vespa/storageserver/app/distributorprocess.h @@ -12,7 +12,7 @@ namespace storage { -class DistributorProcess : public Process { +class DistributorProcess final : public Process { DistributorNodeContext _context; DistributorNode::NeedActiveState _activeFlag; bool _use_btree_database; @@ -23,7 +23,7 @@ class DistributorProcess : public Process { _visitDispatcherConfigHandler; public: - DistributorProcess(const config::ConfigUri & configUri); + explicit DistributorProcess(const config::ConfigUri & configUri); ~DistributorProcess() override; void shutdown() override; diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h index 27c2db8ed6f..b24640cbbd7 100644 --- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h +++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h @@ -36,8 +36,8 @@ protected: ServiceLayerNodeContext _context; public: - ServiceLayerProcess(const config::ConfigUri & configUri); - ~ServiceLayerProcess(); + explicit ServiceLayerProcess(const config::ConfigUri & configUri); + ~ServiceLayerProcess() override; void shutdown() override; |