aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xcontainer-disc/src/main/sh/vespa-start-container-daemon.sh23
-rw-r--r--searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp8
-rw-r--r--searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp118
-rw-r--r--searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp21
-rw-r--r--searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h2
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp17
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_graph.h3
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp113
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_index.h12
-rw-r--r--searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp4
-rw-r--r--searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h8
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp143
-rw-r--r--slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h55
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.cpp5
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.h1
-rwxr-xr-xvespabase/src/common-env.sh27
-rwxr-xr-xvespabase/src/start-cbinaries.sh18
-rw-r--r--vespalib/src/tests/datastore/array_store/array_store_test.cpp44
-rw-r--r--vespalib/src/vespa/vespalib/datastore/array_store.hpp14
-rw-r--r--vespalib/src/vespa/vespalib/datastore/i_compaction_context.h1
20 files changed, 493 insertions, 144 deletions
diff --git a/container-disc/src/main/sh/vespa-start-container-daemon.sh b/container-disc/src/main/sh/vespa-start-container-daemon.sh
index a0cbd9d9186..223124c29d6 100755
--- a/container-disc/src/main/sh/vespa-start-container-daemon.sh
+++ b/container-disc/src/main/sh/vespa-start-container-daemon.sh
@@ -110,8 +110,27 @@ configure_cpu() {
}
configure_numactl() {
- numactlcmd=$(get_numa_ctl_cmd)
- log_message debug "starting ${VESPA_SERVICE_NAME} for ${VESPA_CONFIG_ID} with numactl command : $numactlcmd"
+ log_message debug "starting ${VESPA_SERVICE_NAME} for ${VESPA_CONFIG_ID}"
+ if numactl --interleave all true &> /dev/null; then
+ # We are allowed to use numactl
+ numnodes=$(numactl --hardware |
+ grep available |
+ awk '$3 == "nodes" { print $2 }')
+ if [ "$VESPA_AFFINITY_CPU_SOCKET" ] &&
+ [ "$numnodes" -gt 1 ]
+ then
+ node=$(($VESPA_AFFINITY_CPU_SOCKET % $numnodes))
+ log_message debug "with affinity to $VESPA_AFFINITY_CPU_SOCKET out of $numnodes cpu sockets"
+ numactlcmd="numactl --cpunodebind=$node --membind=$node"
+ else
+ log_message debug "with memory interleaving on all nodes"
+ numactlcmd="numactl --interleave all"
+ fi
+ else
+ log_message debug "without numactl (no permission or not available)"
+ numactlcmd=""
+ fi
+ log_message debug "numactlcmd: $numactlcmd"
}
configure_gcopts() {
diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
index a49816f1a52..084bdf90a14 100644
--- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
+++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
@@ -36,6 +36,7 @@ LOG_SETUP("tensorattribute_test");
using document::WrongTensorTypeException;
using search::AttributeGuard;
using search::AttributeVector;
+using search::CompactionStrategy;
using search::attribute::DistanceMetric;
using search::attribute::HnswIndexParams;
using search::queryeval::GlobalFilter;
@@ -199,11 +200,18 @@ public:
void trim_hold_lists(generation_t first_used_gen) override {
_trim_gen = first_used_gen;
}
+ bool consider_compact(const CompactionStrategy&) override {
+ return false;
+ }
+ vespalib::MemoryUsage update_stat() override {
+ return vespalib::MemoryUsage();
+ }
vespalib::MemoryUsage memory_usage() const override {
++_memory_usage_cnt;
return vespalib::MemoryUsage();
}
void get_state(const vespalib::slime::Inserter&) const override {}
+ void shrink_lid_space(uint32_t) override { }
std::unique_ptr<NearestNeighborIndexSaver> make_saver() const override {
if (_index_value != 0) {
return std::make_unique<MockIndexSaver>(_index_value);
diff --git a/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp b/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp
index 3f7ec140781..9e7f082e237 100644
--- a/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp
+++ b/searchlib/src/tests/tensor/hnsw_index/hnsw_index_test.cpp
@@ -1,5 +1,6 @@
// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/searchcommon/common/compaction_strategy.h>
#include <vespa/searchlib/common/bitvector.h>
#include <vespa/searchlib/tensor/distance_functions.h>
#include <vespa/searchlib/tensor/doc_vector_access.h>
@@ -20,7 +21,7 @@ using namespace search::tensor;
using namespace vespalib::slime;
using vespalib::Slime;
using search::BitVector;
-
+using search::CompactionStrategy;
template <typename FloatType>
class MyDocVectorAccess : public DocVectorAccess {
@@ -42,6 +43,8 @@ public:
ArrayRef ref(_vectors[docid]);
return vespalib::eval::TypedCells(ref);
}
+
+ void clear() { _vectors.clear(); }
};
struct LevelGenerator : public RandomLevelGenerator {
@@ -111,6 +114,10 @@ public:
MemoryUsage memory_usage() const {
return index->memory_usage();
}
+ MemoryUsage commit_and_update_stat() {
+ commit();
+ return index->update_stat();
+ }
void expect_entry_point(uint32_t exp_docid, uint32_t exp_level) {
EXPECT_EQ(exp_docid, index->get_entry_docid());
EXPECT_EQ(exp_level, index->get_entry_level());
@@ -166,6 +173,8 @@ public:
docid, hit.docid, hit.distance, thr);
}
}
+
+ FloatVectors& get_vectors() { return vectors; }
};
@@ -533,6 +542,113 @@ TEST_F(HnswIndexTest, shrink_called_heuristic)
EXPECT_TRUE(index->check_link_symmetry());
}
+namespace {
+
+template <class ResultGraph>
+ResultGraph
+make_graph_helper(HnswIndex& index)
+{
+ using LevelArrayRef = HnswGraph::LevelArrayRef;
+ using LinkArrayRef = HnswGraph::LinkArrayRef;
+ auto& graph = index.get_graph();
+ ResultGraph result(graph.size());
+ assert(!graph.get_node_ref(0).valid());
+ for (uint32_t doc_id = 1; doc_id < graph.size(); ++doc_id) {
+ auto& node = result[doc_id];
+ auto node_ref = graph.get_node_ref(doc_id);
+ if constexpr (std::is_same_v<std::remove_reference_t<decltype(node)>, uint32_t>) {
+ node = node_ref.ref();
+ } else {
+ LevelArrayRef level_array(graph.get_level_array(node_ref));
+ for (uint32_t level = 0; level < level_array.size(); ++level) {
+ if constexpr (std::is_same_v<std::remove_reference_t<decltype(node)>, std::vector<uint32_t>>) {
+ node.emplace_back(level_array[level].load_relaxed().ref());
+ } else {
+ LinkArrayRef link_array(graph.get_link_array(level_array, level));
+ node.emplace_back(std::vector<uint32_t>(link_array.begin(), link_array.end()));
+ }
+ }
+ }
+ }
+ return result;
+}
+
+using LinkGraph = std::vector<std::vector<std::vector<uint32_t>>>;
+
+LinkGraph
+make_link_graph(HnswIndex& index)
+{
+ return make_graph_helper<LinkGraph>(index);
+}
+
+using LinkArrayRefGraph = std::vector<std::vector<uint32_t>>;
+
+LinkArrayRefGraph
+make_link_array_refs(HnswIndex& index)
+{
+ return make_graph_helper<LinkArrayRefGraph>(index);
+}
+
+using LevelArrayRefGraph = std::vector<uint32_t>;
+
+LevelArrayRefGraph
+make_level_array_refs(HnswIndex& index)
+{
+ return make_graph_helper<LevelArrayRefGraph>(index);
+}
+
+}
+
+TEST_F(HnswIndexTest, hnsw_graph_is_compacted)
+{
+ init(true);
+ get_vectors().clear();
+ uint32_t doc_id = 1;
+ for (uint32_t x = 0; x < 100; ++x) {
+ for (uint32_t y = 0; y < 50; ++y) {
+ get_vectors().set(doc_id, { float(x), float(y) });
+ ++doc_id;
+ }
+ }
+ uint32_t doc_id_end = doc_id;
+ for (doc_id = 1; doc_id < doc_id_end; ++doc_id) {
+ add_document(doc_id);
+ }
+ for (doc_id = 10; doc_id < doc_id_end; ++doc_id) {
+ remove_document(doc_id);
+ }
+ auto mem_1 = commit_and_update_stat();
+ auto link_graph_1 = make_link_graph(*index);
+ auto link_array_refs_1 = make_link_array_refs(*index);
+ auto level_array_refs_1 = make_level_array_refs(*index);
+ // Normal compaction
+ EXPECT_TRUE(index->consider_compact(CompactionStrategy()));
+ auto mem_2 = commit_and_update_stat();
+ EXPECT_LT(mem_2.usedBytes(), mem_1.usedBytes());
+ for (uint32_t i = 0; i < 10; ++i) {
+ mem_1 = mem_2;
+ // Forced compaction to move things around
+ index->compact_link_arrays(true, false);
+ index->compact_level_arrays(true, false);
+ commit();
+ index->update_stat();
+ mem_2 = commit_and_update_stat();
+ EXPECT_LE(mem_2.usedBytes(), mem_1.usedBytes());
+ if (mem_2.usedBytes() == mem_1.usedBytes()) {
+ break;
+ }
+ }
+ auto link_graph_2 = make_link_graph(*index);
+ auto link_array_refs_2 = make_link_array_refs(*index);
+ auto level_array_refs_2 = make_level_array_refs(*index);
+ EXPECT_EQ(link_graph_1, link_graph_2);
+ EXPECT_NE(link_array_refs_1, link_array_refs_2);
+ EXPECT_NE(level_array_refs_1, level_array_refs_2);
+ index->shrink_lid_space(10);
+ auto mem_3 = commit_and_update_stat();
+ EXPECT_LT(mem_3.usedBytes(), mem_2.usedBytes());
+}
+
TEST(LevelGeneratorTest, gives_various_levels)
{
InvLogLevelGenerator generator(4);
diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
index 62a1072de48..1639f5f8113 100644
--- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
@@ -400,6 +400,18 @@ DenseTensorAttribute::getVersion() const
}
void
+DenseTensorAttribute::onCommit()
+{
+ TensorAttribute::onCommit();
+ if (_index) {
+ if (_index->consider_compact(getConfig().getCompactionStrategy())) {
+ incGeneration();
+ updateStat(true);
+ }
+ }
+}
+
+void
DenseTensorAttribute::onGenerationChange(generation_t next_gen)
{
// TODO: Change onGenerationChange() to send current generation instead of next generation.
@@ -430,6 +442,15 @@ DenseTensorAttribute::get_state(const vespalib::slime::Inserter& inserter) const
}
}
+void
+DenseTensorAttribute::onShrinkLidSpace()
+{
+ TensorAttribute::onShrinkLidSpace();
+ if (_index) {
+ _index->shrink_lid_space(getCommittedDocIdLimit());
+ }
+}
+
vespalib::eval::TypedCells
DenseTensorAttribute::get_vector(uint32_t docid) const
{
diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h
index 752db849b68..8899b1e4bd1 100644
--- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h
+++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h
@@ -43,9 +43,11 @@ public:
std::unique_ptr<AttributeSaver> onInitSave(vespalib::stringref fileName) override;
void compactWorst() override;
uint32_t getVersion() const override;
+ void onCommit() override;
void onGenerationChange(generation_t next_gen) override;
void removeOldGenerations(generation_t first_used_gen) override;
void get_state(const vespalib::slime::Inserter& inserter) const override;
+ void onShrinkLidSpace() override;
// Implements DocVectorAccess
vespalib::eval::TypedCells get_vector(uint32_t docid) const override;
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp
index b1545d587b8..f58dbcafbec 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp
@@ -9,6 +9,7 @@ namespace search::tensor {
HnswGraph::HnswGraph()
: node_refs(),
+ node_refs_size(1u),
nodes(HnswIndex::make_default_node_store_config()),
links(HnswIndex::make_default_link_store_config()),
entry_docid_and_level()
@@ -30,6 +31,9 @@ HnswGraph::make_node_for_document(uint32_t docid, uint32_t num_levels)
vespalib::Array<AtomicEntryRef> levels(num_levels, AtomicEntryRef());
auto node_ref = nodes.add(levels);
node_refs[docid].store_release(node_ref);
+ if (docid >= node_refs_size.load(std::memory_order_relaxed)) {
+ node_refs_size.store(docid + 1, std::memory_order_release);
+ }
return node_ref;
}
@@ -47,6 +51,19 @@ HnswGraph::remove_node_for_document(uint32_t docid)
auto old_links_ref = levels[i].load_acquire();
links.remove(old_links_ref);
}
+ if (docid + 1 == node_refs_size.load(std::memory_order_relaxed)) {
+ trim_node_refs_size();
+ }
+}
+
+void
+HnswGraph::trim_node_refs_size()
+{
+ uint32_t check_doc_id = node_refs_size.load(std::memory_order_relaxed) - 1;
+ while (check_doc_id > 0u && !node_refs[check_doc_id].load_relaxed().valid()) {
+ --check_doc_id;
+ }
+ node_refs_size.store(check_doc_id + 1, std::memory_order_release);
}
void
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h
index 4d07f74f8e3..57826088ca5 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h
@@ -40,6 +40,7 @@ struct HnswGraph {
using LinkArrayRef = LinkStore::ConstArrayRef;
NodeRefVector node_refs;
+ std::atomic<uint32_t> node_refs_size;
NodeStore nodes;
LinkStore links;
@@ -52,6 +53,8 @@ struct HnswGraph {
void remove_node_for_document(uint32_t docid);
+ void trim_node_refs_size();
+
NodeRef get_node_ref(uint32_t docid) const {
return node_refs[docid].load_acquire();
}
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp
index 8183a7caf3d..ca5f522457a 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp
@@ -5,6 +5,7 @@
#include "hnsw_index_loader.h"
#include "hnsw_index_saver.h"
#include "random_level_generator.h"
+#include <vespa/searchcommon/common/compaction_strategy.h>
#include <vespa/searchlib/util/state_explorer_utils.h>
#include <vespa/vespalib/data/slime/cursor.h>
#include <vespa/vespalib/data/slime/inserter.h>
@@ -228,7 +229,7 @@ HnswIndex::search_layer(const TypedCells& input, uint32_t neighbors_to_find,
FurthestPriQ& best_neighbors, uint32_t level, const search::BitVector *filter) const
{
NearestPriQ candidates;
- uint32_t doc_id_limit = _graph.node_refs.size();
+ uint32_t doc_id_limit = _graph.node_refs_size.load(std::memory_order_acquire);
if (filter) {
doc_id_limit = std::min(filter->size(), doc_id_limit);
}
@@ -253,9 +254,11 @@ HnswIndex::search_layer(const TypedCells& input, uint32_t neighbors_to_find,
}
candidates.pop();
for (uint32_t neighbor_docid : _graph.get_link_array(cand.node_ref, level)) {
+ if (neighbor_docid >= doc_id_limit) {
+ continue;
+ }
auto neighbor_ref = _graph.get_node_ref(neighbor_docid);
if ((! neighbor_ref.valid())
- || (neighbor_docid >= doc_id_limit)
|| visited.is_marked(neighbor_docid))
{
continue;
@@ -282,7 +285,12 @@ HnswIndex::HnswIndex(const DocVectorAccess& vectors, DistanceFunction::UP distan
_vectors(vectors),
_distance_func(std::move(distance_func)),
_level_generator(std::move(level_generator)),
- _cfg(cfg)
+ _cfg(cfg),
+ _visited_set_pool(),
+ _cached_level_arrays_memory_usage(),
+ _cached_level_arrays_address_space_usage(0, 0, (1ull << 32)),
+ _cached_link_arrays_memory_usage(),
+ _cached_link_arrays_address_space_usage(0, 0, (1ull << 32))
{
assert(_distance_func);
}
@@ -472,6 +480,93 @@ HnswIndex::trim_hold_lists(generation_t first_used_gen)
_graph.links.trimHoldLists(first_used_gen);
}
+void
+HnswIndex::compact_level_arrays(bool compact_memory, bool compact_address_space)
+{
+ auto context = _graph.nodes.compactWorst(compact_memory, compact_address_space);
+ uint32_t doc_id_limit = _graph.node_refs.size();
+ vespalib::ArrayRef<AtomicEntryRef> refs(&_graph.node_refs[0], doc_id_limit);
+ context->compact(refs);
+}
+
+void
+HnswIndex::compact_link_arrays(bool compact_memory, bool compact_address_space)
+{
+ auto context = _graph.links.compactWorst(compact_memory, compact_address_space);
+ uint32_t doc_id_limit = _graph.node_refs.size();
+ for (uint32_t doc_id = 1; doc_id < doc_id_limit; ++doc_id) {
+ EntryRef level_ref = _graph.node_refs[doc_id].load_relaxed();
+ if (level_ref.valid()) {
+ vespalib::ArrayRef<AtomicEntryRef> refs(_graph.nodes.get_writable(level_ref));
+ context->compact(refs);
+ }
+ }
+}
+
+namespace {
+
+bool
+consider_compact_arrays(const CompactionStrategy& compaction_strategy, vespalib::MemoryUsage& memory_usage, vespalib::AddressSpace& address_space_usage, std::function<void(bool,bool)> compact_arrays)
+{
+ size_t used_bytes = memory_usage.usedBytes();
+ size_t dead_bytes = memory_usage.deadBytes();
+ bool compact_memory = compaction_strategy.should_compact_memory(used_bytes, dead_bytes);
+ size_t used_address_space = address_space_usage.used();
+ size_t dead_address_space = address_space_usage.dead();
+ bool compact_address_space = compaction_strategy.should_compact_address_space(used_address_space, dead_address_space);
+ if (compact_memory || compact_address_space) {
+ compact_arrays(compact_memory, compact_address_space);
+ return true;
+ }
+ return false;
+}
+
+}
+
+bool
+HnswIndex::consider_compact_level_arrays(const CompactionStrategy& compaction_strategy)
+{
+ return consider_compact_arrays(compaction_strategy, _cached_level_arrays_memory_usage, _cached_level_arrays_address_space_usage,
+ [this](bool compact_memory, bool compact_address_space)
+ { compact_level_arrays(compact_memory, compact_address_space); });
+}
+
+bool
+HnswIndex::consider_compact_link_arrays(const CompactionStrategy& compaction_strategy)
+{
+ return consider_compact_arrays(compaction_strategy, _cached_link_arrays_memory_usage, _cached_link_arrays_address_space_usage,
+ [this](bool compact_memory, bool compact_address_space)
+ { compact_link_arrays(compact_memory, compact_address_space); });
+}
+
+bool
+HnswIndex::consider_compact(const CompactionStrategy& compaction_strategy)
+{
+ bool result = false;
+ if (consider_compact_level_arrays(compaction_strategy)) {
+ result = true;
+ }
+ if (consider_compact_link_arrays(compaction_strategy)) {
+ result = true;
+ }
+ return result;
+}
+
+vespalib::MemoryUsage
+HnswIndex::update_stat()
+{
+ vespalib::MemoryUsage result;
+ result.merge(_graph.node_refs.getMemoryUsage());
+ _cached_level_arrays_memory_usage = _graph.nodes.getMemoryUsage();
+ _cached_level_arrays_address_space_usage = _graph.nodes.addressSpaceUsage();
+ result.merge(_cached_level_arrays_memory_usage);
+ _cached_link_arrays_memory_usage = _graph.links.getMemoryUsage();
+ _cached_link_arrays_address_space_usage = _graph.links.addressSpaceUsage();
+ result.merge(_cached_link_arrays_memory_usage);
+ result.merge(_visited_set_pool.memory_usage());
+ return result;
+}
+
vespalib::MemoryUsage
HnswIndex::memory_usage() const
{
@@ -526,6 +621,18 @@ HnswIndex::get_state(const vespalib::slime::Inserter& inserter) const
_cfg.neighbors_to_explore_at_construction());
}
+void
+HnswIndex::shrink_lid_space(uint32_t doc_id_limit)
+{
+ assert(doc_id_limit >= 1u);
+ assert(doc_id_limit >= _graph.node_refs_size.load(std::memory_order_relaxed));
+ uint32_t old_doc_id_limit = _graph.node_refs.size();
+ if (doc_id_limit >= old_doc_id_limit) {
+ return;
+ }
+ _graph.node_refs.shrink(doc_id_limit);
+}
+
std::unique_ptr<NearestNeighborIndexSaver>
HnswIndex::make_saver() const
{
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h
index ef0f38c2263..6f6d213d6cc 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h
@@ -80,6 +80,10 @@ protected:
RandomLevelGenerator::UP _level_generator;
Config _cfg;
mutable vespalib::ReusableSetPool _visited_set_pool;
+ vespalib::MemoryUsage _cached_level_arrays_memory_usage;
+ vespalib::AddressSpace _cached_level_arrays_address_space_usage;
+ vespalib::MemoryUsage _cached_link_arrays_memory_usage;
+ vespalib::AddressSpace _cached_link_arrays_address_space_usage;
uint32_t max_links_for_level(uint32_t level) const;
void add_link_to(uint32_t docid, uint32_t level, const LinkArrayRef& old_links, uint32_t new_link) {
@@ -161,8 +165,15 @@ public:
void remove_document(uint32_t docid) override;
void transfer_hold_lists(generation_t current_gen) override;
void trim_hold_lists(generation_t first_used_gen) override;
+ void compact_level_arrays(bool compact_memory, bool compact_addreess_space);
+ void compact_link_arrays(bool compact_memory, bool compact_address_space);
+ bool consider_compact_level_arrays(const CompactionStrategy& compaction_strategy);
+ bool consider_compact_link_arrays(const CompactionStrategy& compaction_strategy);
+ bool consider_compact(const CompactionStrategy& compaction_strategy) override;
+ vespalib::MemoryUsage update_stat() override;
vespalib::MemoryUsage memory_usage() const override;
void get_state(const vespalib::slime::Inserter& inserter) const override;
+ void shrink_lid_space(uint32_t doc_id_limit) override;
std::unique_ptr<NearestNeighborIndexSaver> make_saver() const override;
bool load(const fileutil::LoadedBuffer& buf) override;
@@ -184,6 +195,7 @@ public:
void set_node(uint32_t docid, const HnswNode &node);
bool check_link_symmetry() const;
std::pair<uint32_t, bool> count_reachable_nodes() const;
+ HnswGraph& get_graph() { return _graph; }
static vespalib::datastore::ArrayStoreConfig make_default_node_store_config();
static vespalib::datastore::ArrayStoreConfig make_default_link_store_config();
diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp
index ac98b28d105..c0aec9ff91a 100644
--- a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp
@@ -38,7 +38,9 @@ HnswIndexLoader::load(const fileutil::LoadedBuffer& buf)
}
}
if (_failed) return false;
- _graph.node_refs.ensure_size(num_nodes);
+ _graph.node_refs.ensure_size(std::max(num_nodes, 1u));
+ _graph.node_refs_size.store(std::max(num_nodes, 1u), std::memory_order_release);
+ _graph.trim_node_refs_size();
auto entry_node_ref = _graph.get_node_ref(entry_docid);
_graph.set_entry_node({entry_docid, entry_node_ref, entry_level});
return true;
diff --git a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h
index fd37cf80720..0122738e173 100644
--- a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h
+++ b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h
@@ -14,7 +14,10 @@ namespace vespalib::slime { struct Inserter; }
namespace search::fileutil { class LoadedBuffer; }
-namespace search { class BitVector; }
+namespace search {
+class BitVector;
+class CompactionStrategy;
+}
namespace search::tensor {
@@ -59,8 +62,11 @@ public:
virtual void remove_document(uint32_t docid) = 0;
virtual void transfer_hold_lists(generation_t current_gen) = 0;
virtual void trim_hold_lists(generation_t first_used_gen) = 0;
+ virtual bool consider_compact(const CompactionStrategy& compaction_strategy) = 0;
+ virtual vespalib::MemoryUsage update_stat() = 0;
virtual vespalib::MemoryUsage memory_usage() const = 0;
virtual void get_state(const vespalib::slime::Inserter& inserter) const = 0;
+ virtual void shrink_lid_space(uint32_t doc_id_limit) = 0;
/**
* Creates a saver that is used to save the index to binary form.
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
index 485e6d27a1c..70bbab37e3e 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.cpp
@@ -11,10 +11,6 @@ namespace slobrok {
#pragma GCC diagnostic ignored "-Winline"
void LocalRpcMonitorMap::DelayedTasks::PerformTask() {
- {
- std::vector<MUP> deleteAfterSwap;
- std::swap(deleteAfterSwap, _deleteList);
- }
std::vector<Event> todo;
std::swap(todo, _queue);
for (const auto & entry : todo) {
@@ -29,29 +25,57 @@ void LocalRpcMonitorMap::DelayedTasks::PerformTask() {
}
}
-LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor)
+LocalRpcMonitorMap::LocalRpcMonitorMap(FRT_Supervisor &supervisor,
+ MappingMonitorFactory mappingMonitorFactory)
: _delayedTasks(supervisor.GetScheduler(), *this),
_map(),
_dispatcher(),
_history(),
- _supervisor(supervisor),
+ _mappingMonitor(mappingMonitorFactory(*this)),
_subscription(MapSubscription::subscribe(_dispatcher, _history))
{
}
LocalRpcMonitorMap::~LocalRpcMonitorMap() = default;
-LocalRpcMonitorMap::PerService *
-LocalRpcMonitorMap::lookup(ManagedRpcServer *rpcsrv) {
- auto iter = _map.find(rpcsrv->getName());
+LocalRpcMonitorMap::PerService &
+LocalRpcMonitorMap::lookup(const ServiceMapping &mapping) {
+ LOG(spam, "lookup %s->%s", mapping.name.c_str(), mapping.spec.c_str());
+ auto iter = _map.find(mapping.name);
if (iter == _map.end()) {
- return nullptr;
+ LOG_ABORT("not in map");
}
PerService & psd = iter->second;
- if (psd.srv.get() != rpcsrv) {
- return nullptr;
+ if (psd.spec != mapping.spec) {
+ LOG_ABORT("conflict in map: %s->%s");
}
- return &psd;
+ LOG(spam, "found in map: %s->%s [%s,%s]",
+ iter->first.c_str(), psd.spec.c_str(),
+ psd.up ? "up" : "down",
+ psd.localOnly ? "local" : "global");
+ return psd;
+}
+
+void LocalRpcMonitorMap::addToMap(const ServiceMapping &mapping, PerService psd) {
+ auto [ iter, was_inserted ] =
+ _map.try_emplace(mapping.name, std::move(psd));
+ LOG_ASSERT(was_inserted);
+ _mappingMonitor->start(mapping);
+}
+
+LocalRpcMonitorMap::RemovedData
+LocalRpcMonitorMap::removeFromMap(Map::iterator iter) {
+ auto name = iter->first;
+ PerService psd = std::move(iter->second);
+ ServiceMapping mapping{iter->first, psd.spec};
+ _mappingMonitor->stop(mapping);
+ _map.erase(iter);
+ return RemovedData {
+ .mapping = mapping,
+ .up = psd.up,
+ .localOnly = psd.localOnly,
+ .inflight = std::move(psd.inflight)
+ };
}
ServiceMapHistory & LocalRpcMonitorMap::history() {
@@ -66,7 +90,7 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping,
auto old = _map.find(mapping.name);
if (old != _map.end()) {
const PerService & exists = old->second;
- if (exists.spec() == mapping.spec) {
+ if (exists.spec == mapping.spec) {
LOG(debug, "added mapping %s->%s was already present",
mapping.name.c_str(), mapping.spec.c_str());
inflight->doneHandler(OkState(0, "already registered"));
@@ -74,13 +98,11 @@ void LocalRpcMonitorMap::addLocal(const ServiceMapping &mapping,
}
LOG(warning, "tried addLocal for mapping %s->%s, but already had conflicting mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str(),
- exists.name().c_str(), exists.spec().c_str());
+ mapping.name.c_str(), exists.spec.c_str());
inflight->doneHandler(OkState(FRTE_RPC_METHOD_FAILED, "conflict"));
return;
}
- auto [ iter, was_inserted ] =
- _map.try_emplace(mapping.name, localService(mapping, std::move(inflight)));
- LOG_ASSERT(was_inserted);
+ addToMap(mapping, localService(mapping, std::move(inflight)));
}
void LocalRpcMonitorMap::add(const ServiceMapping &mapping) {
@@ -97,41 +119,37 @@ void LocalRpcMonitorMap::doAdd(const ServiceMapping &mapping) {
auto old = _map.find(mapping.name);
if (old != _map.end()) {
PerService & exists = old->second;
- if (exists.spec() == mapping.spec) {
+ if (exists.spec == mapping.spec) {
LOG(debug, "added mapping %s->%s was already present",
mapping.name.c_str(), mapping.spec.c_str());
exists.localOnly = false;
return;
}
- PerService removed = std::move(exists);
- _map.erase(old);
+ auto removed = removeFromMap(old);
LOG(warning, "added mapping %s->%s, but already had conflicting mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str(),
- removed.name().c_str(), removed.spec().c_str());
+ removed.mapping.name.c_str(), removed.mapping.spec.c_str());
if (removed.inflight) {
auto target = std::move(removed.inflight);
target->doneHandler(OkState(13, "conflict during initialization"));
}
if (removed.up) {
- _dispatcher.remove(removed.mapping());
+ _dispatcher.remove(removed.mapping);
}
- _delayedTasks.deleteLater(std::move(removed.srv));
}
- auto [ iter, was_inserted ] =
- _map.try_emplace(mapping.name, globalService(mapping));
- LOG_ASSERT(was_inserted);
+ addToMap(mapping, globalService(mapping));
}
void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) {
auto iter = _map.find(mapping.name);
if (iter != _map.end()) {
- PerService removed = std::move(iter->second);
- _map.erase(iter);
- LOG(debug, "remove: mapping %s->%s", mapping.name.c_str(), mapping.spec.c_str());
- if (mapping.spec != removed.spec()) {
+ auto removed = removeFromMap(iter);
+ LOG(debug, "remove: mapping %s->%s",
+ removed.mapping.name.c_str(), removed.mapping.spec.c_str());
+ if (mapping.spec != removed.mapping.spec) {
LOG(warning, "inconsistent specs for name '%s': had '%s', but was asked to remove '%s'",
mapping.name.c_str(),
- removed.spec().c_str(),
+ removed.mapping.spec.c_str(),
mapping.spec.c_str());
}
if (removed.inflight) {
@@ -139,53 +157,44 @@ void LocalRpcMonitorMap::doRemove(const ServiceMapping &mapping) {
target->doneHandler(OkState(13, "removed during initialization"));
}
if (removed.up) {
- _dispatcher.remove(removed.mapping());
+ _dispatcher.remove(removed.mapping);
}
- _delayedTasks.deleteLater(std::move(removed.srv));
} else {
LOG(debug, "tried to remove non-existing mapping %s->%s",
mapping.name.c_str(), mapping.spec.c_str());
}
}
-void LocalRpcMonitorMap::notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string) {
- if (auto *psd = lookup(rpcsrv)) {
- LOG(debug, "failed: %s->%s", psd->name().c_str(), psd->spec().c_str());
- if (psd->inflight) {
- auto target = std::move(psd->inflight);
- target->doneHandler(OkState(13, "failed check using listNames callback"));
- }
- if (psd->localOnly) {
- PerService removed = std::move(*psd);
- auto iter = _map.find(removed.name());
- _map.erase(iter);
- if (removed.up) {
- _dispatcher.remove(removed.mapping());
- }
- _delayedTasks.deleteLater(std::move(removed.srv));
- } else if (psd->up) {
- psd->up = false;
- _dispatcher.remove(psd->mapping());
- }
+void LocalRpcMonitorMap::down(const ServiceMapping& mapping) {
+ PerService &psd = lookup(mapping);
+ LOG(debug, "failed: %s->%s", mapping.name.c_str(), psd.spec.c_str());
+ if (psd.inflight) {
+ auto target = std::move(psd.inflight);
+ target->doneHandler(OkState(13, "failed check using listNames callback"));
}
-}
-
-void LocalRpcMonitorMap::notifyOkRpcSrv(ManagedRpcServer *rpcsrv) {
- if (auto *psd = lookup(rpcsrv)) {
- LOG(debug, "ok: %s->%s", psd->name().c_str(), psd->spec().c_str());
- if (psd->inflight) {
- auto target = std::move(psd->inflight);
- target->doneHandler(OkState());
- }
- if (! psd->up) {
- psd->up = true;
- _dispatcher.add(psd->mapping());
+ if (psd.localOnly) {
+ auto iter = _map.find(mapping.name);
+ auto removed = removeFromMap(iter);
+ if (removed.up) {
+ _dispatcher.remove(removed.mapping);
}
+ } else if (psd.up) {
+ psd.up = false;
+ _dispatcher.remove(mapping);
}
}
-FRT_Supervisor * LocalRpcMonitorMap::getSupervisor() {
- return &_supervisor;
+void LocalRpcMonitorMap::up(const ServiceMapping& mapping) {
+ PerService &psd = lookup(mapping);
+ LOG(debug, "ok: %s->%s", mapping.name.c_str(), psd.spec.c_str());
+ if (psd.inflight) {
+ auto target = std::move(psd.inflight);
+ target->doneHandler(OkState());
+ }
+ if (! psd.up) {
+ psd.up = true;
+ _dispatcher.add(mapping);
+ }
}
} // namespace slobrok
diff --git a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
index 8961e21f386..8df2ca882de 100644
--- a/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
+++ b/slobrok/src/vespa/slobrok/server/local_rpc_monitor_map.h
@@ -2,10 +2,10 @@
#pragma once
#include "cmd.h"
-#include "i_rpc_server_manager.h"
#include "managed_rpc_server.h"
#include "map_listener.h"
#include "map_source.h"
+#include "mapping_monitor.h"
#include "named_service.h"
#include "proxy_map_source.h"
#include "service_map_history.h"
@@ -24,8 +24,8 @@ namespace slobrok {
* Tracks up/down status for name->spec combinations
* that are considered for publication locally.
**/
-class LocalRpcMonitorMap : public IRpcServerManager,
- public MapListener
+class LocalRpcMonitorMap : public MapListener,
+ public MappingMonitorOwner
{
private:
enum class EventType { ADD, REMOVE };
@@ -42,16 +42,9 @@ private:
};
class DelayedTasks : public FNET_Task {
- using MUP = std::unique_ptr<ManagedRpcServer>;
- std::vector<MUP> _deleteList;
std::vector<Event> _queue;
LocalRpcMonitorMap &_target;
public:
- void deleteLater(MUP rpcsrv) {
- _deleteList.emplace_back(std::move(rpcsrv));
- ScheduleNow();
- }
-
void handleLater(Event event) {
_queue.emplace_back(std::move(event));
ScheduleNow();
@@ -61,7 +54,6 @@ private:
DelayedTasks(FNET_Scheduler *scheduler, LocalRpcMonitorMap &target)
: FNET_Task(scheduler),
- _deleteList(),
_queue(),
_target(target)
{}
@@ -75,17 +67,9 @@ private:
bool up;
bool localOnly;
std::unique_ptr<ScriptCommand> inflight;
- std::unique_ptr<ManagedRpcServer> srv;
-
- vespalib::string name() const { return srv->getName(); }
- vespalib::string spec() const { return srv->getSpec(); }
- ServiceMapping mapping() const { return ServiceMapping{srv->getName(), srv->getSpec()}; }
+ vespalib::string spec;
};
- std::unique_ptr<ManagedRpcServer> managedFor(const ServiceMapping &mapping) {
- return std::make_unique<ManagedRpcServer>(mapping.name, mapping.spec, *this);
- }
-
PerService localService(const ServiceMapping &mapping,
std::unique_ptr<ScriptCommand> inflight)
{
@@ -93,7 +77,7 @@ private:
.up = false,
.localOnly = true,
.inflight = std::move(inflight),
- .srv = managedFor(mapping)
+ .spec = mapping.spec
};
}
@@ -102,7 +86,7 @@ private:
.up = false,
.localOnly = false,
.inflight = {},
- .srv = managedFor(mapping)
+ .spec = mapping.spec
};
}
@@ -111,16 +95,28 @@ private:
Map _map;
ProxyMapSource _dispatcher;
ServiceMapHistory _history;
- FRT_Supervisor &_supervisor;
+ MappingMonitor::UP _mappingMonitor;
std::unique_ptr<MapSubscription> _subscription;
- PerService *lookup(ManagedRpcServer *rpcsrv);
-
void doAdd(const ServiceMapping &mapping);
void doRemove(const ServiceMapping &mapping);
-
+
+ PerService & lookup(const ServiceMapping &mapping);
+
+ void addToMap(const ServiceMapping &mapping, PerService psd);
+
+ struct RemovedData {
+ ServiceMapping mapping;
+ bool up;
+ bool localOnly;
+ std::unique_ptr<ScriptCommand> inflight;
+ };
+
+ RemovedData removeFromMap(Map::iterator iter);
+
public:
- LocalRpcMonitorMap(FRT_Supervisor &_supervisor);
+ LocalRpcMonitorMap(FRT_Supervisor &supervisor,
+ MappingMonitorFactory mappingMonitorFactory);
~LocalRpcMonitorMap();
MapSource &dispatcher() { return _dispatcher; }
@@ -133,9 +129,8 @@ public:
void add(const ServiceMapping &mapping) override;
void remove(const ServiceMapping &mapping) override;
- void notifyFailedRpcSrv(ManagedRpcServer *rpcsrv, std::string errmsg) override;
- void notifyOkRpcSrv(ManagedRpcServer *rpcsrv) override;
- FRT_Supervisor *getSupervisor() override;
+ void up(const ServiceMapping& mapping) override;
+ void down(const ServiceMapping& mapping) override;
};
//-----------------------------------------------------------------------------
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp
index cf612a187bb..1f54716c29c 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.cpp
+++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp
@@ -110,7 +110,10 @@ SBEnv::SBEnv(const ConfigShim &shim)
_health(),
_metrics(_rpcHooks, *_transport),
_components(),
- _localRpcMonitorMap(*_supervisor),
+ _localRpcMonitorMap(*_supervisor,
+ [this] (MappingMonitorOwner &owner) {
+ return std::make_unique<RpcMappingMonitor>(*_supervisor, owner);
+ }),
_rpcsrvmanager(*this),
_exchanger(*this, _rpcsrvmap),
_rpcsrvmap()
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.h b/slobrok/src/vespa/slobrok/server/sbenv.h
index 7bed910936f..44b7305814c 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.h
+++ b/slobrok/src/vespa/slobrok/server/sbenv.h
@@ -2,6 +2,7 @@
#pragma once
#include "named_service.h"
+#include "rpc_mapping_monitor.h"
#include "rpc_server_map.h"
#include "rpc_server_manager.h"
#include "remote_slobrok.h"
diff --git a/vespabase/src/common-env.sh b/vespabase/src/common-env.sh
index d5f8381432f..eb60154071c 100755
--- a/vespabase/src/common-env.sh
+++ b/vespabase/src/common-env.sh
@@ -292,30 +292,3 @@ log_debug_message () {
log_warning_message () {
log_message "warning" "$*" 1>&2
}
-
-get_numa_ctl_cmd () {
- if ! type numactl &> /dev/null; then
- echo "FATAL: Could not find required program numactl."
- exit 1
- fi
-
- numnodes=$(numactl --hardware 2>/dev/null |
- grep available |
- awk '$3 == "nodes" { print $2 }')
-
- if [ -n "$numanodes" ]; then
- # We are allowed to use numactl and have NUMA nodes
- if [ "$VESPA_AFFINITY_CPU_SOCKET" ] &&
- [ "$numnodes" -gt 1 ]
- then
- node=$(($VESPA_AFFINITY_CPU_SOCKET % $numnodes))
- numactlcmd="numactl --cpunodebind=$node --membind=$node"
- else
- numactlcmd="numactl --interleave all"
- fi
- else
- numactlcmd=""
- fi
-
- echo $numactlcmd
-}
diff --git a/vespabase/src/start-cbinaries.sh b/vespabase/src/start-cbinaries.sh
index 1ef45a71dec..f17829aa081 100755
--- a/vespabase/src/start-cbinaries.sh
+++ b/vespabase/src/start-cbinaries.sh
@@ -163,13 +163,29 @@ configure_vespa_malloc () {
fi
}
+configure_numa_ctl () {
+ numactl=""
+ if numactl --interleave all true &> /dev/null; then
+ # We are allowed to use numactl
+ numactl="numactl --interleave all"
+ if [ "$VESPA_AFFINITY_CPU_SOCKET" ]; then
+ numcpu=`numactl --hardware 2>/dev/null | grep available | cut -d' ' -f2`
+ if [ "$numcpu" ] && [ "$numcpu" -gt 1 ]; then
+ log_debug_message "Starting $0 with affinity $VESPA_AFFINITY_CPU_SOCKET out of $numcpu"
+ node=$(($VESPA_AFFINITY_CPU_SOCKET % $numcpu))
+ numactl="numactl --cpunodebind=$node --membind=$node"
+ fi
+ fi
+ fi
+}
+
configure_valgrind
configure_huge_pages
configure_use_madvise
configure_vespa_malloc
if $no_valgrind ; then
- numactl=$(get_numa_ctl_cmd)
+ configure_numa_ctl
ulimit -c unlimited
log_debug_message "Starting $0 with : " \
$numactl env LD_PRELOAD=$LD_PRELOAD $0-bin "$@"
diff --git a/vespalib/src/tests/datastore/array_store/array_store_test.cpp b/vespalib/src/tests/datastore/array_store/array_store_test.cpp
index 562ecaaecfa..0de9b83935f 100644
--- a/vespalib/src/tests/datastore/array_store/array_store_test.cpp
+++ b/vespalib/src/tests/datastore/array_store/array_store_test.cpp
@@ -18,8 +18,15 @@ using generation_t = vespalib::GenerationHandler::generation_t;
using MemStats = vespalib::datastore::test::MemStats;
using BufferStats = vespalib::datastore::test::BufferStats;
+namespace {
+
constexpr float ALLOC_GROW_FACTOR = 0.2;
+EntryRef as_entry_ref(const EntryRef& ref) noexcept { return ref; }
+EntryRef as_entry_ref(const AtomicEntryRef& ref) noexcept { return ref.load_relaxed(); }
+
+}
+
template <typename EntryT, typename RefT = EntryRefT<19> >
struct Fixture
{
@@ -115,19 +122,20 @@ struct Fixture
store.transferHoldLists(generation++);
store.trimHoldLists(generation);
}
+ template <typename TestedRefType>
void compactWorst(bool compactMemory, bool compactAddressSpace) {
ICompactionContext::UP ctx = store.compactWorst(compactMemory, compactAddressSpace);
- std::vector<EntryRef> refs;
+ std::vector<TestedRefType> refs;
for (auto itr = refStore.begin(); itr != refStore.end(); ++itr) {
- refs.push_back(itr->first);
+ refs.emplace_back(itr->first);
}
- std::vector<EntryRef> compactedRefs = refs;
- ctx->compact(ArrayRef<EntryRef>(compactedRefs));
+ std::vector<TestedRefType> compactedRefs = refs;
+ ctx->compact(ArrayRef<TestedRefType>(compactedRefs));
ReferenceStore compactedRefStore;
for (size_t i = 0; i < refs.size(); ++i) {
- ASSERT_EQUAL(0u, compactedRefStore.count(compactedRefs[i]));
- ASSERT_EQUAL(1u, refStore.count(refs[i]));
- compactedRefStore.insert(std::make_pair(compactedRefs[i], refStore[refs[i]]));
+ ASSERT_EQUAL(0u, compactedRefStore.count(as_entry_ref(compactedRefs[i])));
+ ASSERT_EQUAL(1u, refStore.count(as_entry_ref(refs[i])));
+ compactedRefStore.insert(std::make_pair(as_entry_ref(compactedRefs[i]), refStore[as_entry_ref(refs[i])]));
}
refStore = compactedRefStore;
}
@@ -252,7 +260,11 @@ TEST_F("require that new underlying buffer is allocated when current is full", S
TEST_DO(f.assertStoreContent());
}
-TEST_F("require that the buffer with most dead space is compacted", NumberFixture(2))
+namespace {
+
+template <typename TestedRefType>
+void
+test_compaction(NumberFixture &f)
{
EntryRef size1Ref = f.add({1});
EntryRef size2Ref = f.add({2,2});
@@ -267,7 +279,7 @@ TEST_F("require that the buffer with most dead space is compacted", NumberFixtur
uint32_t size3BufferId = f.getBufferId(size3Ref);
EXPECT_EQUAL(3u, f.refStore.size());
- f.compactWorst(true, false);
+ f.compactWorst<TestedRefType>(true, false);
EXPECT_EQUAL(3u, f.refStore.size());
f.assertStoreContent();
@@ -281,6 +293,18 @@ TEST_F("require that the buffer with most dead space is compacted", NumberFixtur
EXPECT_TRUE(f.store.bufferState(size2Ref).isFree());
}
+}
+
+TEST_F("require that the buffer with most dead space is compacted (EntryRef vector)", NumberFixture(2))
+{
+ test_compaction<EntryRef>(f);
+}
+
+TEST_F("require that the buffer with most dead space is compacted (AtomicEntryRef vector)", NumberFixture(2))
+{
+ test_compaction<AtomicEntryRef>(f);
+}
+
namespace {
void testCompaction(NumberFixture &f, bool compactMemory, bool compactAddressSpace)
@@ -300,7 +324,7 @@ void testCompaction(NumberFixture &f, bool compactMemory, bool compactAddressSpa
uint32_t size3BufferId = f.getBufferId(size3Ref);
EXPECT_EQUAL(3u, f.refStore.size());
- f.compactWorst(compactMemory, compactAddressSpace);
+ f.compactWorst<EntryRef>(compactMemory, compactAddressSpace);
EXPECT_EQUAL(3u, f.refStore.size());
f.assertStoreContent();
diff --git a/vespalib/src/vespa/vespalib/datastore/array_store.hpp b/vespalib/src/vespa/vespalib/datastore/array_store.hpp
index 5409c21594c..29db72c2ed3 100644
--- a/vespalib/src/vespa/vespalib/datastore/array_store.hpp
+++ b/vespalib/src/vespa/vespalib/datastore/array_store.hpp
@@ -157,6 +157,20 @@ public:
}
}
}
+ void compact(vespalib::ArrayRef<AtomicEntryRef> refs) override {
+ if (!_bufferIdsToCompact.empty()) {
+ for (auto &ref : refs) {
+ if (ref.load_relaxed().valid()) {
+ RefT internalRef(ref.load_relaxed());
+ if (compactingBuffer(internalRef.bufferId())) {
+ EntryRef newRef = _store.add(_store.get(ref.load_relaxed()));
+ std::atomic_thread_fence(std::memory_order_release);
+ ref.store_release(newRef);
+ }
+ }
+ }
+ }
+ }
};
}
diff --git a/vespalib/src/vespa/vespalib/datastore/i_compaction_context.h b/vespalib/src/vespa/vespalib/datastore/i_compaction_context.h
index 72b473adf4a..291d751082a 100644
--- a/vespalib/src/vespa/vespalib/datastore/i_compaction_context.h
+++ b/vespalib/src/vespa/vespalib/datastore/i_compaction_context.h
@@ -16,6 +16,7 @@ struct ICompactionContext {
using UP = std::unique_ptr<ICompactionContext>;
virtual ~ICompactionContext() {}
virtual void compact(vespalib::ArrayRef<EntryRef> refs) = 0;
+ virtual void compact(vespalib::ArrayRef<AtomicEntryRef> refs) = 0;
};
}