diff options
20 files changed, 493 insertions, 144 deletions
diff --git a/container-disc/src/main/sh/vespa-start-container-daemon.sh b/container-disc/src/main/sh/vespa-start-container-daemon.sh index a0cbd9d9186..223124c29d6 100755 --- a/container-disc/src/main/sh/vespa-start-container-daemon.sh +++ b/container-disc/src/main/sh/vespa-start-container-daemon.sh @@ -110,8 +110,27 @@ configure_cpu() { } configure_numactl() { - numactlcmd=$(get_numa_ctl_cmd) - log_message debug "starting ${VESPA_SERVICE_NAME} for ${VESPA_CONFIG_ID} with numactl command : $numactlcmd" + log_message debug "starting ${VESPA_SERVICE_NAME} for ${VESPA_CONFIG_ID}" + if numactl --interleave all true &> /dev/null; then + # We are allowed to use numactl + numnodes=$(numactl --hardware | + grep available | + awk '$3 == "nodes" { print $2 }') + if [ "$VESPA_AFFINITY_CPU_SOCKET" ] && + [ "$numnodes" -gt 1 ] + then + node=$(($VESPA_AFFINITY_CPU_SOCKET % $numnodes)) + log_message debug "with affinity to $VESPA_AFFINITY_CPU_SOCKET out of $numnodes cpu sockets" + numactlcmd="numactl --cpunodebind=$node --membind=$node" + else + log_message debug "with memory interleaving on all nodes" + numactlcmd="numactl --interleave all" + fi + else + log_message debug "without numactl (no permission or not available)" + numactlcmd="" + fi + log_message debug "numactlcmd: $numactlcmd" } configure_gcopts() { diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp index a49816f1a52..084bdf90a14 100644 --- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp +++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp @@ -36,6 +36,7 @@ LOG_SETUP("tensorattribute_test"); using document::WrongTensorTypeException; using search::AttributeGuard; using search::AttributeVector; +using search::CompactionStrategy; using search::attribute::DistanceMetric; using search::attribute::HnswIndexParams; using search::queryeval::GlobalFilter; @@ -199,11 +200,18 @@ public: void trim_hold_lists(generation_t first_used_gen) override { _trim_gen = first_used_gen; } + bool consider_compact(const CompactionStrategy&) override { + return false; + } + vespalib::MemoryUsage update_stat() override { + return vespalib::MemoryUsage(); + } vespalib::MemoryUsage memory_usage() const override { ++_memory_usage_cnt; return vespalib::MemoryUsage(); } void get_state(const vespalib::slime::Inserter&) const override {} + void shrink_lid_space(uint32_t) override { } std::unique_ptr<NearestNeighborIndexSaver> make_saver() const override { if (_index_value != 0) { return std::make_unique<MockIndexSaver>(_index_value); diff --git a/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp b/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp index 3f7ec140781..9e7f082e237 100644 --- a/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp +++ b/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp @@ -1,5 +1,6 @@ // Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/searchcommon/common/compaction_strategy.h> #include <vespa/searchlib/common/bitvector.h> #include <vespa/searchlib/tensor/distance_functions.h> #include <vespa/searchlib/tensor/doc_vector_access.h> @@ -20,7 +21,7 @@ using namespace search::tensor; using namespace vespalib::slime; using vespalib::Slime; using search::BitVector; - +using search::CompactionStrategy; template <typename FloatType> class MyDocVectorAccess : public DocVectorAccess { @@ -42,6 +43,8 @@ public: ArrayRef ref(_vectors[docid]); return vespalib::eval::TypedCells(ref); } + + void clear() { _vectors.clear(); } }; struct LevelGenerator : public RandomLevelGenerator { @@ -111,6 +114,10 @@ public: MemoryUsage memory_usage() const { return index->memory_usage(); } + MemoryUsage commit_and_update_stat() { + commit(); + return index->update_stat(); + } void expect_entry_point(uint32_t exp_docid, uint32_t exp_level) { EXPECT_EQ(exp_docid, index->get_entry_docid()); EXPECT_EQ(exp_level, index->get_entry_level()); @@ -166,6 +173,8 @@ public: docid, hit.docid, hit.distance, thr); } } + + FloatVectors& get_vectors() { return vectors; } }; @@ -533,6 +542,113 @@ TEST_F(HnswIndexTest, shrink_called_heuristic) EXPECT_TRUE(index->check_link_symmetry()); } +namespace { + +template <class ResultGraph> +ResultGraph +make_graph_helper(HnswIndex& index) +{ + using LevelArrayRef = HnswGraph::LevelArrayRef; + using LinkArrayRef = HnswGraph::LinkArrayRef; + auto& graph = index.get_graph(); + ResultGraph result(graph.size()); + assert(!graph.get_node_ref(0).valid()); + for (uint32_t doc_id = 1; doc_id < graph.size(); ++doc_id) { + auto& node = result[doc_id]; + auto node_ref = graph.get_node_ref(doc_id); + if constexpr (std::is_same_v<std::remove_reference_t<decltype(node)>, uint32_t>) { + node = node_ref.ref(); + } else { + LevelArrayRef level_array(graph.get_level_array(node_ref)); + for (uint32_t level = 0; level < level_array.size(); ++level) { + if constexpr (std::is_same_v<std::remove_reference_t<decltype(node)>, std::vector<uint32_t>>) { + node.emplace_back(level_array[level].load_relaxed().ref()); + } else { + LinkArrayRef link_array(graph.get_link_array(level_array, level)); + node.emplace_back(std::vector<uint32_t>(link_array.begin(), link_array.end())); + } + } + } + } + return result; +} + +using LinkGraph = std::vector<std::vector<std::vector<uint32_t>>>; + +LinkGraph +make_link_graph(HnswIndex& index) +{ + return make_graph_helper<LinkGraph>(index); +} + +using LinkArrayRefGraph = std::vector<std::vector<uint32_t>>; + +LinkArrayRefGraph +make_link_array_refs(HnswIndex& index) +{ + return make_graph_helper<LinkArrayRefGraph>(index); +} + +using LevelArrayRefGraph = std::vector<uint32_t>; + +LevelArrayRefGraph +make_level_array_refs(HnswIndex& index) +{ + return make_graph_helper<LevelArrayRefGraph>(index); +} + +} + +TEST_F(HnswIndexTest, hnsw_graph_is_compacted) +{ + init(true); + get_vectors().clear(); + uint32_t doc_id = 1; + for (uint32_t x = 0; x < 100; ++x) { + for (uint32_t y = 0; y < 50; ++y) { + get_vectors().set(doc_id, { float(x), float(y) }); + ++doc_id; + } + } + uint32_t doc_id_end = doc_id; + for (doc_id = 1; doc_id < doc_id_end; ++doc_id) { + add_document(doc_id); + } + for (doc_id = 10; doc_id < doc_id_end; ++doc_id) { + remove_document(doc_id); + } + auto mem_1 = commit_and_update_stat(); + auto link_graph_1 = make_link_graph(*index); + auto link_array_refs_1 = make_link_array_refs(*index); + auto level_array_refs_1 = make_level_array_refs(*index); + // Normal compaction + EXPECT_TRUE(index->consider_compact(CompactionStrategy())); + auto mem_2 = commit_and_update_stat(); + EXPECT_LT(mem_2.usedBytes(), mem_1.usedBytes()); + for (uint32_t i = 0; i < 10; ++i) { + mem_1 = mem_2; + // Forced compaction to move things around + index->compact_link_arrays(true, false); + index->compact_level_arrays(true, false); + commit(); + index->update_stat(); + mem_2 = commit_and_update_stat(); + EXPECT_LE(mem_2.usedBytes(), mem_1.usedBytes()); + if (mem_2.usedBytes() == mem_1.usedBytes()) { + break; + } + } + auto link_graph_2 = make_link_graph(*index); + auto link_array_refs_2 = make_link_array_refs(*index); + auto level_array_refs_2 = make_level_array_refs(*index); + EXPECT_EQ(link_graph_1, link_graph_2); + EXPECT_NE(link_array_refs_1, link_array_refs_2); + EXPECT_NE(level_array_refs_1, level_array_refs_2); + index->shrink_lid_space(10); + auto mem_3 = commit_and_update_stat(); + EXPECT_LT(mem_3.usedBytes(), mem_2.usedBytes()); +} + TEST(LevelGeneratorTest, gives_various_levels) { InvLogLevelGenerator generator(4); diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp index 62a1072de48..1639f5f8113 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp @@ -400,6 +400,18 @@ DenseTensorAttribute::getVersion() const } void +DenseTensorAttribute::onCommit() +{ + TensorAttribute::onCommit(); + if (_index) { + if (_index->consider_compact(getConfig().getCompactionStrategy())) { + incGeneration(); + updateStat(true); + } + } +} + +void DenseTensorAttribute::onGenerationChange(generation_t next_gen) { // TODO: Change onGenerationChange() to send current generation instead of next generation. @@ -430,6 +442,15 @@ DenseTensorAttribute::get_state(const vespalib::slime::Inserter& inserter) const } } +void +DenseTensorAttribute::onShrinkLidSpace() +{ + TensorAttribute::onShrinkLidSpace(); + if (_index) { + _index->shrink_lid_space(getCommittedDocIdLimit()); + } +} + vespalib::eval::TypedCells DenseTensorAttribute::get_vector(uint32_t docid) const { diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h index 752db849b68..8899b1e4bd1 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h @@ -43,9 +43,11 @@ public: std::unique_ptr<AttributeSaver> onInitSave(vespalib::stringref fileName) override; void compactWorst() override; uint32_t getVersion() const override; + void onCommit() override; void onGenerationChange(generation_t next_gen) override; void removeOldGenerations(generation_t first_used_gen) override; void get_state(const vespalib::slime::Inserter& inserter) const override; + void onShrinkLidSpace() override; // Implements DocVectorAccess vespalib::eval::TypedCells get_vector(uint32_t docid) const override; diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp index b1545d587b8..f58dbcafbec 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp @@ -9,6 +9,7 @@ namespace search::tensor { HnswGraph::HnswGraph() : node_refs(), + node_refs_size(1u), nodes(HnswIndex::make_default_node_store_config()), links(HnswIndex::make_default_link_store_config()), entry_docid_and_level() @@ -30,6 +31,9 @@ HnswGraph::make_node_for_document(uint32_t docid, uint32_t num_levels) vespalib::Array<AtomicEntryRef> levels(num_levels, AtomicEntryRef()); auto node_ref = nodes.add(levels); node_refs[docid].store_release(node_ref); + if (docid >= node_refs_size.load(std::memory_order_relaxed)) { + node_refs_size.store(docid + 1, std::memory_order_release); + } return node_ref; } @@ -47,6 +51,19 @@ HnswGraph::remove_node_for_document(uint32_t docid) auto old_links_ref = levels[i].load_acquire(); links.remove(old_links_ref); } + if (docid + 1 == node_refs_size.load(std::memory_order_relaxed)) { + trim_node_refs_size(); + } +} + +void +HnswGraph::trim_node_refs_size() +{ + uint32_t check_doc_id = node_refs_size.load(std::memory_order_relaxed) - 1; + while (check_doc_id > 0u && !node_refs[check_doc_id].load_relaxed().valid()) { + --check_doc_id; + } + node_refs_size.store(check_doc_id + 1, std::memory_order_release); } void diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h index 4d07f74f8e3..57826088ca5 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h @@ -40,6 +40,7 @@ struct HnswGraph { using LinkArrayRef = LinkStore::ConstArrayRef; NodeRefVector node_refs; + std::atomic<uint32_t> node_refs_size; NodeStore nodes; LinkStore links; @@ -52,6 +53,8 @@ struct HnswGraph { void remove_node_for_document(uint32_t docid); + void trim_node_refs_size(); + NodeRef get_node_ref(uint32_t docid) const { return node_refs[docid].load_acquire(); } diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp index 8183a7caf3d..ca5f522457a 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp @@ -5,6 +5,7 @@ #include "hnsw_index_loader.h" #include "hnsw_index_saver.h" #include "random_level_generator.h" +#include <vespa/searchcommon/common/compaction_strategy.h> #include <vespa/searchlib/util/state_explorer_utils.h> #include <vespa/vespalib/data/slime/cursor.h> #include <vespa/vespalib/data/slime/inserter.h> @@ -228,7 +229,7 @@ HnswIndex::search_layer(const TypedCells& input, uint32_t neighbors_to_find, FurthestPriQ& best_neighbors, uint32_t level, const search::BitVector *filter) const { NearestPriQ candidates; - uint32_t doc_id_limit = _graph.node_refs.size(); + uint32_t doc_id_limit = _graph.node_refs_size.load(std::memory_order_acquire); if (filter) { doc_id_limit = std::min(filter->size(), doc_id_limit); } @@ -253,9 +254,11 @@ HnswIndex::search_layer(const TypedCells& input, uint32_t neighbors_to_find, } candidates.pop(); for (uint32_t neighbor_docid : _graph.get_link_array(cand.node_ref, level)) { + if (neighbor_docid >= doc_id_limit) { + continue; + } auto neighbor_ref = _graph.get_node_ref(neighbor_docid); if ((! neighbor_ref.valid()) - || (neighbor_docid >= doc_id_limit) || visited.is_marked(neighbor_docid)) { continue; @@ -282,7 +285,12 @@ HnswIndex::HnswIndex(const DocVectorAccess& vectors, DistanceFunction::UP distan _vectors(vectors), _distance_func(std::move(distance_func)), _level_generator(std::move(level_generator)), - _cfg(cfg) + _cfg(cfg), + _visited_set_pool(), + _cached_level_arrays_memory_usage(), + _cached_level_arrays_address_space_usage(0, 0, (1ull << 32)), + _cached_link_arrays_memory_usage(), + _cached_link_arrays_address_space_usage(0, 0, (1ull << 32)) { assert(_distance_func); } @@ -472,6 +480,93 @@ HnswIndex::trim_hold_lists(generation_t first_used_gen) _graph.links.trimHoldLists(first_used_gen); } +void +HnswIndex::compact_level_arrays(bool compact_memory, bool compact_address_space) +{ + auto context = _graph.nodes.compactWorst(compact_memory, compact_address_space); + uint32_t doc_id_limit = _graph.node_refs.size(); + vespalib::ArrayRef<AtomicEntryRef> refs(&_graph.node_refs[0], doc_id_limit); + context->compact(refs); +} + +void +HnswIndex::compact_link_arrays(bool compact_memory, bool compact_address_space) +{ + auto context = _graph.links.compactWorst(compact_memory, compact_address_space); + uint32_t doc_id_limit = _graph.node_refs.size(); + for (uint32_t doc_id = 1; doc_id < doc_id_limit; ++doc_id) { + EntryRef level_ref = _graph.node_refs[doc_id].load_relaxed(); + if (level_ref.valid()) { + vespalib::ArrayRef<AtomicEntryRef> refs(_graph.nodes.get_writable(level_ref)); + context->compact(refs); + } + } +} + +namespace { + +bool +consider_compact_arrays(const CompactionStrategy& compaction_strategy, vespalib::MemoryUsage& memory_usage, vespalib::AddressSpace& address_space_usage, std::function<void(bool,bool)> compact_arrays) +{ + size_t used_bytes = memory_usage.usedBytes(); + size_t dead_bytes = memory_usage.deadBytes(); + bool compact_memory = compaction_strategy.should_compact_memory(used_bytes, dead_bytes); + size_t used_address_space = address_space_usage.used(); + size_t dead_address_space = address_space_usage.dead(); + bool compact_address_space = compaction_strategy.should_compact_address_space(used_address_space, dead_address_space); + if (compact_memory || compact_address_space) { + compact_arrays(compact_memory, compact_address_space); + return true; + } + return false; +} + +} + +bool +HnswIndex::consider_compact_level_arrays(const CompactionStrategy& compaction_strategy) +{ + return consider_compact_arrays(compaction_strategy, _cached_level_arrays_memory_usage, _cached_level_arrays_address_space_usage, + [this](bool compact_memory, bool compact_address_space) + { compact_level_arrays(compact_memory, compact_address_space); }); +} + +bool +HnswIndex::consider_compact_link_arrays(const CompactionStrategy& compaction_strategy) +{ + return consider_compact_arrays(compaction_strategy, _cached_link_arrays_memory_usage, _cached_link_arrays_address_space_usage, + [this](bool compact_memory, bool compact_address_space) + { compact_link_arrays(compact_memory, compact_address_space); }); +} + +bool +HnswIndex::consider_compact(const CompactionStrategy& compaction_strategy) +{ + bool result = false; + if (consider_compact_level_arrays(compaction_strategy)) { + result = true; + } + if (consider_compact_link_arrays(compaction_strategy)) { + result = true; + } + return result; +} + +vespalib::MemoryUsage +HnswIndex::update_stat() +{ + vespalib::MemoryUsage result; + result.merge(_graph.node_refs.getMemoryUsage()); + _cached_level_arrays_memory_usage = _graph.nodes.getMemoryUsage(); + _cached_level_arrays_address_space_usage = _graph.nodes.addressSpaceUsage(); + result.merge(_cached_level_arrays_memory_usage); + _cached_link_arrays_memory_usage = _graph.links.getMemoryUsage(); + _cached_link_arrays_address_space_usage = _graph.links.addressSpaceUsage(); + result.merge(_cached_link_arrays_memory_usage); + result.merge(_visited_set_pool.memory_usage()); + return result; +} + vespalib::MemoryUsage HnswIndex::memory_usage() const { @@ -526,6 +621,18 @@ HnswIndex::get_state(const vespalib::slime::Inserter& inserter) const _cfg.neighbors_to_explore_at_construction()); } +void +HnswIndex::shrink_lid_space(uint32_t doc_id_limit) +{ + assert(doc_id_limit >= 1u); + assert(doc_id_limit >= _graph.node_refs_size.load(std::memory_order_relaxed)); + uint32_t old_doc_id_limit = _graph.node_refs.size(); + if (doc_id_limit >= old_doc_id_limit) { + return; + } + _graph.node_refs.shrink(doc_id_limit); +} + std::unique_ptr<NearestNeighborIndexSaver> HnswIndex::make_saver() const { diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h index ef0f38c2263..6f6d213d6cc 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h @@ -80,6 +80,10 @@ protected: RandomLevelGenerator::UP _level_generator; Config _cfg; mutable vespalib::ReusableSetPool _visited_set_pool; + vespalib::MemoryUsage _cached_level_arrays_memory_usage; + vespalib::AddressSpace _cached_level_arrays_address_space_usage; + vespalib::MemoryUsage _cached_link_arrays_memory_usage; + vespalib::AddressSpace _cached_link_arrays_address_space_usage; uint32_t max_links_for_level(uint32_t level) const; void add_link_to(uint32_t docid, uint32_t level, const LinkArrayRef& old_links, uint32_t new_link) { @@ -161,8 +165,15 @@ public: void remove_document(uint32_t docid) override; void transfer_hold_lists(generation_t current_gen) override; void trim_hold_lists(generation_t first_used_gen) override; + void compact_level_arrays(bool compact_memory, bool compact_addreess_space); + void compact_link_arrays(bool compact_memory, bool compact_address_space); + bool consider_compact_level_arrays(const CompactionStrategy& compaction_strategy); + bool consider_compact_link_arrays(const CompactionStrategy& compaction_strategy); + bool consider_compact(const CompactionStrategy& compaction_strategy) override; + vespalib::MemoryUsage update_stat() override; vespalib::MemoryUsage memory_usage() const override; void get_state(const vespalib::slime::Inserter& inserter) const override; + void shrink_lid_space(uint32_t doc_id_limit) override; std::unique_ptr<NearestNeighborIndexSaver> make_saver() const override; bool load(const fileutil::LoadedBuffer& buf) override; @@ -184,6 +195,7 @@ public: void set_node(uint32_t docid, const HnswNode &node); bool check_link_symmetry() const; std::pair<uint32_t, bool> count_reachable_nodes() const; + HnswGraph& get_graph() { return _graph; } static vespalib::datastore::ArrayStoreConfig make_default_node_store_config(); static vespalib::datastore::ArrayStoreConfig make_default_link_store_config(); diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp index ac98b28d105..c0aec9ff91a 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp @@ -38,7 +38,9 @@ HnswIndexLoader::load(const fileutil::LoadedBuffer& buf) } } if (_failed) return false; - _graph.node_refs.ensure_size(num_nodes); + _graph.node_refs.ensure_size(std::max(num_nodes, 1u)); + _graph.node_refs_size.store(std::max(num_nodes, 1u), std::memory_order_release); + _graph.trim_node_refs_size(); auto entry_node_ref = _graph.get_node_ref(entry_docid); _graph.set_entry_node({entry_docid, entry_node_ref, entry_level}); return true; diff --git a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h index fd37cf80720..0122738e173 100644 --- a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h +++ b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h @@ -14,7 +14,10 @@ namespace vespalib::slime { struct Inserter; } namespace search::fileutil { class LoadedBuffer; } -namespace search { class BitVector; } +namespace search { +class BitVector; +class CompactionStrategy; +} namespace search::tensor { @@ -59,8 +62,11 @@ public: virtual void remove_document(uint32_t docid) = 0; virtual void transfer_hold_lists(generation_t current_gen) = 0; virtual void trim_hold_lists(generation_t first_used_gen) = 0; + virtual bool consider_compact(const CompactionStrategy& compaction_strategy) = 0; + virtual vespalib::MemoryUsage update_stat() = 0; virtual vespalib::MemoryUsage memory_usage() const = 0; virtual void get_state(const vespalib::slime::Inserter& inserter) const = 0; + virtual void shrink_lid_space(uint32_t doc_id_limit) = 0; /** * Creates a saver that is used to save the index to binary form. diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp index 485e6d27a1c..70bbab37e3e 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp @@ -11,10 +11,6 @@ namespace slobrok { #pragma GCC diagnostic ignored "-Winline" void LocalRpcMonitorMap::DelayedTasks::PerformTask() { - { - std::vector<MUP> deleteAfterSwap; - std::swap(deleteAfterSwap, _deleteList); - } std::vector<Event> todo; std::swap(todo, _queue); for (const auto & entry : todo) { @@ -29,29 +25,57 @@ void LocalRpcMonitorMap::DelayedTasks::PerformTask() { } } -LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor) +LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor, + MappingMonitorFactory mappingMonitorFactory) : _delayedTasks(supervisor.GetScheduler(), *this), _map(), _dispatcher(), _history(), - _supervisor(supervisor), + _mappingMonitor(mappingMonitorFactory(*this)), _subscription(MapSubscription::subscribe(_dispatcher, _history)) { } LocalRpcMonitorMap::~LocalRpcMonitorMap() = default; -LocalRpcMonitorMap::PerService * -LocalRpcMonitorMap::lookup(ManagedRpcServer *rpcsrv) { - auto iter = _map.find(rpcsrv->getName()); +LocalRpcMonitorMap::PerService & +LocalRpcMonitorMap::lookup(const ServiceMapping &mapping) { + LOG(spam, "lookup %s->%s", mapping.name.c_str(), mapping.spec.c_str()); + auto iter = _map.find(mapping.name); if (iter == _map.end()) { - return nullptr; + LOG_ABORT("not in map"); } PerService & psd = iter->second; - if (psd.srv.get() != rpcsrv) { - return nullptr; + if (psd.spec != mapping.spec) { + LOG_ABORT("conflict in map: %s->%s"); } - return &psd; + LOG(spam, "found in map: %s->%s [%s,%s]", + iter->first.c_str(), psd.spec.c_str(), + psd.up ? "up" : "down", + psd.localOnly ? "local" : "global"); + return psd; +} + +void LocalRpcMonitorMap::addToMap(const ServiceMapping &mapping, PerService psd) { + auto [ iter, was_inserted ] = + _map.try_emplace(mapping.name, std::move(psd)); + LOG_ASSERT(was_inserted); + _mappingMonitor->start(mapping); +} + +LocalRpcMonitorMap::RemovedData +LocalRpcMonitorMap::removeFromMap(Map::iterator iter) { + auto name = iter->first; + PerService psd = std::move(iter->second); + ServiceMapping mapping{iter->first, psd.spec}; + _mappingMonitor->stop(mapping); + _map.erase(iter); + return RemovedData { + .mapping = mapping, + .up = psd.up, + .localOnly = psd.localOnly, + .inflight = std::move(psd.inflight) + }; } ServiceMapHistory & LocalRpcMonitorMap::history() { @@ -66,7 +90,7 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping, auto old = _map.find(mapping.name); if (old != _map.end()) { const PerService & exists = old->second; - if (exists.spec() == mapping.spec) { + if (exists.spec == mapping.spec) { LOG(debug, "added mapping %s->%s was already present", mapping.name.c_str(), mapping.spec.c_str()); inflight->doneHandler(OkState(0, "already registered")); @@ -74,13 +98,11 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping, } LOG(warning, "tried addLocal for mapping %s->%s, but already had conflicting mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str(), - exists.name().c_str(), exists.spec().c_str()); + mapping.name.c_str(), exists.spec.c_str()); inflight->doneHandler(OkState(FRTE_RPC_METHOD_FAILED, "conflict")); return; } - auto [ iter, was_inserted ] = - _map.try_emplace(mapping.name, localService(mapping, std::move(inflight))); - LOG_ASSERT(was_inserted); + addToMap(mapping, localService(mapping, std::move(inflight))); } void LocalRpcMonitorMap::add(const ServiceMapping &mapping) { @@ -97,41 +119,37 @@ void LocalRpcMonitorMap::doAdd(const ServiceMapping &mapping) { auto old = _map.find(mapping.name); if (old != _map.end()) { PerService & exists = old->second; - if (exists.spec() == mapping.spec) { + if (exists.spec == mapping.spec) { LOG(debug, "added mapping %s->%s was already present", mapping.name.c_str(), mapping.spec.c_str()); exists.localOnly = false; return; } - PerService removed = std::move(exists); - _map.erase(old); + auto removed = removeFromMap(old); LOG(warning, "added mapping %s->%s, but already had conflicting mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str(), - removed.name().c_str(), removed.spec().c_str()); + removed.mapping.name.c_str(), removed.mapping.spec.c_str()); if (removed.inflight) { auto target = std::move(removed.inflight); target->doneHandler(OkState(13, "conflict during initialization")); } if (removed.up) { - _dispatcher.remove(removed.mapping()); + _dispatcher.remove(removed.mapping); } - _delayedTasks.deleteLater(std::move(removed.srv)); } - auto [ iter, was_inserted ] = - _map.try_emplace(mapping.name, globalService(mapping)); - LOG_ASSERT(was_inserted); + addToMap(mapping, globalService(mapping)); } void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) { auto iter = _map.find(mapping.name); if (iter != _map.end()) { - PerService removed = std::move(iter->second); - _map.erase(iter); - LOG(debug, "remove: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); - if (mapping.spec != removed.spec()) { + auto removed = removeFromMap(iter); + LOG(debug, "remove: mapping %s->%s", + removed.mapping.name.c_str(), removed.mapping.spec.c_str()); + if (mapping.spec != removed.mapping.spec) { LOG(warning, "inconsistent specs for name '%s': had '%s', but was asked to remove '%s'", mapping.name.c_str(), - removed.spec().c_str(), + removed.mapping.spec.c_str(), mapping.spec.c_str()); } if (removed.inflight) { @@ -139,53 +157,44 @@ void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) { target->doneHandler(OkState(13, "removed during initialization")); } if (removed.up) { - _dispatcher.remove(removed.mapping()); + _dispatcher.remove(removed.mapping); } - _delayedTasks.deleteLater(std::move(removed.srv)); } else { LOG(debug, "tried to remove non-existing mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str()); } } -void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string) { - if (auto *psd = lookup(rpcsrv)) { - LOG(debug, "failed: %s->%s", psd->name().c_str(), psd->spec().c_str()); - if (psd->inflight) { - auto target = std::move(psd->inflight); - target->doneHandler(OkState(13, "failed check using listNames callback")); - } - if (psd->localOnly) { - PerService removed = std::move(*psd); - auto iter = _map.find(removed.name()); - _map.erase(iter); - if (removed.up) { - _dispatcher.remove(removed.mapping()); - } - _delayedTasks.deleteLater(std::move(removed.srv)); - } else if (psd->up) { - psd->up = false; - _dispatcher.remove(psd->mapping()); - } +void LocalRpcMonitorMap::down(const ServiceMapping& mapping) { + PerService &psd = lookup(mapping); + LOG(debug, "failed: %s->%s", mapping.name.c_str(), psd.spec.c_str()); + if (psd.inflight) { + auto target = std::move(psd.inflight); + target->doneHandler(OkState(13, "failed check using listNames callback")); } -} - -void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) { - if (auto *psd = lookup(rpcsrv)) { - LOG(debug, "ok: %s->%s", psd->name().c_str(), psd->spec().c_str()); - if (psd->inflight) { - auto target = std::move(psd->inflight); - target->doneHandler(OkState()); - } - if (! psd->up) { - psd->up = true; - _dispatcher.add(psd->mapping()); + if (psd.localOnly) { + auto iter = _map.find(mapping.name); + auto removed = removeFromMap(iter); + if (removed.up) { + _dispatcher.remove(removed.mapping); } + } else if (psd.up) { + psd.up = false; + _dispatcher.remove(mapping); } } -FRT_Supervisor * LocalRpcMonitorMap::getSupervisor() { - return &_supervisor; +void LocalRpcMonitorMap::up(const ServiceMapping& mapping) { + PerService &psd = lookup(mapping); + LOG(debug, "ok: %s->%s", mapping.name.c_str(), psd.spec.c_str()); + if (psd.inflight) { + auto target = std::move(psd.inflight); + target->doneHandler(OkState()); + } + if (! psd.up) { + psd.up = true; + _dispatcher.add(mapping); + } } } // namespace slobrok diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h index 8961e21f386..8df2ca882de 100644 --- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h +++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h @@ -2,10 +2,10 @@ #pragma once #include "cmd.h" -#include "i_rpc_server_manager.h" #include "managed_rpc_server.h" #include "map_listener.h" #include "map_source.h" +#include "mapping_monitor.h" #include "named_service.h" #include "proxy_map_source.h" #include "service_map_history.h" @@ -24,8 +24,8 @@ namespace slobrok { * Tracks up/down status for name->spec combinations * that are considered for publication locally. **/ -class LocalRpcMonitorMap : public IRpcServerManager, - public MapListener +class LocalRpcMonitorMap : public MapListener, + public MappingMonitorOwner { private: enum class EventType { ADD, REMOVE }; @@ -42,16 +42,9 @@ private: }; class DelayedTasks : public FNET_Task { - using MUP = std::unique_ptr<ManagedRpcServer>; - std::vector<MUP> _deleteList; std::vector<Event> _queue; LocalRpcMonitorMap &_target; public: - void deleteLater(MUP rpcsrv) { - _deleteList.emplace_back(std::move(rpcsrv)); - ScheduleNow(); - } - void handleLater(Event event) { _queue.emplace_back(std::move(event)); ScheduleNow(); @@ -61,7 +54,6 @@ private: DelayedTasks(FNET_Scheduler *scheduler, LocalRpcMonitorMap &target) : FNET_Task(scheduler), - _deleteList(), _queue(), _target(target) {} @@ -75,17 +67,9 @@ private: bool up; bool localOnly; std::unique_ptr<ScriptCommand> inflight; - std::unique_ptr<ManagedRpcServer> srv; - - vespalib::string name() const { return srv->getName(); } - vespalib::string spec() const { return srv->getSpec(); } - ServiceMapping mapping() const { return ServiceMapping{srv->getName(), srv->getSpec()}; } + vespalib::string spec; }; - std::unique_ptr<ManagedRpcServer> managedFor(const ServiceMapping &mapping) { - return std::make_unique<ManagedRpcServer>(mapping.name, mapping.spec, *this); - } - PerService localService(const ServiceMapping &mapping, std::unique_ptr<ScriptCommand> inflight) { @@ -93,7 +77,7 @@ private: .up = false, .localOnly = true, .inflight = std::move(inflight), - .srv = managedFor(mapping) + .spec = mapping.spec }; } @@ -102,7 +86,7 @@ private: .up = false, .localOnly = false, .inflight = {}, - .srv = managedFor(mapping) + .spec = mapping.spec }; } @@ -111,16 +95,28 @@ private: Map _map; ProxyMapSource _dispatcher; ServiceMapHistory _history; - FRT_Supervisor &_supervisor; + MappingMonitor::UP _mappingMonitor; std::unique_ptr<MapSubscription> _subscription; - PerService *lookup(ManagedRpcServer *rpcsrv); - void doAdd(const ServiceMapping &mapping); void doRemove(const ServiceMapping &mapping); - + + PerService & lookup(const ServiceMapping &mapping); + + void addToMap(const ServiceMapping &mapping, PerService psd); + + struct RemovedData { + ServiceMapping mapping; + bool up; + bool localOnly; + std::unique_ptr<ScriptCommand> inflight; + }; + + RemovedData removeFromMap(Map::iterator iter); + public: - LocalRpcMonitorMap(FRT_Supervisor &_supervisor); + LocalRpcMonitorMap(FRT_Supervisor &supervisor, + MappingMonitorFactory mappingMonitorFactory); ~LocalRpcMonitorMap(); MapSource &dispatcher() { return _dispatcher; } @@ -133,9 +129,8 @@ public: void add(const ServiceMapping &mapping) override; void remove(const ServiceMapping &mapping) override; - void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override; - void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) override; - FRT_Supervisor *getSupervisor() override; + void up(const ServiceMapping& mapping) override; + void down(const ServiceMapping& mapping) override; }; //----------------------------------------------------------------------------- diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp index cf612a187bb..1f54716c29c 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.cpp +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -110,7 +110,10 @@ SBEnv::SBEnv(const ConfigShim &shim) _health(), _metrics(_rpcHooks, *_transport), _components(), - _localRpcMonitorMap(*_supervisor), + _localRpcMonitorMap(*_supervisor, + [this] (MappingMonitorOwner &owner) { + return std::make_unique<RpcMappingMonitor>(*_supervisor, owner); + }), _rpcsrvmanager(*this), _exchanger(*this, _rpcsrvmap), _rpcsrvmap() diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h index 7bed910936f..44b7305814c 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.h +++ b/slobrok/src/vespa/slobrok/server/sbenv.h @@ -2,6 +2,7 @@ #pragma once #include "named_service.h" +#include "rpc_mapping_monitor.h" #include "rpc_server_map.h" #include "rpc_server_manager.h" #include "remote_slobrok.h" diff --git a/vespabase/src/common-env.sh b/vespabase/src/common-env.sh index d5f8381432f..eb60154071c 100755 --- a/vespabase/src/common-env.sh +++ b/vespabase/src/common-env.sh @@ -292,30 +292,3 @@ log_debug_message () { log_warning_message () { log_message "warning" "$*" 1>&2 } - -get_numa_ctl_cmd () { - if ! type numactl &> /dev/null; then - echo "FATAL: Could not find required program numactl." - exit 1 - fi - - numnodes=$(numactl --hardware 2>/dev/null | - grep available | - awk '$3 == "nodes" { print $2 }') - - if [ -n "$numanodes" ]; then - # We are allowed to use numactl and have NUMA nodes - if [ "$VESPA_AFFINITY_CPU_SOCKET" ] && - [ "$numnodes" -gt 1 ] - then - node=$(($VESPA_AFFINITY_CPU_SOCKET % $numnodes)) - numactlcmd="numactl --cpunodebind=$node --membind=$node" - else - numactlcmd="numactl --interleave all" - fi - else - numactlcmd="" - fi - - echo $numactlcmd -} diff --git a/vespabase/src/start-cbinaries.sh b/vespabase/src/start-cbinaries.sh index 1ef45a71dec..f17829aa081 100755 --- a/vespabase/src/start-cbinaries.sh +++ b/vespabase/src/start-cbinaries.sh @@ -163,13 +163,29 @@ configure_vespa_malloc () { fi } +configure_numa_ctl () { + numactl="" + if numactl --interleave all true &> /dev/null; then + # We are allowed to use numactl + numactl="numactl --interleave all" + if [ "$VESPA_AFFINITY_CPU_SOCKET" ]; then + numcpu=`numactl --hardware 2>/dev/null | grep available | cut -d' ' -f2` + if [ "$numcpu" ] && [ "$numcpu" -gt 1 ]; then + log_debug_message "Starting $0 with affinity $VESPA_AFFINITY_CPU_SOCKET out of $numcpu" + node=$(($VESPA_AFFINITY_CPU_SOCKET % $numcpu)) + numactl="numactl --cpunodebind=$node --membind=$node" + fi + fi + fi +} + configure_valgrind configure_huge_pages configure_use_madvise configure_vespa_malloc if $no_valgrind ; then - numactl=$(get_numa_ctl_cmd) + configure_numa_ctl ulimit -c unlimited log_debug_message "Starting $0 with : " \ $numactl env LD_PRELOAD=$LD_PRELOAD $0-bin "$@" diff --git a/vespalib/src/tests/datastore/array_store/array_store_test.cpp b/vespalib/src/tests/datastore/array_store/array_store_test.cpp index 562ecaaecfa..0de9b83935f 100644 --- a/vespalib/src/tests/datastore/array_store/array_store_test.cpp +++ b/vespalib/src/tests/datastore/array_store/array_store_test.cpp @@ -18,8 +18,15 @@ using generation_t = vespalib::GenerationHandler::generation_t; using MemStats = vespalib::datastore::test::MemStats; using BufferStats = vespalib::datastore::test::BufferStats; +namespace { + constexpr float ALLOC_GROW_FACTOR = 0.2; +EntryRef as_entry_ref(const EntryRef& ref) noexcept { return ref; } +EntryRef as_entry_ref(const AtomicEntryRef& ref) noexcept { return ref.load_relaxed(); } + +} + template <typename EntryT, typename RefT = EntryRefT<19> > struct Fixture { @@ -115,19 +122,20 @@ struct Fixture store.transferHoldLists(generation++); store.trimHoldLists(generation); } + template <typename TestedRefType> void compactWorst(bool compactMemory, bool compactAddressSpace) { ICompactionContext::UP ctx = store.compactWorst(compactMemory, compactAddressSpace); - std::vector<EntryRef> refs; + std::vector<TestedRefType> refs; for (auto itr = refStore.begin(); itr != refStore.end(); ++itr) { - refs.push_back(itr->first); + refs.emplace_back(itr->first); } - std::vector<EntryRef> compactedRefs = refs; - ctx->compact(ArrayRef<EntryRef>(compactedRefs)); + std::vector<TestedRefType> compactedRefs = refs; + ctx->compact(ArrayRef<TestedRefType>(compactedRefs)); ReferenceStore compactedRefStore; for (size_t i = 0; i < refs.size(); ++i) { - ASSERT_EQUAL(0u, compactedRefStore.count(compactedRefs[i])); - ASSERT_EQUAL(1u, refStore.count(refs[i])); - compactedRefStore.insert(std::make_pair(compactedRefs[i], refStore[refs[i]])); + ASSERT_EQUAL(0u, compactedRefStore.count(as_entry_ref(compactedRefs[i]))); + ASSERT_EQUAL(1u, refStore.count(as_entry_ref(refs[i]))); + compactedRefStore.insert(std::make_pair(as_entry_ref(compactedRefs[i]), refStore[as_entry_ref(refs[i])])); } refStore = compactedRefStore; } @@ -252,7 +260,11 @@ TEST_F("require that new underlying buffer is allocated when current is full", S TEST_DO(f.assertStoreContent()); } -TEST_F("require that the buffer with most dead space is compacted", NumberFixture(2)) +namespace { + +template <typename TestedRefType> +void +test_compaction(NumberFixture &f) { EntryRef size1Ref = f.add({1}); EntryRef size2Ref = f.add({2,2}); @@ -267,7 +279,7 @@ TEST_F("require that the buffer with most dead space is compacted", NumberFixtur uint32_t size3BufferId = f.getBufferId(size3Ref); EXPECT_EQUAL(3u, f.refStore.size()); - f.compactWorst(true, false); + f.compactWorst<TestedRefType>(true, false); EXPECT_EQUAL(3u, f.refStore.size()); f.assertStoreContent(); @@ -281,6 +293,18 @@ TEST_F("require that the buffer with most dead space is compacted", NumberFixtur EXPECT_TRUE(f.store.bufferState(size2Ref).isFree()); } +} + +TEST_F("require that the buffer with most dead space is compacted (EntryRef vector)", NumberFixture(2)) +{ + test_compaction<EntryRef>(f); +} + +TEST_F("require that the buffer with most dead space is compacted (AtomicEntryRef vector)", NumberFixture(2)) +{ + test_compaction<AtomicEntryRef>(f); +} + namespace { void testCompaction(NumberFixture &f, bool compactMemory, bool compactAddressSpace) @@ -300,7 +324,7 @@ void testCompaction(NumberFixture &f, bool compactMemory, bool compactAddressSpa uint32_t size3BufferId = f.getBufferId(size3Ref); EXPECT_EQUAL(3u, f.refStore.size()); - f.compactWorst(compactMemory, compactAddressSpace); + f.compactWorst<EntryRef>(compactMemory, compactAddressSpace); EXPECT_EQUAL(3u, f.refStore.size()); f.assertStoreContent(); diff --git a/vespalib/src/vespa/vespalib/datastore/array_store.hpp b/vespalib/src/vespa/vespalib/datastore/array_store.hpp index 5409c21594c..29db72c2ed3 100644 --- a/vespalib/src/vespa/vespalib/datastore/array_store.hpp +++ b/vespalib/src/vespa/vespalib/datastore/array_store.hpp @@ -157,6 +157,20 @@ public: } } } + void compact(vespalib::ArrayRef<AtomicEntryRef> refs) override { + if (!_bufferIdsToCompact.empty()) { + for (auto &ref : refs) { + if (ref.load_relaxed().valid()) { + RefT internalRef(ref.load_relaxed()); + if (compactingBuffer(internalRef.bufferId())) { + EntryRef newRef = _store.add(_store.get(ref.load_relaxed())); + std::atomic_thread_fence(std::memory_order_release); + ref.store_release(newRef); + } + } + } + } + } }; } diff --git a/vespalib/src/vespa/vespalib/datastore/i_compaction_context.h b/vespalib/src/vespa/vespalib/datastore/i_compaction_context.h index 72b473adf4a..291d751082a 100644 --- a/vespalib/src/vespa/vespalib/datastore/i_compaction_context.h +++ b/vespalib/src/vespa/vespalib/datastore/i_compaction_context.h @@ -16,6 +16,7 @@ struct ICompactionContext { using UP = std::unique_ptr<ICompactionContext>; virtual ~ICompactionContext() {} virtual void compact(vespalib::ArrayRef<EntryRef> refs) = 0; + virtual void compact(vespalib::ArrayRef<AtomicEntryRef> refs) = 0; }; } |