diff options
Diffstat (limited to 'searchlib')
59 files changed, 2123 insertions, 450 deletions
diff --git a/searchlib/CMakeLists.txt b/searchlib/CMakeLists.txt index aaf8f91387e..a76baeced04 100644 --- a/searchlib/CMakeLists.txt +++ b/searchlib/CMakeLists.txt @@ -90,6 +90,7 @@ vespa_define_module( src/tests/attribute/postinglist src/tests/attribute/postinglistattribute src/tests/attribute/reference_attribute + src/tests/attribute/save_target src/tests/attribute/searchable src/tests/attribute/searchcontext src/tests/attribute/sourceselector @@ -214,6 +215,7 @@ vespa_define_module( src/tests/tensor/dense_tensor_store src/tests/tensor/distance_functions src/tests/tensor/hnsw_index + src/tests/tensor/hnsw_saver src/tests/transactionlog src/tests/transactionlogstress src/tests/true diff --git a/searchlib/src/tests/attribute/enum_attribute_compaction/enum_attribute_compaction_test.cpp b/searchlib/src/tests/attribute/enum_attribute_compaction/enum_attribute_compaction_test.cpp index 31af5945337..45d432c29be 100644 --- a/searchlib/src/tests/attribute/enum_attribute_compaction/enum_attribute_compaction_test.cpp +++ b/searchlib/src/tests/attribute/enum_attribute_compaction/enum_attribute_compaction_test.cpp @@ -221,7 +221,7 @@ TEST_P(IntegerCompactionTest, compact) test_enum_store_compaction(); } -INSTANTIATE_TEST_CASE_P(IntegerCompactionTestSet, IntegerCompactionTest, ::testing::Values(CollectionType::SINGLE, CollectionType::ARRAY, CollectionType::WSET)); +VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(IntegerCompactionTestSet, IntegerCompactionTest, ::testing::Values(CollectionType::SINGLE, CollectionType::ARRAY, CollectionType::WSET)); using StringCompactionTest = CompactionTest<StringAttribute>; @@ -230,6 +230,6 @@ TEST_P(StringCompactionTest, compact) test_enum_store_compaction(); } -INSTANTIATE_TEST_CASE_P(StringCompactionTestSet, StringCompactionTest, ::testing::Values(CollectionType::SINGLE, CollectionType::ARRAY, CollectionType::WSET)); +VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(StringCompactionTestSet, StringCompactionTest, ::testing::Values(CollectionType::SINGLE, CollectionType::ARRAY, CollectionType::WSET)); GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchlib/src/tests/attribute/enumeratedsave/enumeratedsave_test.cpp b/searchlib/src/tests/attribute/enumeratedsave/enumeratedsave_test.cpp index bf829f6607a..41313fc7c53 100644 --- a/searchlib/src/tests/attribute/enumeratedsave/enumeratedsave_test.cpp +++ b/searchlib/src/tests/attribute/enumeratedsave/enumeratedsave_test.cpp @@ -108,6 +108,17 @@ public: } IAttributeFileWriter &udatWriter() override { return _udatWriter; } + bool setup_writer(const vespalib::string& file_suffix, + const vespalib::string& desc) override { + (void) file_suffix; + (void) desc; + abort(); + } + IAttributeFileWriter& get_writer(const vespalib::string& file_suffix) override { + (void) file_suffix; + abort(); + } + bool bufEqual(const Buffer &lhs, const Buffer &rhs) const; bool operator==(const MemAttr &rhs) const; diff --git a/searchlib/src/tests/attribute/enumstore/enumstore_test.cpp b/searchlib/src/tests/attribute/enumstore/enumstore_test.cpp index 3a885dda233..43e694f0bcd 100644 --- a/searchlib/src/tests/attribute/enumstore/enumstore_test.cpp +++ b/searchlib/src/tests/attribute/enumstore/enumstore_test.cpp @@ -72,7 +72,7 @@ public: #endif using FloatEnumStoreTestTypes = ::testing::Types<FloatEnumStore, DoubleEnumStore>; -TYPED_TEST_CASE(FloatEnumStoreTest, FloatEnumStoreTestTypes); +VESPA_GTEST_TYPED_TEST_SUITE(FloatEnumStoreTest, FloatEnumStoreTestTypes); TYPED_TEST(FloatEnumStoreTest, numbers_can_be_inserted_and_retrieved) { @@ -452,7 +452,7 @@ LoaderTest<StringEnumStore>::load_values(enumstore::EnumeratedLoaderBase& loader #endif using LoaderTestTypes = ::testing::Types<NumericEnumStore, FloatEnumStore, StringEnumStore>; -TYPED_TEST_CASE(LoaderTest, LoaderTestTypes); +VESPA_GTEST_TYPED_TEST_SUITE(LoaderTest, LoaderTestTypes); TYPED_TEST(LoaderTest, store_is_instantiated_with_enumerated_loader) { diff --git a/searchlib/src/tests/attribute/save_target/CMakeLists.txt b/searchlib/src/tests/attribute/save_target/CMakeLists.txt new file mode 100644 index 00000000000..e127f66579e --- /dev/null +++ b/searchlib/src/tests/attribute/save_target/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchlib_attribute_save_target_test_app TEST + SOURCES + attribute_save_target_test.cpp + DEPENDS + searchlib + gtest +) +vespa_add_test(NAME searchlib_attribute_save_target_test_app COMMAND searchlib_attribute_save_target_test_app) diff --git a/searchlib/src/tests/attribute/save_target/attribute_save_target_test.cpp b/searchlib/src/tests/attribute/save_target/attribute_save_target_test.cpp new file mode 100644 index 00000000000..c746a0aa120 --- /dev/null +++ b/searchlib/src/tests/attribute/save_target/attribute_save_target_test.cpp @@ -0,0 +1,148 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/searchlib/attribute/attributefilesavetarget.h> +#include <vespa/searchlib/attribute/attributememorysavetarget.h> +#include <vespa/searchlib/common/tunefileinfo.h> +#include <vespa/searchlib/index/dummyfileheadercontext.h> +#include <vespa/searchlib/test/directory_handler.h> +#include <vespa/searchlib/util/fileutil.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/bufferwriter.h> +#include <vespa/vespalib/util/exceptions.h> + +#include <vespa/log/log.h> +LOG_SETUP("attribute_save_target_test"); + +using namespace search; +using namespace search::attribute; + +using search::index::DummyFileHeaderContext; +using search::test::DirectoryHandler; + +const vespalib::string test_dir = "test_data/"; + +class SaveTargetTest : public ::testing::Test { +public: + DirectoryHandler dir_handler; + TuneFileAttributes tune_file; + DummyFileHeaderContext file_header_ctx; + IAttributeSaveTarget& target; + vespalib::string base_file_name; + + SaveTargetTest(IAttributeSaveTarget& target_in) + : dir_handler(test_dir), + tune_file(), + file_header_ctx(), + target(target_in), + base_file_name(test_dir + "test_file") + { + } + ~SaveTargetTest() {} + void set_header(const vespalib::string& file_name) { + target.setHeader(AttributeHeader(file_name)); + } + IAttributeFileWriter& setup_writer(const vespalib::string& file_suffix, + const vespalib::string& desc) { + bool res = target.setup_writer(file_suffix, desc); + assert(res); + return target.get_writer(file_suffix); + } + void setup_writer_and_fill(const vespalib::string& file_suffix, + const vespalib::string& desc, + int value) { + auto& writer = setup_writer(file_suffix, desc); + auto buf = writer.allocBufferWriter(); + buf->write(&value, sizeof(int)); + buf->flush(); + } + void validate_loaded_file(const vespalib::string& file_suffix, + const vespalib::string& exp_desc, + int exp_value) + { + vespalib::string file_name = base_file_name + "." + file_suffix; + EXPECT_TRUE(vespalib::fileExists(file_name)); + auto loaded = FileUtil::loadFile(file_name); + EXPECT_FALSE(loaded->empty()); + + const auto& header = loaded->getHeader(); + EXPECT_EQ(file_name, header.getTag("fileName").asString()); + EXPECT_EQ(exp_desc, header.getTag("desc").asString()); + + EXPECT_EQ(sizeof(int), loaded->size()); + int act_value = (reinterpret_cast<const int*>(loaded->buffer()))[0]; + EXPECT_EQ(exp_value, act_value); + } +}; + +class FileSaveTargetTest : public SaveTargetTest { +public: + AttributeFileSaveTarget file_target; + + FileSaveTargetTest() + : SaveTargetTest(file_target), + file_target(tune_file, file_header_ctx) + { + set_header(base_file_name); + } +}; + +TEST_F(FileSaveTargetTest, can_setup_and_return_writers) +{ + setup_writer_and_fill("my1", "desc 1", 123); + setup_writer_and_fill("my2", "desc 2", 456); + target.close(); + + validate_loaded_file("my1", "desc 1", 123); + validate_loaded_file("my2", "desc 2", 456); +} + +TEST_F(FileSaveTargetTest, setup_fails_if_writer_already_exists) +{ + setup_writer("my", "my desc"); + EXPECT_FALSE(target.setup_writer("my", "my desc")); +} + +TEST_F(FileSaveTargetTest, get_throws_if_writer_does_not_exists) +{ + EXPECT_THROW(target.get_writer("na"), vespalib::IllegalArgumentException); +} + +class MemorySaveTargetTest : public SaveTargetTest { +public: + AttributeMemorySaveTarget memory_target; + + MemorySaveTargetTest() + : SaveTargetTest(memory_target), + memory_target() + { + set_header(base_file_name); + } + void write_to_file() { + bool res = memory_target.writeToFile(tune_file, file_header_ctx); + ASSERT_TRUE(res); + } +}; + +TEST_F(MemorySaveTargetTest, can_setup_and_return_writers) +{ + setup_writer_and_fill("my1", "desc 1", 123); + setup_writer_and_fill("my2", "desc 2", 456); + write_to_file(); + + validate_loaded_file("my1", "desc 1", 123); + validate_loaded_file("my2", "desc 2", 456); +} + +TEST_F(MemorySaveTargetTest, setup_fails_if_writer_already_exists) +{ + setup_writer("my", "my desc"); + EXPECT_FALSE(target.setup_writer("my", "my desc")); +} + +TEST_F(MemorySaveTargetTest, get_throws_if_writer_does_not_exists) +{ + EXPECT_THROW(target.get_writer("na"), vespalib::IllegalArgumentException); +} + +GTEST_MAIN_RUN_ALL_TESTS() + diff --git a/searchlib/src/tests/attribute/tensorattribute/CMakeLists.txt b/searchlib/src/tests/attribute/tensorattribute/CMakeLists.txt index 3794fd88fc3..44ff45d02d3 100644 --- a/searchlib/src/tests/attribute/tensorattribute/CMakeLists.txt +++ b/searchlib/src/tests/attribute/tensorattribute/CMakeLists.txt @@ -5,5 +5,4 @@ vespa_add_executable(searchlib_tensorattribute_test_app TEST DEPENDS searchlib ) -vespa_add_test(NAME searchlib_tensorattribute_test_app COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/tensorattribute_test.sh - DEPENDS searchlib_tensorattribute_test_app) +vespa_add_test(NAME searchlib_tensorattribute_test_app COMMAND searchlib_tensorattribute_test_app) diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp index d9a4431f89b..39a7e53ca8c 100644 --- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp +++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp @@ -5,8 +5,8 @@ #include <vespa/eval/tensor/dense/dense_tensor.h> #include <vespa/eval/tensor/tensor.h> #include <vespa/fastos/file.h> -#include <vespa/searchlib/attribute/attributeguard.h> #include <vespa/searchlib/attribute/attribute_read_guard.h> +#include <vespa/searchlib/attribute/attributeguard.h> #include <vespa/searchlib/tensor/default_nearest_neighbor_index_factory.h> #include <vespa/searchlib/tensor/dense_tensor_attribute.h> #include <vespa/searchlib/tensor/doc_vector_access.h> @@ -14,11 +14,15 @@ #include <vespa/searchlib/tensor/hnsw_index.h> #include <vespa/searchlib/tensor/nearest_neighbor_index.h> #include <vespa/searchlib/tensor/nearest_neighbor_index_factory.h> +#include <vespa/searchlib/tensor/nearest_neighbor_index_saver.h> #include <vespa/searchlib/tensor/tensor_attribute.h> +#include <vespa/searchlib/test/directory_handler.h> +#include <vespa/searchlib/util/fileutil.h> #include <vespa/vespalib/data/fileheader.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/test/insertion_operators.h> #include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/util/bufferwriter.h> #include <vespa/log/log.h> LOG_SETUP("tensorattribute_test"); @@ -33,8 +37,10 @@ using search::tensor::DenseTensorAttribute; using search::tensor::DocVectorAccess; using search::tensor::GenericTensorAttribute; using search::tensor::HnswIndex; +using search::tensor::HnswNode; using search::tensor::NearestNeighborIndex; using search::tensor::NearestNeighborIndexFactory; +using search::tensor::NearestNeighborIndexSaver; using search::tensor::TensorAttribute; using vespalib::eval::TensorSpec; using vespalib::eval::ValueType; @@ -75,6 +81,18 @@ vec_2d(double x0, double x1) return TensorSpec(vec_2d_spec).add({{"x", 0}}, x0).add({{"x", 1}}, x1); } +class MockIndexSaver : public NearestNeighborIndexSaver { +private: + int _index_value; + +public: + MockIndexSaver(int index_value) : _index_value(index_value) {} + void save(search::BufferWriter& writer) const override { + writer.write(&_index_value, sizeof(int)); + writer.flush(); + } +}; + class MockNearestNeighborIndex : public NearestNeighborIndex { private: using Entry = std::pair<uint32_t, DoubleVector>; @@ -86,6 +104,7 @@ private: generation_t _transfer_gen; generation_t _trim_gen; mutable size_t _memory_usage_cnt; + int _index_value; public: MockNearestNeighborIndex(const DocVectorAccess& vectors) @@ -94,13 +113,20 @@ public: _removes(), _transfer_gen(std::numeric_limits<generation_t>::max()), _trim_gen(std::numeric_limits<generation_t>::max()), - _memory_usage_cnt(0) + _memory_usage_cnt(0), + _index_value(0) { } void clear() { _adds.clear(); _removes.clear(); } + int get_index_value() const { + return _index_value; + } + void save_index_with_value(int value) { + _index_value = value; + } void expect_empty_add() const { EXPECT_TRUE(_adds.empty()); } @@ -143,6 +169,17 @@ public: return vespalib::MemoryUsage(); } void get_state(const vespalib::slime::Inserter&) const override {} + std::unique_ptr<NearestNeighborIndexSaver> make_saver() const override { + if (_index_value != 0) { + return std::make_unique<MockIndexSaver>(_index_value); + } + return std::unique_ptr<NearestNeighborIndexSaver>(); + } + bool load(const search::fileutil::LoadedBuffer& buf) override { + ASSERT_EQUAL(sizeof(int), buf.size()); + _index_value = (reinterpret_cast<const int*>(buf.buffer()))[0]; + return true; + } std::vector<Neighbor> find_top_k(uint32_t k, vespalib::tensor::TypedCells vector, uint32_t explore_k) const override { (void) k; (void) vector; @@ -166,12 +203,15 @@ class MockNearestNeighborIndexFactory : public NearestNeighborIndexFactory { } }; -struct Fixture -{ +const vespalib::string test_dir = "test_data/"; +const vespalib::string attr_name = test_dir + "my_attr"; + +struct Fixture { using BasicType = search::attribute::BasicType; using CollectionType = search::attribute::CollectionType; using Config = search::attribute::Config; + search::test::DirectoryHandler _dir_handler; Config _cfg; vespalib::string _name; vespalib::string _typeSpec; @@ -185,8 +225,9 @@ struct Fixture bool useDenseTensorAttribute = false, bool enable_hnsw_index = false, bool use_mock_index = false) - : _cfg(BasicType::TENSOR, CollectionType::SINGLE), - _name("test"), + : _dir_handler(test_dir), + _cfg(BasicType::TENSOR, CollectionType::SINGLE), + _name(attr_name), _typeSpec(typeSpec), _index_factory(std::make_unique<DefaultNearestNeighborIndexFactory>()), _tensorAttr(), @@ -225,11 +266,20 @@ struct Fixture return *result; } - MockNearestNeighborIndex& mock_index() { + template <typename IndexType> + IndexType& get_nearest_neighbor_index() { assert(as_dense_tensor().nearest_neighbor_index() != nullptr); - auto mock_index = dynamic_cast<const MockNearestNeighborIndex*>(as_dense_tensor().nearest_neighbor_index()); - assert(mock_index != nullptr); - return *const_cast<MockNearestNeighborIndex*>(mock_index); + auto index = dynamic_cast<const IndexType*>(as_dense_tensor().nearest_neighbor_index()); + assert(index != nullptr); + return *const_cast<IndexType*>(index); + } + + HnswIndex& hnsw_index() { + return get_nearest_neighbor_index<HnswIndex>(); + } + + MockNearestNeighborIndex& mock_index() { + return get_nearest_neighbor_index<MockNearestNeighborIndex>(); } void ensureSpace(uint32_t docId) { @@ -322,7 +372,6 @@ struct Fixture void testEmptyTensor(); }; - void Fixture::testEmptyAttribute() { @@ -383,7 +432,6 @@ Fixture::testSaveLoad() TEST_DO(assertGetNoTensor(2)); } - void Fixture::testCompaction() { @@ -438,7 +486,8 @@ Fixture::testTensorTypeFileHeaderTag() vespalib::FileHeader header; FastOS_File file; - EXPECT_TRUE(file.OpenReadOnly("test.dat")); + vespalib::string file_name = attr_name + ".dat"; + EXPECT_TRUE(file.OpenReadOnly(file_name.c_str())); (void) header.readFile(file); file.Close(); EXPECT_TRUE(header.hasTag("tensortype")); @@ -450,7 +499,6 @@ Fixture::testTensorTypeFileHeaderTag() } } - void Fixture::testEmptyTensor() { @@ -465,7 +513,6 @@ Fixture::testEmptyTensor() } } - template <class MakeFixture> void testAll(MakeFixture &&f) { @@ -499,21 +546,49 @@ TEST_F("Hnsw index is NOT instantiated in dense tensor attribute by default", EXPECT_TRUE(tensor.nearest_neighbor_index() == nullptr); } -TEST_F("Hnsw index is instantiated in dense tensor attribute when specified in config", - Fixture(vec_2d_spec, true, true)) +class DenseTensorAttributeHnswIndex : public Fixture { +public: + DenseTensorAttributeHnswIndex() : Fixture(vec_2d_spec, true, true, false) {} +}; + +TEST_F("Hnsw index is instantiated in dense tensor attribute when specified in config", DenseTensorAttributeHnswIndex) { - const auto& tensor = f.as_dense_tensor(); - ASSERT_TRUE(tensor.nearest_neighbor_index() != nullptr); - auto hnsw_index = dynamic_cast<const HnswIndex*>(tensor.nearest_neighbor_index()); - ASSERT_TRUE(hnsw_index != nullptr); + auto& index = f.hnsw_index(); - const auto& cfg = hnsw_index->config(); + const auto& cfg = index.config(); EXPECT_EQUAL(8u, cfg.max_links_at_level_0()); EXPECT_EQUAL(4u, cfg.max_links_on_inserts()); EXPECT_EQUAL(20u, cfg.neighbors_to_explore_at_construction()); EXPECT_TRUE(cfg.heuristic_select_neighbors()); } +void +expect_level_0(uint32_t exp_docid, const HnswNode& node) +{ + ASSERT_GREATER_EQUAL(node.size(), 1u); + ASSERT_EQUAL(1u, node.level(0).size()); + EXPECT_EQUAL(exp_docid, node.level(0)[0]); +} + +TEST_F("Hnsw index is integrated in dense tensor attribute and can be saved and loaded", DenseTensorAttributeHnswIndex) +{ + // Set two points that will be linked together in level 0 of the hnsw graph. + f.set_tensor(1, vec_2d(3, 5)); + f.set_tensor(2, vec_2d(7, 9)); + + auto &index_a = f.hnsw_index(); + expect_level_0(2, index_a.get_node(1)); + expect_level_0(1, index_a.get_node(2)); + f.save(); + EXPECT_TRUE(vespalib::fileExists(attr_name + ".nnidx")); + + f.load(); + auto &index_b = f.hnsw_index(); + EXPECT_NOT_EQUAL(&index_a, &index_b); + expect_level_0(2, index_b.get_node(1)); + expect_level_0(1, index_b.get_node(2)); +} + class DenseTensorAttributeMockIndex : public Fixture { public: DenseTensorAttributeMockIndex() : Fixture(vec_2d_spec, true, true, true) {} @@ -551,17 +626,6 @@ TEST_F("clearDoc() updates nearest neighbor index", DenseTensorAttributeMockInde index.expect_empty_add(); } -TEST_F("onLoad() updates nearest neighbor index", DenseTensorAttributeMockIndex) -{ - f.set_tensor(1, vec_2d(3, 5)); - f.set_tensor(2, vec_2d(7, 9)); - f.save(); - f.load(); - auto& index = f.mock_index(); - index.expect_adds({{1, {3, 5}}, {2, {7, 9}}}); -} - - TEST_F("commit() ensures transfer and trim hold lists on nearest neighbor index", DenseTensorAttributeMockIndex) { auto& index = f.mock_index(); @@ -598,4 +662,32 @@ TEST_F("Memory usage is extracted from index when updating stats on attribute", EXPECT_EQUAL(before + 1, after); } -TEST_MAIN() { TEST_RUN_ALL(); vespalib::unlink("test.dat"); } +TEST_F("Nearest neighbor index can be saved to disk and then loaded from file", DenseTensorAttributeMockIndex) +{ + f.set_tensor(1, vec_2d(3, 5)); + f.set_tensor(2, vec_2d(7, 9)); + f.mock_index().save_index_with_value(123); + f.save(); + EXPECT_TRUE(vespalib::fileExists(attr_name + ".nnidx")); + + f.load(); // index is loaded from saved file + auto& index = f.mock_index(); + EXPECT_EQUAL(123, index.get_index_value()); + index.expect_adds({}); +} + +TEST_F("onLoad() reconstructs nearest neighbor index if save file does not exists", DenseTensorAttributeMockIndex) +{ + f.set_tensor(1, vec_2d(3, 5)); + f.set_tensor(2, vec_2d(7, 9)); + f.save(); + EXPECT_FALSE(vespalib::fileExists(attr_name + ".nnidx")); + + f.load(); // index is reconstructed by adding all loaded tensors + auto& index = f.mock_index(); + EXPECT_EQUAL(0, index.get_index_value()); + index.expect_adds({{1, {3, 5}}, {2, {7, 9}}}); +} + +TEST_MAIN() { TEST_RUN_ALL(); } + diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.sh b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.sh deleted file mode 100755 index dd9399dea78..00000000000 --- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -set -e -$VALGRIND ./searchlib_tensorattribute_test_app -rm -rf *.dat diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/CMakeLists.txt b/searchlib/src/tests/common/sequencedtaskexecutor/CMakeLists.txt index 6c593d20683..fd6cd9efc43 100644 --- a/searchlib/src/tests/common/sequencedtaskexecutor/CMakeLists.txt +++ b/searchlib/src/tests/common/sequencedtaskexecutor/CMakeLists.txt @@ -13,3 +13,11 @@ vespa_add_executable(searchlib_sequencedtaskexecutor_test_app TEST searchlib ) vespa_add_test(NAME searchlib_sequencedtaskexecutor_test_app COMMAND searchlib_sequencedtaskexecutor_test_app) + +vespa_add_executable(searchlib_adaptive_sequenced_executor_test_app TEST + SOURCES + adaptive_sequenced_executor_test.cpp + DEPENDS + searchlib +) +vespa_add_test(NAME searchlib_adaptive_sequenced_executor_test_app COMMAND searchlib_adaptive_sequenced_executor_test_app) diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp new file mode 100644 index 00000000000..ba66b28108c --- /dev/null +++ b/searchlib/src/tests/common/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp @@ -0,0 +1,251 @@ +// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/searchlib/common/adaptive_sequenced_executor.h> +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/test/insertion_operators.h> + +#include <mutex> +#include <condition_variable> +#include <unistd.h> + +#include <vespa/log/log.h> +LOG_SETUP("adaptive_sequenced_executor_test"); + +namespace search::common { + + +class Fixture +{ +public: + AdaptiveSequencedExecutor _threads; + + Fixture() : _threads(2, 2, 0, 1000) { } +}; + + +class TestObj +{ +public: + std::mutex _m; + std::condition_variable _cv; + int _done; + int _fail; + int _val; + + TestObj() + : _m(), + _cv(), + _done(0), + _fail(0), + _val(0) + { + } + + void + modify(int oldValue, int newValue) + { + { + std::lock_guard<std::mutex> guard(_m); + if (_val == oldValue) { + _val = newValue; + } else { + ++_fail; + } + ++_done; + } + _cv.notify_all(); + } + + void + wait(int wantDone) + { + std::unique_lock<std::mutex> guard(_m); + _cv.wait(guard, [&] { return this->_done >= wantDone; }); + } +}; + +TEST_F("testExecute", Fixture) { + std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); + EXPECT_EQUAL(0, tv->_val); + f._threads.execute(1, [&]() { tv->modify(0, 42); }); + tv->wait(1); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads.sync(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + + +TEST_F("require that task with same component id are serialized", Fixture) +{ + std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); + EXPECT_EQUAL(0, tv->_val); + f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(0, [&]() { tv->modify(14, 42); }); + tv->wait(2); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads.sync(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + +TEST_F("require that task with different component ids are not serialized", Fixture) +{ + int tryCnt = 0; + for (tryCnt = 0; tryCnt < 100; ++tryCnt) { + std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); + EXPECT_EQUAL(0, tv->_val); + f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(2, [&]() { tv->modify(14, 42); }); + tv->wait(2); + if (tv->_fail != 1) { + continue; + } + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + f._threads.sync(); + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + break; + } + EXPECT_TRUE(tryCnt < 100); +} + + +TEST_F("require that task with same string component id are serialized", Fixture) +{ + std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); + EXPECT_EQUAL(0, tv->_val); + auto test2 = [&]() { tv->modify(14, 42); }; + f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(f._threads.getExecutorId("0"), test2); + tv->wait(2); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads.sync(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + +namespace { + +int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) +{ + int tryCnt = 0; + for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { + std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); + EXPECT_EQUAL(0, tv->_val); + f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(f._threads.getExecutorId(altComponentId), [&]() { tv->modify(14, 42); }); + tv->wait(2); + if (tv->_fail != 1) { + continue; + } + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + f._threads.sync(); + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + break; + } + return tryCnt; +} + +vespalib::string makeAltComponentId(Fixture &f) +{ + int tryCnt = 0; + char altComponentId[20]; + ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0"); + for (tryCnt = 1; tryCnt < 100; ++tryCnt) { + sprintf(altComponentId, "%d", tryCnt); + if (f._threads.getExecutorId(altComponentId) == executorId0) { + break; + } + } + EXPECT_TRUE(tryCnt < 100); + return altComponentId; +} + +} + +TEST_F("require that task with different string component ids are not serialized", Fixture) +{ + int tryCnt = detectSerializeFailure(f, "2", 100); + EXPECT_TRUE(tryCnt < 100); +} + + +TEST_F("require that task with different string component ids mapping to the same executor id are serialized", + Fixture) +{ + vespalib::string altComponentId = makeAltComponentId(f); + LOG(info, "second string component id is \"%s\"", altComponentId.c_str()); + int tryCnt = detectSerializeFailure(f, altComponentId, 100); + EXPECT_TRUE(tryCnt == 100); +} + + +TEST_F("require that execute works with const lambda", Fixture) +{ + int i = 5; + std::vector<int> res; + const auto lambda = [i, &res]() mutable + { res.push_back(i--); res.push_back(i--); }; + f._threads.execute(0, lambda); + f._threads.execute(0, lambda); + f._threads.sync(); + std::vector<int> exp({5, 4, 5, 4}); + EXPECT_EQUAL(exp, res); + EXPECT_EQUAL(5, i); +} + +TEST_F("require that execute works with reference to lambda", Fixture) +{ + int i = 5; + std::vector<int> res; + auto lambda = [i, &res]() mutable + { res.push_back(i--); res.push_back(i--); }; + auto &lambdaref = lambda; + f._threads.execute(0, lambdaref); + f._threads.execute(0, lambdaref); + f._threads.sync(); + std::vector<int> exp({5, 4, 5, 4}); + EXPECT_EQUAL(exp, res); + EXPECT_EQUAL(5, i); +} + +TEST_F("require that executeLambda works", Fixture) +{ + int i = 5; + std::vector<int> res; + const auto lambda = [i, &res]() mutable + { res.push_back(i--); res.push_back(i--); }; + f._threads.executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda); + f._threads.sync(); + std::vector<int> exp({5, 4}); + EXPECT_EQUAL(exp, res); + EXPECT_EQUAL(5, i); +} + +TEST("require that you get correct number of executors") { + AdaptiveSequencedExecutor seven(7, 1, 0, 10); + EXPECT_EQUAL(7u, seven.getNumExecutors()); +} + +TEST("require that you distribute well") { + AdaptiveSequencedExecutor seven(7, 1, 0, 10); + EXPECT_EQUAL(7u, seven.getNumExecutors()); + EXPECT_EQUAL(97u, seven.getComponentHashSize()); + EXPECT_EQUAL(0u, seven.getComponentEffectiveHashSize()); + for (uint32_t id=0; id < 1000; id++) { + EXPECT_EQUAL((id%97)%7, seven.getExecutorId(id).getId()); + } + EXPECT_EQUAL(97u, seven.getComponentHashSize()); + EXPECT_EQUAL(97u, seven.getComponentEffectiveHashSize()); +} + +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp index 9491617c135..362dc28d36a 100644 --- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp +++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp @@ -1,30 +1,70 @@ // Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/searchlib/common/sequencedtaskexecutor.h> +#include <vespa/searchlib/common/adaptive_sequenced_executor.h> #include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/time.h> #include <atomic> +using search::ISequencedTaskExecutor; using search::SequencedTaskExecutor; +using search::AdaptiveSequencedExecutor; using ExecutorId = search::ISequencedTaskExecutor::ExecutorId; -int main(int argc, char *argv[]) { - unsigned long numTasks = 1000000; - unsigned numThreads = 4; - unsigned taskLimit = 1000; - vespalib::Executor::OptimizeFor optimize = vespalib::Executor::OptimizeFor::LATENCY; - std::atomic<long> counter(0); - if (argc > 1) - numTasks = atol(argv[1]); - if (argc > 2) - numThreads = atoi(argv[2]); - if (argc > 3) - taskLimit = atoi(argv[3]); - if (argc > 4) - optimize = vespalib::Executor::OptimizeFor::THROUGHPUT; +size_t do_work(size_t size) { + size_t ret = 0; + for (size_t i = 0; i < size; ++i) { + for (size_t j = 0; j < 128; ++j) { + ret = (ret + i) * j; + } + } + return ret; +} - auto executor = SequencedTaskExecutor::create(numThreads, taskLimit, optimize); - for (unsigned long tid(0); tid < numTasks; tid++) { - executor->executeTask(ExecutorId(tid%numThreads), vespalib::makeLambdaTask([&counter] { counter++; })); +struct SimpleParams { + int argc; + char **argv; + int idx; + SimpleParams(int argc_in, char **argv_in) : argc(argc_in), argv(argv_in), idx(0) {} + int next(const char *name, int fallback) { + ++idx; + int value = 0; + if (argc > idx) { + value = atoi(argv[idx]); + } else { + value = fallback; + } + fprintf(stderr, "param %s: %d\n", name, value); + return value; + } +}; + +int main(int argc, char **argv) { + SimpleParams params(argc, argv); + bool use_adaptive_executor = params.next("use_adaptive_executor", 0); + bool optimize_for_throughput = params.next("optimize_for_throughput", 0); + size_t num_tasks = params.next("num_tasks", 1000000); + size_t num_strands = params.next("num_strands", 4); + size_t task_limit = params.next("task_limit", 1000); + size_t num_threads = params.next("num_threads", num_strands); + size_t max_waiting = params.next("max_waiting", optimize_for_throughput ? 32 : 0); + size_t work_size = params.next("work_size", 0); + std::atomic<long> counter(0); + std::unique_ptr<ISequencedTaskExecutor> executor; + if (use_adaptive_executor) { + executor = std::make_unique<AdaptiveSequencedExecutor>(num_strands, num_threads, max_waiting, task_limit); + } else { + auto optimize = optimize_for_throughput + ? vespalib::Executor::OptimizeFor::THROUGHPUT + : vespalib::Executor::OptimizeFor::LATENCY; + executor = SequencedTaskExecutor::create(num_strands, task_limit, optimize); + } + vespalib::Timer timer; + for (size_t task_id = 0; task_id < num_tasks; ++task_id) { + executor->executeTask(ExecutorId(task_id % num_strands), + vespalib::makeLambdaTask([&counter,work_size] { (void) do_work(work_size); counter++; })); } - return 0; + executor.reset(); + fprintf(stderr, "\ntotal time: %zu ms\n", vespalib::count_ms(timer.elapsed())); + return (size_t(counter) == num_tasks) ? 0 : 1; } diff --git a/searchlib/src/tests/features/prod_features.cpp b/searchlib/src/tests/features/prod_features.cpp index 64a999906d8..1c07c81bc2f 100644 --- a/searchlib/src/tests/features/prod_features.cpp +++ b/searchlib/src/tests/features/prod_features.cpp @@ -1238,10 +1238,11 @@ Test::testDotProduct() assertDotProduct(0, "(f:5,g:5)", 1, "wsextstr"); assertDotProduct(550, "(a:1,b:2,c:3,d:4,e:5)", 1, "wsextstr"); } - for (const char * name : {"wsbyte", "wsint"}) { - assertDotProduct(0, "()", 1, name); - assertDotProduct(0, "(6:5,7:5)", 1, name); - assertDotProduct(55, "(1:1,2:2,3:3,4:4,5:5)", 1, name); + for (const char * name : {"wsbyte", "wsint", "wsint_fast"}) { + TEST_DO(assertDotProduct(0, "()", 1, name)); + TEST_DO(assertDotProduct(0, "(6:5,7:5)", 1, name)); + TEST_DO(assertDotProduct(18, "(4:4.5)", 1, name)); + TEST_DO(assertDotProduct(57, "(1:1,2:2,3:3,4:4.5,5:5)", 1, name)); } for (const char * name : {"arrbyte", "arrint", "arrfloat", "arrint_fast", "arrfloat_fast"}) { assertDotProduct(0, "()", 1, name); @@ -1300,6 +1301,7 @@ Test::setupForDotProductTest(FtFeatureTest & ft) }; std::vector<Config> cfgList = { {"wsint", AVBT::INT32, AVCT::WSET, false}, {"wsbyte", AVBT::INT8, AVCT::WSET, false}, + {"wsint_fast", AVBT::INT8, AVCT::WSET, true}, {"arrbyte", AVBT::INT8, AVCT::ARRAY, false}, {"arrint", AVBT::INT32, AVCT::ARRAY, false}, {"arrfloat", AVBT::FLOAT, AVCT::ARRAY, false}, diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp index c562c0cf29c..7a0c240dea5 100644 --- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp +++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp @@ -517,7 +517,7 @@ struct FieldIndexTest : public ::testing::Test { }; using FieldIndexTestTypes = ::testing::Types<FieldIndex<false>, FieldIndex<true>>; -TYPED_TEST_CASE(FieldIndexTest, FieldIndexTestTypes); +VESPA_GTEST_TYPED_TEST_SUITE(FieldIndexTest, FieldIndexTestTypes); // Disable warnings emitted by gtest generated files when using typed tests #pragma GCC diagnostic push diff --git a/searchlib/src/tests/tensor/hnsw_saver/CMakeLists.txt b/searchlib/src/tests/tensor/hnsw_saver/CMakeLists.txt new file mode 100644 index 00000000000..90202e222a7 --- /dev/null +++ b/searchlib/src/tests/tensor/hnsw_saver/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchlib_hnsw_save_load_test_app TEST + SOURCES + hnsw_save_load_test.cpp + DEPENDS + searchlib + gtest +) +vespa_add_test(NAME searchlib_hnsw_save_load_test_app COMMAND searchlib_hnsw_save_load_test_app) diff --git a/searchlib/src/tests/tensor/hnsw_saver/hnsw_save_load_test.cpp b/searchlib/src/tests/tensor/hnsw_saver/hnsw_save_load_test.cpp new file mode 100644 index 00000000000..b9e27d413f3 --- /dev/null +++ b/searchlib/src/tests/tensor/hnsw_saver/hnsw_save_load_test.cpp @@ -0,0 +1,150 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/searchlib/tensor/hnsw_graph.h> +#include <vespa/searchlib/tensor/hnsw_index_saver.h> +#include <vespa/searchlib/tensor/hnsw_index_loader.h> +#include <vespa/vespalib/util/bufferwriter.h> +#include <vespa/searchlib/util/fileutil.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vector> + +#include <vespa/log/log.h> +LOG_SETUP("hnsw_save_load_test"); + +using namespace search::tensor; +using search::BufferWriter; +using search::fileutil::LoadedBuffer; + +class VectorBufferWriter : public BufferWriter { +private: + char tmp[1024]; +public: + std::vector<char> output; + VectorBufferWriter() { + setup(tmp, 1024); + } + ~VectorBufferWriter() {} + void flush() override { + for (size_t i = 0; i < usedLen(); ++i) { + output.push_back(tmp[i]); + } + rewind(); + } +}; + +using V = std::vector<uint32_t>; + +void populate(HnswGraph &graph) { + // no 0 + graph.make_node_for_document(1, 1); + graph.make_node_for_document(2, 2); + // no 3 + graph.make_node_for_document(4, 2); + graph.make_node_for_document(5, 0); + graph.make_node_for_document(6, 1); + + graph.set_link_array(1, 0, V{2, 4, 6}); + graph.set_link_array(2, 0, V{1, 4, 6}); + graph.set_link_array(4, 0, V{1, 2, 6}); + graph.set_link_array(6, 0, V{1, 2, 4}); + graph.set_link_array(2, 1, V{4}); + graph.set_link_array(4, 1, V{2}); + graph.set_entry_node(2, 1); +} + +void modify(HnswGraph &graph) { + graph.remove_node_for_document(2); + graph.remove_node_for_document(6); + graph.make_node_for_document(7, 2); + + graph.set_link_array(1, 0, V{7, 4}); + graph.set_link_array(4, 0, V{7, 2}); + graph.set_link_array(7, 0, V{4, 2}); + graph.set_link_array(4, 1, V{7}); + graph.set_link_array(7, 1, V{4}); + + graph.set_entry_node(4, 1); +} + + +class CopyGraphTest : public ::testing::Test { +public: + HnswGraph original; + HnswGraph copy; + + void expect_empty_d(uint32_t docid) const { + EXPECT_FALSE(copy.node_refs[docid].load_acquire().valid()); + } + + void expect_level_0(uint32_t docid, const V& exp_links) const { + auto levels = copy.get_level_array(docid); + EXPECT_GE(levels.size(), 1); + auto links = copy.get_link_array(docid, 0); + EXPECT_EQ(exp_links.size(), links.size()); + for (size_t i = 0; i < exp_links.size() && i < links.size(); ++i) { + EXPECT_EQ(exp_links[i], links[i]); + } + } + + void expect_level_1(uint32_t docid, const V& exp_links) const { + auto levels = copy.get_level_array(docid); + EXPECT_EQ(2, levels.size()); + auto links = copy.get_link_array(docid, 1); + EXPECT_EQ(exp_links.size(), links.size()); + for (size_t i = 0; i < exp_links.size() && i < links.size(); ++i) { + EXPECT_EQ(exp_links[i], links[i]); + } + } + + std::vector<char> save_original() const { + HnswIndexSaver saver(original); + VectorBufferWriter vector_writer; + saver.save(vector_writer); + return vector_writer.output; + } + void load_copy(std::vector<char> data) { + HnswIndexLoader loader(copy); + LoadedBuffer buffer(&data[0], data.size()); + loader.load(buffer); + } + + void expect_copy_as_populated() const { + EXPECT_EQ(copy.size(), 7); + EXPECT_EQ(copy.entry_docid, 2); + EXPECT_EQ(copy.entry_level, 1); + + expect_empty_d(0); + expect_empty_d(3); + expect_empty_d(5); + + expect_level_0(1, {2, 4, 6}); + expect_level_0(2, {1, 4, 6}); + expect_level_0(4, {1, 2, 6}); + expect_level_0(6, {1, 2, 4}); + + expect_level_1(2, {4}); + expect_level_1(4, {2}); + } +}; + +TEST_F(CopyGraphTest, reconstructs_graph) +{ + populate(original); + auto data = save_original(); + load_copy(data); + expect_copy_as_populated(); +} + +TEST_F(CopyGraphTest, later_changes_ignored) +{ + populate(original); + HnswIndexSaver saver(original); + modify(original); + VectorBufferWriter vector_writer; + saver.save(vector_writer); + auto data = vector_writer.output; + load_copy(data); + expect_copy_as_populated(); +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchlib/src/vespa/searchlib/attribute/attribute_header.cpp b/searchlib/src/vespa/searchlib/attribute/attribute_header.cpp index 224d5758028..3d7010ba6c3 100644 --- a/searchlib/src/vespa/searchlib/attribute/attribute_header.cpp +++ b/searchlib/src/vespa/searchlib/attribute/attribute_header.cpp @@ -22,7 +22,12 @@ const vespalib::string predicateUpperBoundTag = "predicate.upper_bound"; } AttributeHeader::AttributeHeader() - : _fileName(""), + : AttributeHeader("") +{ +} + +AttributeHeader::AttributeHeader(const vespalib::string &fileName) + : _fileName(fileName), _basicType(attribute::BasicType::Type::NONE), _collectionType(attribute::CollectionType::Type::SINGLE), _tensorType(vespalib::eval::ValueType::error_type()), diff --git a/searchlib/src/vespa/searchlib/attribute/attribute_header.h b/searchlib/src/vespa/searchlib/attribute/attribute_header.h index 303c469e755..24eac8336b4 100644 --- a/searchlib/src/vespa/searchlib/attribute/attribute_header.h +++ b/searchlib/src/vespa/searchlib/attribute/attribute_header.h @@ -35,6 +35,7 @@ private: void internalExtractTags(const vespalib::GenericHeader &header); public: AttributeHeader(); + AttributeHeader(const vespalib::string &fileName); AttributeHeader(const vespalib::string &fileName, BasicType basicType, CollectionType collectionType, diff --git a/searchlib/src/vespa/searchlib/attribute/attributefilesavetarget.cpp b/searchlib/src/vespa/searchlib/attribute/attributefilesavetarget.cpp index f57094ae592..f284fecbf98 100644 --- a/searchlib/src/vespa/searchlib/attribute/attributefilesavetarget.cpp +++ b/searchlib/src/vespa/searchlib/attribute/attributefilesavetarget.cpp @@ -3,14 +3,16 @@ #include "attributefilesavetarget.h" #include "attributevector.h" #include <vespa/searchlib/common/fileheadercontext.h> -#include <vespa/vespalib/data/fileheader.h> #include <vespa/vespalib/data/databuffer.h> +#include <vespa/vespalib/data/fileheader.h> #include <vespa/vespalib/util/error.h> +#include <vespa/vespalib/util/exceptions.h> #include <vespa/log/log.h> LOG_SETUP(".searchlib.attribute.attributefilesavetarget"); using vespalib::getLastErrorString; +using vespalib::IllegalArgumentException; namespace search { @@ -18,13 +20,16 @@ using common::FileHeaderContext; AttributeFileSaveTarget:: -AttributeFileSaveTarget(const TuneFileAttributes &tuneFileAttributes, - const FileHeaderContext &fileHeaderContext) +AttributeFileSaveTarget(const TuneFileAttributes& tune_file, + const FileHeaderContext& file_header_ctx) : IAttributeSaveTarget(), - _datWriter(tuneFileAttributes, fileHeaderContext, _header, "Attribute vector data file"), - _idxWriter(tuneFileAttributes, fileHeaderContext, _header, "Attribute vector idx file"), - _weightWriter(tuneFileAttributes, fileHeaderContext, _header, "Attribute vector weight file"), - _udatWriter(tuneFileAttributes, fileHeaderContext, _header, "Attribute vector unique data file") + _tune_file(tune_file), + _file_header_ctx(file_header_ctx), + _datWriter(tune_file, file_header_ctx, _header, "Attribute vector data file"), + _idxWriter(tune_file, file_header_ctx, _header, "Attribute vector idx file"), + _weightWriter(tune_file, file_header_ctx, _header, "Attribute vector weight file"), + _udatWriter(tune_file, file_header_ctx, _header, "Attribute vector unique data file"), + _writers() { } @@ -66,23 +71,23 @@ AttributeFileSaveTarget::close() _udatWriter.close(); _idxWriter.close(); _weightWriter.close(); + for (auto& writer : _writers) { + writer.second->close(); + } } - IAttributeFileWriter & AttributeFileSaveTarget::datWriter() { return _datWriter; } - IAttributeFileWriter & AttributeFileSaveTarget::idxWriter() { return _idxWriter; } - IAttributeFileWriter & AttributeFileSaveTarget::weightWriter() { @@ -95,6 +100,33 @@ AttributeFileSaveTarget::udatWriter() return _udatWriter; } +bool +AttributeFileSaveTarget::setup_writer(const vespalib::string& file_suffix, + const vespalib::string& desc) +{ + vespalib::string file_name(_header.getFileName() + "." + file_suffix); + auto writer = std::make_unique<AttributeFileWriter>(_tune_file, _file_header_ctx, + _header, desc); + if (!writer->open(file_name)) { + return false; + } + auto itr = _writers.find(file_suffix); + if (itr != _writers.end()) { + return false; + } + _writers.insert(std::make_pair(file_suffix, std::move(writer))); + return true; +} + +IAttributeFileWriter& +AttributeFileSaveTarget::get_writer(const vespalib::string& file_suffix) +{ + auto itr = _writers.find(file_suffix); + if (itr == _writers.end()) { + throw IllegalArgumentException("File writer with suffix '" + file_suffix + "' does not exist"); + } + return *itr->second; +} } // namespace search diff --git a/searchlib/src/vespa/searchlib/attribute/attributefilesavetarget.h b/searchlib/src/vespa/searchlib/attribute/attributefilesavetarget.h index acb3daf82e0..9a9d38615ea 100644 --- a/searchlib/src/vespa/searchlib/attribute/attributefilesavetarget.h +++ b/searchlib/src/vespa/searchlib/attribute/attributefilesavetarget.h @@ -4,24 +4,30 @@ #include "iattributesavetarget.h" #include "attributefilewriter.h" +#include <vespa/vespalib/stllike/hash_fun.h> +#include <unordered_map> -namespace search -{ +namespace search { /** * Class used to save an attribute vector to file(s). **/ -class AttributeFileSaveTarget : public IAttributeSaveTarget -{ +class AttributeFileSaveTarget : public IAttributeSaveTarget { private: + using FileWriterUP = std::unique_ptr<AttributeFileWriter>; + using WriterMap = std::unordered_map<vespalib::string, FileWriterUP, vespalib::hash<vespalib::string>>; + + const TuneFileAttributes& _tune_file; + const search::common::FileHeaderContext& _file_header_ctx; AttributeFileWriter _datWriter; AttributeFileWriter _idxWriter; AttributeFileWriter _weightWriter; AttributeFileWriter _udatWriter; + WriterMap _writers; public: - AttributeFileSaveTarget(const TuneFileAttributes &tuneFileAttributes, - const search::common::FileHeaderContext &fileHeaderContext); + AttributeFileSaveTarget(const TuneFileAttributes& tune_file, + const search::common::FileHeaderContext& file_header_ctx); ~AttributeFileSaveTarget() override; // Implements IAttributeSaveTarget @@ -35,6 +41,11 @@ public: IAttributeFileWriter &idxWriter() override; IAttributeFileWriter &weightWriter() override; IAttributeFileWriter &udatWriter() override; + + bool setup_writer(const vespalib::string& file_suffix, + const vespalib::string& desc) override; + IAttributeFileWriter& get_writer(const vespalib::string& file_suffix) override; + }; } // namespace search diff --git a/searchlib/src/vespa/searchlib/attribute/attributememorysavetarget.cpp b/searchlib/src/vespa/searchlib/attribute/attributememorysavetarget.cpp index 372168143ab..b28887691e5 100644 --- a/searchlib/src/vespa/searchlib/attribute/attributememorysavetarget.cpp +++ b/searchlib/src/vespa/searchlib/attribute/attributememorysavetarget.cpp @@ -1,24 +1,25 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "attributememorysavetarget.h" #include "attributefilesavetarget.h" +#include "attributememorysavetarget.h" #include "attributevector.h" +#include <vespa/vespalib/util/exceptions.h> namespace search { using search::common::FileHeaderContext; +using vespalib::IllegalArgumentException; AttributeMemorySaveTarget::AttributeMemorySaveTarget() : _datWriter(), _idxWriter(), _weightWriter(), - _udatWriter() + _udatWriter(), + _writers() { } -AttributeMemorySaveTarget::~AttributeMemorySaveTarget() { -} - +AttributeMemorySaveTarget::~AttributeMemorySaveTarget() = default; IAttributeFileWriter & AttributeMemorySaveTarget::datWriter() @@ -26,28 +27,24 @@ AttributeMemorySaveTarget::datWriter() return _datWriter; } - IAttributeFileWriter & AttributeMemorySaveTarget::idxWriter() { return _idxWriter; } - IAttributeFileWriter & AttributeMemorySaveTarget::weightWriter() { return _weightWriter; } - IAttributeFileWriter & AttributeMemorySaveTarget::udatWriter() { return _udatWriter; } - bool AttributeMemorySaveTarget:: writeToFile(const TuneFileAttributes &tuneFileAttributes, @@ -68,9 +65,39 @@ writeToFile(const TuneFileAttributes &tuneFileAttributes, _weightWriter.writeTo(saveTarget.weightWriter()); } } + for (const auto& entry : _writers) { + if (!saveTarget.setup_writer(entry.first, entry.second.desc)) { + return false; + } + auto& file_writer = saveTarget.get_writer(entry.first); + entry.second.writer->writeTo(file_writer); + } saveTarget.close(); return true; } +bool +AttributeMemorySaveTarget::setup_writer(const vespalib::string& file_suffix, + const vespalib::string& desc) +{ + auto writer = std::make_unique<AttributeMemoryFileWriter>(); + auto itr = _writers.find(file_suffix); + if (itr != _writers.end()) { + return false; + } + _writers.insert(std::make_pair(file_suffix, WriterEntry(std::move(writer), desc))); + return true; +} + +IAttributeFileWriter& +AttributeMemorySaveTarget::get_writer(const vespalib::string& file_suffix) +{ + auto itr = _writers.find(file_suffix); + if (itr == _writers.end()) { + throw IllegalArgumentException("File writer with suffix '" + file_suffix + "' does not exist"); + } + return *itr->second.writer; +} + } // namespace search diff --git a/searchlib/src/vespa/searchlib/attribute/attributememorysavetarget.h b/searchlib/src/vespa/searchlib/attribute/attributememorysavetarget.h index f06764fa34b..9533b881099 100644 --- a/searchlib/src/vespa/searchlib/attribute/attributememorysavetarget.h +++ b/searchlib/src/vespa/searchlib/attribute/attributememorysavetarget.h @@ -2,11 +2,13 @@ #pragma once -#include "iattributesavetarget.h" #include "attributememoryfilewriter.h" +#include "iattributesavetarget.h" +#include <vespa/searchlib/common/tunefileinfo.h> #include <vespa/searchlib/util/rawbuf.h> +#include <vespa/vespalib/stllike/hash_fun.h> #include <memory> -#include <vespa/searchlib/common/tunefileinfo.h> +#include <unordered_map> namespace search::common { class FileHeaderContext; } @@ -16,13 +18,22 @@ class AttributeVector; /** * Class used to save an attribute vector to memory buffer(s). **/ -class AttributeMemorySaveTarget : public IAttributeSaveTarget -{ +class AttributeMemorySaveTarget : public IAttributeSaveTarget { private: + using FileWriterUP = std::unique_ptr<AttributeMemoryFileWriter>; + struct WriterEntry { + FileWriterUP writer; + vespalib::string desc; + WriterEntry(FileWriterUP writer_in, const vespalib::string& desc_in) + : writer(std::move(writer_in)), desc(desc_in) {} + }; + using WriterMap = std::unordered_map<vespalib::string, WriterEntry, vespalib::hash<vespalib::string>>; + AttributeMemoryFileWriter _datWriter; AttributeMemoryFileWriter _idxWriter; AttributeMemoryFileWriter _weightWriter; AttributeMemoryFileWriter _udatWriter; + WriterMap _writers; public: AttributeMemorySaveTarget(); @@ -40,6 +51,11 @@ public: IAttributeFileWriter &idxWriter() override; IAttributeFileWriter &weightWriter() override; IAttributeFileWriter &udatWriter() override; + + bool setup_writer(const vespalib::string& file_suffix, + const vespalib::string& desc) override; + IAttributeFileWriter& get_writer(const vespalib::string& file_suffix) override; + }; } // namespace search diff --git a/searchlib/src/vespa/searchlib/attribute/attributevector.cpp b/searchlib/src/vespa/searchlib/attribute/attributevector.cpp index b043bb4aaf8..ffc62d806e2 100644 --- a/searchlib/src/vespa/searchlib/attribute/attributevector.cpp +++ b/searchlib/src/vespa/searchlib/attribute/attributevector.cpp @@ -266,79 +266,6 @@ const IEnumStore* AttributeVector::getEnumStoreBase() const { return nullptr; } IEnumStore* AttributeVector::getEnumStoreBase() { return nullptr; } const attribute::MultiValueMappingBase * AttributeVector::getMultiValueBase() const { return nullptr; } -std::unique_ptr<FastOS_FileInterface> -AttributeVector::openFile(const char *suffix) -{ - BaseName::string fileName(getBaseFileName()); - fileName += suffix; - return FileUtil::openFile(fileName); -} - - -std::unique_ptr<FastOS_FileInterface> -AttributeVector::openDAT() -{ - return openFile(".dat"); -} - - -std::unique_ptr<FastOS_FileInterface> -AttributeVector::openIDX() -{ - return openFile(".idx"); -} - - -std::unique_ptr<FastOS_FileInterface> -AttributeVector::openWeight() -{ - return openFile(".weight"); -} - - -std::unique_ptr<FastOS_FileInterface> -AttributeVector::openUDAT() -{ - return openFile(".dat"); -} - -fileutil::LoadedBuffer::UP -AttributeVector::loadDAT() -{ - return loadFile(".dat"); -} - - -fileutil::LoadedBuffer::UP -AttributeVector::loadIDX() -{ - return loadFile(".idx"); -} - - -fileutil::LoadedBuffer::UP -AttributeVector::loadWeight() -{ - return loadFile(".weight"); -} - - -fileutil::LoadedBuffer::UP -AttributeVector::loadUDAT() -{ - return loadFile(".udat"); -} - - -fileutil::LoadedBuffer::UP -AttributeVector::loadFile(const char *suffix) -{ - BaseName::string fileName(getBaseFileName()); - fileName += suffix; - return FileUtil::loadFile(fileName); -} - - bool AttributeVector::save(vespalib::stringref fileName) { diff --git a/searchlib/src/vespa/searchlib/attribute/attributevector.h b/searchlib/src/vespa/searchlib/attribute/attributevector.h index a396fb70b7c..4a53f2dd5a2 100644 --- a/searchlib/src/vespa/searchlib/attribute/attributevector.h +++ b/searchlib/src/vespa/searchlib/attribute/attributevector.h @@ -212,11 +212,6 @@ protected: void setNumDocs(uint32_t n) { _status.setNumDocs(n); } void incNumDocs() { _status.incNumDocs(); } - LoadedBufferUP loadDAT(); - LoadedBufferUP loadIDX(); - LoadedBufferUP loadWeight(); - LoadedBufferUP loadUDAT(); - class ValueModifier { public: @@ -269,10 +264,6 @@ protected: } public: - std::unique_ptr<FastOS_FileInterface> openDAT(); - std::unique_ptr<FastOS_FileInterface> openIDX(); - std::unique_ptr<FastOS_FileInterface> openWeight(); - std::unique_ptr<FastOS_FileInterface> openUDAT(); void incGeneration(); void removeAllOldGenerations(); @@ -572,8 +563,6 @@ private: virtual bool applyWeight(DocId doc, const FieldValue &fv, const ArithmeticValueUpdate &wAdjust); virtual void onSave(IAttributeSaveTarget & saveTarget); virtual bool onLoad(); - std::unique_ptr<FastOS_FileInterface> openFile(const char *suffix); - LoadedBufferUP loadFile(const char *suffix); BaseName _baseFileName; diff --git a/searchlib/src/vespa/searchlib/attribute/attrvector.cpp b/searchlib/src/vespa/searchlib/attribute/attrvector.cpp index 0304aa8f38e..59771d7ffae 100644 --- a/searchlib/src/vespa/searchlib/attribute/attrvector.cpp +++ b/searchlib/src/vespa/searchlib/attribute/attrvector.cpp @@ -3,6 +3,7 @@ #include "attrvector.h" #include "attrvector.hpp" #include "iattributesavetarget.h" +#include "load_utils.h" #include <vespa/log/log.h> LOG_SETUP(".searchlib.attribute.attr_vector"); @@ -123,7 +124,7 @@ bool StringDirectAttribute::onLoad() setCommittedDocIdLimit(0); } - fileutil::LoadedBuffer::UP tmpBuffer(loadDAT()); + auto tmpBuffer = attribute::LoadUtils::loadDAT(*this); bool rc(tmpBuffer.get()); if (rc) { if ( ! tmpBuffer->empty()) { @@ -158,7 +159,7 @@ bool StringDirectAttribute::onLoad() } if (hasMultiValue()) { - fileutil::LoadedBuffer::UP tmpIdx(loadIDX()); + auto tmpIdx = attribute::LoadUtils::loadIDX(*this); size_t tmpIdxLen(tmpIdx->size(sizeof(uint32_t))); _idx.clear(); _idx.reserve(tmpIdxLen); diff --git a/searchlib/src/vespa/searchlib/attribute/attrvector.hpp b/searchlib/src/vespa/searchlib/attribute/attrvector.hpp index cdd34725e69..4ce7575b28d 100644 --- a/searchlib/src/vespa/searchlib/attribute/attrvector.hpp +++ b/searchlib/src/vespa/searchlib/attribute/attrvector.hpp @@ -2,6 +2,7 @@ #pragma once #include "attrvector.h" +#include "load_utils.h" #include <vespa/vespalib/util/hdr_abort.h> #include <vespa/fastlib/io/bufferedfile.h> #include <vespa/searchlib/util/filekit.h> @@ -23,7 +24,7 @@ NumericDirectAttribute<B>::~NumericDirectAttribute() = default; template <typename B> bool NumericDirectAttribute<B>::onLoad() { - fileutil::LoadedBuffer::UP dataBuffer(B::loadDAT()); + auto dataBuffer = attribute::LoadUtils::loadDAT(*this); bool rc(dataBuffer.get()); if (rc) { const BaseType * tmpData(static_cast <const BaseType *>(dataBuffer->buffer())); @@ -56,7 +57,7 @@ bool NumericDirectAttribute<B>::onLoad() } dataBuffer.reset(); if (this->hasMultiValue()) { - fileutil::LoadedBuffer::UP idxBuffer(B::loadIDX()); + auto idxBuffer = attribute::LoadUtils::loadIDX(*this); rc = idxBuffer.get(); if (rc) { const uint32_t * tmpIdx(static_cast<const uint32_t *>(idxBuffer->buffer())); diff --git a/searchlib/src/vespa/searchlib/attribute/flagattribute.cpp b/searchlib/src/vespa/searchlib/attribute/flagattribute.cpp index 1e4bba95b4b..895e6a6f4c0 100644 --- a/searchlib/src/vespa/searchlib/attribute/flagattribute.cpp +++ b/searchlib/src/vespa/searchlib/attribute/flagattribute.cpp @@ -89,7 +89,7 @@ FlagAttributeT<B>::onLoadEnumerated(ReaderBase &attrReader) if (numValues > 0) _bitVectorSize = numDocs; - fileutil::LoadedBuffer::UP udatBuffer(this->loadUDAT()); + auto udatBuffer = attribute::LoadUtils::loadUDAT(*this); assert((udatBuffer->size() % sizeof(TT)) == 0); vespalib::ConstArrayRef<TT> map(reinterpret_cast<const TT *>(udatBuffer->buffer()), udatBuffer->size() / sizeof(TT)); diff --git a/searchlib/src/vespa/searchlib/attribute/iattributesavetarget.h b/searchlib/src/vespa/searchlib/attribute/iattributesavetarget.h index 9f90544bb83..8946fc2fcdb 100644 --- a/searchlib/src/vespa/searchlib/attribute/iattributesavetarget.h +++ b/searchlib/src/vespa/searchlib/attribute/iattributesavetarget.h @@ -37,6 +37,19 @@ public: virtual IAttributeFileWriter &weightWriter() = 0; virtual IAttributeFileWriter &udatWriter() = 0; + /** + * Setups a custom file writer with the given file suffix and description in the file header. + * Returns false if the file writer cannot be setup or if it already exists, true otherwise. + */ + virtual bool setup_writer(const vespalib::string& file_suffix, + const vespalib::string& desc) = 0; + + /** + * Returns the file writer with the given file suffix. + * Throws vespalib::IllegalArgumentException if the file writer does not exists. + */ + virtual IAttributeFileWriter& get_writer(const vespalib::string& file_suffix) = 0; + virtual ~IAttributeSaveTarget(); }; diff --git a/searchlib/src/vespa/searchlib/attribute/load_utils.cpp b/searchlib/src/vespa/searchlib/attribute/load_utils.cpp index 041daa08cd5..701c8eaf702 100644 --- a/searchlib/src/vespa/searchlib/attribute/load_utils.cpp +++ b/searchlib/src/vespa/searchlib/attribute/load_utils.cpp @@ -5,13 +5,81 @@ #include "loadedenumvalue.h" #include "multi_value_mapping.h" #include "multivalue.h" +#include <vespa/fastos/file.h> +#include <vespa/searchlib/util/fileutil.h> +#include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/array.hpp> using search::multivalue::Value; using search::multivalue::WeightedValue; -namespace search { -namespace attribute { +namespace search::attribute { + +using FileInterfaceUP = LoadUtils::FileInterfaceUP; +using LoadedBufferUP = LoadUtils::LoadedBufferUP; + +FileInterfaceUP +LoadUtils::openFile(const AttributeVector& attr, const vespalib::string& suffix) +{ + return FileUtil::openFile(attr.getBaseFileName() + "." + suffix); +} + + + +FileInterfaceUP +LoadUtils::openDAT(const AttributeVector& attr) +{ + return openFile(attr, "dat"); +} + +FileInterfaceUP +LoadUtils::openIDX(const AttributeVector& attr) +{ + return openFile(attr, "idx"); +} + +FileInterfaceUP +LoadUtils::openWeight(const AttributeVector& attr) +{ + return openFile(attr, "weight"); +} + +bool +LoadUtils::file_exists(const AttributeVector& attr, const vespalib::string& suffix) +{ + return vespalib::fileExists(attr.getBaseFileName() + "." + suffix); +} + +LoadedBufferUP +LoadUtils::loadFile(const AttributeVector& attr, const vespalib::string& suffix) +{ + return FileUtil::loadFile(attr.getBaseFileName() + "." + suffix); +} + +LoadedBufferUP +LoadUtils::loadDAT(const AttributeVector& attr) +{ + return loadFile(attr, "dat"); +} + +LoadedBufferUP +LoadUtils::loadIDX(const AttributeVector& attr) +{ + return loadFile(attr, "idx"); +} + +LoadedBufferUP +LoadUtils::loadWeight(const AttributeVector& attr) +{ + return loadFile(attr, "weight"); +} + +LoadedBufferUP +LoadUtils::loadUDAT(const AttributeVector& attr) +{ + return loadFile(attr, "udat"); +} + #define INSTANTIATE_ARRAY(ValueType, Saver) \ template uint32_t loadFromEnumeratedMultiValue(MultiValueMapping<Value<ValueType>> &, ReaderBase &, vespalib::ConstArrayRef<ValueType>, Saver) @@ -40,5 +108,4 @@ INSTANTIATE_VALUE(int64_t); INSTANTIATE_VALUE(float); INSTANTIATE_VALUE(double); -} // namespace search::attribute -} // namespace search +} diff --git a/searchlib/src/vespa/searchlib/attribute/load_utils.h b/searchlib/src/vespa/searchlib/attribute/load_utils.h index 050d7726ecd..cd9d98084d5 100644 --- a/searchlib/src/vespa/searchlib/attribute/load_utils.h +++ b/searchlib/src/vespa/searchlib/attribute/load_utils.h @@ -6,10 +6,34 @@ #include "readerbase.h" #include <vespa/vespalib/util/arrayref.h> -namespace search { -namespace attribute { +namespace search::attribute { -/* +/** + * Helper functions used to open / load attribute vector data files from disk. + */ +class LoadUtils { +public: + using FileInterfaceUP = std::unique_ptr<FastOS_FileInterface>; + using LoadedBufferUP = std::unique_ptr<fileutil::LoadedBuffer>; + +private: + static FileInterfaceUP openFile(const AttributeVector& attr, const vespalib::string& suffix); + +public: + static FileInterfaceUP openDAT(const AttributeVector& attr); + static FileInterfaceUP openIDX(const AttributeVector& attr); + static FileInterfaceUP openWeight(const AttributeVector& attr); + + static bool file_exists(const AttributeVector& attr, const vespalib::string& suffix); + static LoadedBufferUP loadFile(const AttributeVector& attr, const vespalib::string& suffix); + + static LoadedBufferUP loadDAT(const AttributeVector& attr); + static LoadedBufferUP loadIDX(const AttributeVector& attr); + static LoadedBufferUP loadWeight(const AttributeVector& attr); + static LoadedBufferUP loadUDAT(const AttributeVector& attr); +}; + +/** * Function for loading mapping from document id to array of enum indexes * or values from enumerated attribute reader. */ @@ -20,7 +44,7 @@ loadFromEnumeratedMultiValue(MvMapping &mapping, vespalib::ConstArrayRef<typename MvMapping::MultiValueType::ValueType> enumValueToValueMap, Saver saver) __attribute((noinline)); -/* +/** * Function for loading mapping from document id to enum index or * value from enumerated attribute reader. */ @@ -32,5 +56,4 @@ loadFromEnumeratedSingleValue(Vector &vector, vespalib::ConstArrayRef<typename Vector::ValueType> enumValueToValueMap, Saver saver) __attribute((noinline)); -} // namespace search::attribute -} // namespace search +} diff --git a/searchlib/src/vespa/searchlib/attribute/multinumericattribute.hpp b/searchlib/src/vespa/searchlib/attribute/multinumericattribute.hpp index 1efa2789fcb..3ca7423c38c 100644 --- a/searchlib/src/vespa/searchlib/attribute/multinumericattribute.hpp +++ b/searchlib/src/vespa/searchlib/attribute/multinumericattribute.hpp @@ -117,7 +117,7 @@ MultiValueNumericAttribute<B, M>::onLoadEnumerated(ReaderBase & attrReader) this->setCommittedDocIdLimit(numDocs); this->_mvMapping.reserve(numDocs+1); - LoadedBuffer::UP udatBuffer(this->loadUDAT()); + auto udatBuffer = attribute::LoadUtils::loadUDAT(*this); assert((udatBuffer->size() % sizeof(T)) == 0); vespalib::ConstArrayRef<T> map(reinterpret_cast<const T *>(udatBuffer->buffer()), udatBuffer->size() / sizeof(T)); uint32_t maxvc = attribute::loadFromEnumeratedMultiValue(this->_mvMapping, attrReader, map, attribute::NoSaveLoadedEnum()); diff --git a/searchlib/src/vespa/searchlib/attribute/multinumericenumattribute.hpp b/searchlib/src/vespa/searchlib/attribute/multinumericenumattribute.hpp index 9ee365fc7cc..e17d41a5521 100644 --- a/searchlib/src/vespa/searchlib/attribute/multinumericenumattribute.hpp +++ b/searchlib/src/vespa/searchlib/attribute/multinumericenumattribute.hpp @@ -2,13 +2,14 @@ #pragma once -#include "multinumericenumattribute.h" -#include "loadednumericvalue.h" #include "attributeiterators.hpp" -#include <vespa/searchlib/util/fileutil.hpp> +#include "load_utils.h" +#include "loadednumericvalue.h" +#include "multinumericenumattribute.h" #include <vespa/fastlib/io/bufferedfile.h> #include <vespa/searchlib/query/query_term_simple.h> #include <vespa/searchlib/queryeval/emptysearch.h> +#include <vespa/searchlib/util/fileutil.hpp> namespace search { @@ -52,7 +53,7 @@ template <typename B, typename M> bool MultiValueNumericEnumAttribute<B, M>::onLoadEnumerated(ReaderBase &attrReader) { - LoadedBuffer::UP udatBuffer(this->loadUDAT()); + auto udatBuffer = attribute::LoadUtils::loadUDAT(*this); uint32_t numDocs = attrReader.getNumIdx() - 1; uint64_t numValues = attrReader.getNumValues(); diff --git a/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp b/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp index b0e4df65c2b..72cc6e38fac 100644 --- a/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp +++ b/searchlib/src/vespa/searchlib/attribute/predicate_attribute.cpp @@ -1,12 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "predicate_attribute.h" -#include "iattributesavetarget.h" #include "attribute_header.h" -#include <vespa/searchlib/predicate/predicate_index.h> -#include <vespa/searchlib/util/fileutil.h> +#include "iattributesavetarget.h" +#include "load_utils.h" +#include "predicate_attribute.h" #include <vespa/document/fieldvalue/predicatefieldvalue.h> #include <vespa/document/predicate/predicate.h> +#include <vespa/searchlib/predicate/predicate_index.h> +#include <vespa/searchlib/util/fileutil.h> #include <vespa/vespalib/data/slime/slime.h> #include <vespa/log/log.h> @@ -183,7 +184,7 @@ struct DummyObserver : SimpleIndexDeserializeObserver<> { bool PredicateAttribute::onLoad() { - fileutil::LoadedBuffer::UP loaded_buffer = loadDAT(); + auto loaded_buffer = attribute::LoadUtils::loadDAT(*this); char *rawBuffer = const_cast<char *>(static_cast<const char *>(loaded_buffer->buffer())); size_t size = loaded_buffer->size(); DataBuffer buffer(rawBuffer, size); diff --git a/searchlib/src/vespa/searchlib/attribute/readerbase.cpp b/searchlib/src/vespa/searchlib/attribute/readerbase.cpp index 62936ecaaf4..a396fe9efd8 100644 --- a/searchlib/src/vespa/searchlib/attribute/readerbase.cpp +++ b/searchlib/src/vespa/searchlib/attribute/readerbase.cpp @@ -1,10 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "readerbase.h" #include "attributevector.h" +#include "load_utils.h" +#include "readerbase.h" #include <vespa/fastlib/io/bufferedfile.h> -#include <vespa/vespalib/util/exceptions.h> #include <vespa/searchlib/util/filesizecalculator.h> +#include <vespa/vespalib/util/exceptions.h> #include <vespa/log/log.h> LOG_SETUP(".search.attribute.readerbase"); @@ -12,28 +13,27 @@ LOG_SETUP(".search.attribute.readerbase"); namespace search { namespace { - const vespalib::string versionTag = "version"; - const vespalib::string docIdLimitTag = "docIdLimit"; - const vespalib::string createSerialNumTag = "createSerialNum"; - constexpr size_t DIRECTIO_ALIGNMENT(4096); +const vespalib::string versionTag = "version"; +const vespalib::string docIdLimitTag = "docIdLimit"; +const vespalib::string createSerialNumTag = "createSerialNum"; - uint64_t - extractCreateSerialNum(const vespalib::GenericHeader &header) - { - return (header.hasTag(createSerialNumTag)) ? header.getTag(createSerialNumTag).asInteger() : 0u; - } +constexpr size_t DIRECTIO_ALIGNMENT(4096); +uint64_t +extractCreateSerialNum(const vespalib::GenericHeader &header) +{ + return (header.hasTag(createSerialNumTag)) ? header.getTag(createSerialNumTag).asInteger() : 0u; +} } ReaderBase::ReaderBase(AttributeVector &attr) - : _datFile(attr.openDAT()), + : _datFile(attribute::LoadUtils::openDAT(attr)), _weightFile(attr.hasWeightedSetType() ? - attr.openWeight() : std::unique_ptr<Fast_BufferedFile>()), + attribute::LoadUtils::openWeight(attr) : std::unique_ptr<Fast_BufferedFile>()), _idxFile(attr.hasMultiValue() ? - attr.openIDX() : std::unique_ptr<Fast_BufferedFile>()), - _udatFile(), + attribute::LoadUtils::openIDX(attr) : std::unique_ptr<Fast_BufferedFile>()), _weightReader(*_weightFile), _idxReader(*_idxFile), _enumReader(*_datFile), @@ -41,7 +41,6 @@ ReaderBase::ReaderBase(AttributeVector &attr) _datHeaderLen(0u), _idxHeaderLen(0u), _weightHeaderLen(0u), - _udatHeaderLen(0u), _createSerialNum(0u), _fixedWidth(attr.getFixedWidth()), _enumerated(false), @@ -83,20 +82,12 @@ ReaderBase::ReaderBase(AttributeVector &attr) } if (hasData() && AttributeVector::isEnumerated(_datHeader)) { _enumerated = true; - _udatFile = attr.openUDAT(); - vespalib::FileHeader udatHeader(DIRECTIO_ALIGNMENT); - _udatHeaderLen = udatHeader.readFile(*_udatFile); - _udatFile->SetPosition(_udatHeaderLen); - if (!attr.headerTypeOK(udatHeader)) - _udatFile->Close(); } _hasLoadData = hasData() && (!attr.hasMultiValue() || hasIdx()) && - (!attr.hasWeightedSetType() || hasWeight()) && - (!getEnumerated() || hasUData()); + (!attr.hasWeightedSetType() || hasWeight()); } - ReaderBase::~ReaderBase() = default; bool @@ -115,11 +106,6 @@ ReaderBase::hasData() const { } bool -ReaderBase::hasUData() const { - return _udatFile.get() && _udatFile->IsOpened(); -} - -bool ReaderBase:: extractFileSize(const vespalib::GenericHeader &header, FastOS_FileInterface &file, uint64_t &fileSize) @@ -129,7 +115,6 @@ extractFileSize(const vespalib::GenericHeader &header, file.GetFileName(), fileSize); } - void ReaderBase::rewind() { @@ -142,12 +127,8 @@ ReaderBase::rewind() if (hasWeight()) { _weightFile->SetPosition(_weightHeaderLen); } - if (getEnumerated()) { - _udatFile->SetPosition(_udatHeaderLen); - } } - size_t ReaderBase::getNumValues() { @@ -169,7 +150,6 @@ ReaderBase::getNumValues() } } - uint32_t ReaderBase::getNextValueCount() { diff --git a/searchlib/src/vespa/searchlib/attribute/readerbase.h b/searchlib/src/vespa/searchlib/attribute/readerbase.h index 09db52f5e25..a7685e4532a 100644 --- a/searchlib/src/vespa/searchlib/attribute/readerbase.h +++ b/searchlib/src/vespa/searchlib/attribute/readerbase.h @@ -19,7 +19,6 @@ public: bool hasWeight() const; bool hasIdx() const; bool hasData() const; - bool hasUData() const; uint32_t getNumIdx() const { return (_idxFileSize - _idxHeaderLen) /sizeof(uint32_t); @@ -51,7 +50,6 @@ protected: private: std::unique_ptr<FastOS_FileInterface> _weightFile; std::unique_ptr<FastOS_FileInterface> _idxFile; - std::unique_ptr<FastOS_FileInterface> _udatFile; FileReader<int32_t> _weightReader; FileReader<uint32_t> _idxReader; FileReader<uint32_t> _enumReader; @@ -59,7 +57,6 @@ private: uint32_t _datHeaderLen; uint32_t _idxHeaderLen; uint32_t _weightHeaderLen; - uint32_t _udatHeaderLen; uint64_t _createSerialNum; size_t _fixedWidth; bool _enumerated; diff --git a/searchlib/src/vespa/searchlib/attribute/reference_attribute.cpp b/searchlib/src/vespa/searchlib/attribute/reference_attribute.cpp index b055af7c084..9421730f335 100644 --- a/searchlib/src/vespa/searchlib/attribute/reference_attribute.cpp +++ b/searchlib/src/vespa/searchlib/attribute/reference_attribute.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "attributesaver.h" +#include "load_utils.h" #include "readerbase.h" #include "reference_attribute.h" #include "reference_attribute_saver.h" @@ -223,7 +224,7 @@ ReferenceAttribute::onLoad() uint64_t numValues(0); numValues = attrReader.getEnumCount(); numDocs = numValues; - fileutil::LoadedBuffer::UP udatBuffer(loadUDAT()); + auto udatBuffer = attribute::LoadUtils::loadUDAT(*this); const GenericHeader &header = udatBuffer->getHeader(); uint32_t uniqueValueCount = extractUniqueValueCount(header); assert(uniqueValueCount * sizeof(GlobalId) == udatBuffer->size()); diff --git a/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp b/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp index 69d4e6a5ee9..681c2af1f07 100644 --- a/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp +++ b/searchlib/src/vespa/searchlib/attribute/singlenumericattribute.hpp @@ -1,12 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "singlenumericattribute.h" +#include "attributeiterators.hpp" #include "attributevector.hpp" -#include "singlenumericattributesaver.h" #include "load_utils.h" #include "primitivereader.h" -#include "attributeiterators.hpp" +#include "singlenumericattribute.h" +#include "singlenumericattributesaver.h" #include <vespa/searchlib/query/query_term_simple.h> #include <vespa/searchlib/queryeval/emptysearch.h> @@ -114,7 +114,7 @@ SingleValueNumericAttribute<B>::onLoadEnumerated(ReaderBase &attrReader) this->setCommittedDocIdLimit(numDocs); _data.unsafe_reserve(numDocs); - fileutil::LoadedBuffer::UP udatBuffer(this->loadUDAT()); + auto udatBuffer = attribute::LoadUtils::loadUDAT(*this); assert((udatBuffer->size() % sizeof(T)) == 0); vespalib::ConstArrayRef<T> map(reinterpret_cast<const T *>(udatBuffer->buffer()), udatBuffer->size() / sizeof(T)); diff --git a/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp b/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp index 990388d2a12..5fb587c908e 100644 --- a/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp +++ b/searchlib/src/vespa/searchlib/attribute/singlenumericenumattribute.hpp @@ -2,14 +2,15 @@ #pragma once -#include "singlenumericenumattribute.h" -#include <vespa/searchlib/common/sort.h> -#include "singleenumattribute.hpp" +#include "attributeiterators.hpp" +#include "load_utils.h" #include "loadednumericvalue.h" #include "primitivereader.h" -#include "attributeiterators.hpp" -#include <vespa/searchlib/queryeval/emptysearch.h> +#include "singleenumattribute.hpp" +#include "singlenumericenumattribute.h" +#include <vespa/searchlib/common/sort.h> #include <vespa/searchlib/query/query_term_simple.h> +#include <vespa/searchlib/queryeval/emptysearch.h> #include <vespa/searchlib/util/fileutil.hpp> namespace search { @@ -79,7 +80,7 @@ template <typename B> bool SingleValueNumericEnumAttribute<B>::onLoadEnumerated(ReaderBase &attrReader) { - fileutil::LoadedBuffer::UP udatBuffer(this->loadUDAT()); + auto udatBuffer = attribute::LoadUtils::loadUDAT(*this); uint64_t numValues = attrReader.getEnumCount(); uint32_t numDocs = numValues; diff --git a/searchlib/src/vespa/searchlib/attribute/stringbase.cpp b/searchlib/src/vespa/searchlib/attribute/stringbase.cpp index 32b5b3ca373..40e706e924d 100644 --- a/searchlib/src/vespa/searchlib/attribute/stringbase.cpp +++ b/searchlib/src/vespa/searchlib/attribute/stringbase.cpp @@ -1,11 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "stringbase.h" #include "attributevector.hpp" +#include "load_utils.h" #include "readerbase.h" +#include "stringbase.h" #include <vespa/document/fieldvalue/fieldvalue.h> -#include <vespa/searchlib/util/fileutil.hpp> #include <vespa/searchlib/query/query_term_ucs4.h> +#include <vespa/searchlib/util/fileutil.hpp> #include <vespa/vespalib/locale/c.h> #include <vespa/vespalib/util/array.hpp> @@ -316,7 +317,7 @@ bool StringAttribute::apply(DocId, const ArithmeticValueUpdate & ) bool StringAttribute::onLoadEnumerated(ReaderBase &attrReader) { - fileutil::LoadedBuffer::UP udatBuffer(loadUDAT()); + auto udatBuffer = attribute::LoadUtils::loadUDAT(*this); bool hasIdx(attrReader.hasIdx()); size_t numDocs(0); diff --git a/searchlib/src/vespa/searchlib/common/CMakeLists.txt b/searchlib/src/vespa/searchlib/common/CMakeLists.txt index 2ee722902c8..f4a9e27b79d 100644 --- a/searchlib/src/vespa/searchlib/common/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/common/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(searchlib_common OBJECT SOURCES + adaptive_sequenced_executor.cpp allocatedbitvector.cpp bitvector.cpp bitvectorcache.cpp diff --git a/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.cpp b/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.cpp new file mode 100644 index 00000000000..f31172b1eba --- /dev/null +++ b/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.cpp @@ -0,0 +1,324 @@ +// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "adaptive_sequenced_executor.h" + +namespace search { + +//----------------------------------------------------------------------------- + +AdaptiveSequencedExecutor::Strand::Strand() + : state(State::IDLE), + queue() +{ +} + +AdaptiveSequencedExecutor::Strand::~Strand() +{ + assert(queue.empty()); +} + +//----------------------------------------------------------------------------- + +AdaptiveSequencedExecutor::Worker::Worker() + : cond(), + state(State::RUNNING), + strand(nullptr) +{ +} + +AdaptiveSequencedExecutor::Worker::~Worker() +{ + assert(state == State::DONE); + assert(strand == nullptr); +} + +//----------------------------------------------------------------------------- + +AdaptiveSequencedExecutor::Self::Self() + : cond(), + state(State::OPEN), + waiting_tasks(0), + pending_tasks(0) +{ +} + +AdaptiveSequencedExecutor::Self::~Self() +{ + assert(state == State::CLOSED); + assert(waiting_tasks == 0); + assert(pending_tasks == 0); +} + +//----------------------------------------------------------------------------- + +AdaptiveSequencedExecutor::ThreadTools::ThreadTools(AdaptiveSequencedExecutor &parent_in) + : parent(parent_in), + pool(std::make_unique<FastOS_ThreadPool>(STACK_SIZE)), + allow_worker_exit() +{ +} + +AdaptiveSequencedExecutor::ThreadTools::~ThreadTools() +{ + assert(pool->isClosed()); +} + +void +AdaptiveSequencedExecutor::ThreadTools::Run(FastOS_ThreadInterface *, void *) +{ + parent.worker_main(); +} + +void +AdaptiveSequencedExecutor::ThreadTools::start(size_t num_threads) +{ + for (size_t i = 0; i < num_threads; ++i) { + FastOS_ThreadInterface *thread = pool->NewThread(this); + assert(thread != nullptr); + (void)thread; + } +} + +void +AdaptiveSequencedExecutor::ThreadTools::close() +{ + allow_worker_exit.countDown(); + pool->Close(); +} + +//----------------------------------------------------------------------------- + +void +AdaptiveSequencedExecutor::maybe_block_self(std::unique_lock<std::mutex> &lock) +{ + while (_self.state == Self::State::BLOCKED) { + _self.cond.wait(lock); + } + while ((_self.state == Self::State::OPEN) && (_self.pending_tasks >= _cfg.max_pending)) { + _self.state = Self::State::BLOCKED; + while (_self.state == Self::State::BLOCKED) { + _self.cond.wait(lock); + } + } +} + +bool +AdaptiveSequencedExecutor::maybe_unblock_self(const std::unique_lock<std::mutex> &) +{ + if ((_self.state == Self::State::BLOCKED) && (_self.pending_tasks < _cfg.wakeup_limit)) { + _self.state = Self::State::OPEN; + return true; + } + return false; +} + +AdaptiveSequencedExecutor::Worker * +AdaptiveSequencedExecutor::get_worker_to_wake(const std::unique_lock<std::mutex> &) +{ + if ((_self.waiting_tasks > _cfg.max_waiting) && (!_worker_stack.empty())) { + assert(!_wait_queue.empty()); + Worker *worker = _worker_stack.back(); + _worker_stack.popBack(); + assert(worker->state == Worker::State::BLOCKED); + assert(worker->strand == nullptr); + worker->state = Worker::State::RUNNING; + worker->strand = _wait_queue.front(); + _wait_queue.pop(); + assert(worker->strand->state == Strand::State::WAITING); + assert(!worker->strand->queue.empty()); + worker->strand->state = Strand::State::ACTIVE; + assert(_self.waiting_tasks >= worker->strand->queue.size()); + _self.waiting_tasks -= worker->strand->queue.size(); + return worker; + } + return nullptr; +} + +bool +AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock) +{ + assert(worker.strand == nullptr); + if (!_wait_queue.empty()) { + worker.strand = _wait_queue.front(); + _wait_queue.pop(); + assert(worker.strand->state == Strand::State::WAITING); + assert(!worker.strand->queue.empty()); + worker.strand->state = Strand::State::ACTIVE; + assert(_self.waiting_tasks >= worker.strand->queue.size()); + _self.waiting_tasks -= worker.strand->queue.size(); + } else if (_self.state == Self::State::CLOSED) { + worker.state = Worker::State::DONE; + } else { + worker.state = Worker::State::BLOCKED; + _worker_stack.push(&worker); + while (worker.state == Worker::State::BLOCKED) { + worker.cond.wait(lock); + } + } + return (worker.state == Worker::State::RUNNING); +} + +bool +AdaptiveSequencedExecutor::exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock) +{ + if (worker.strand == nullptr) { + return obtain_strand(worker, lock); + } + if (worker.strand->queue.empty()) { + worker.strand->state = Strand::State::IDLE; + worker.strand = nullptr; + return obtain_strand(worker, lock); + } + if (!_wait_queue.empty()) { + worker.strand->state = Strand::State::WAITING; + _self.waiting_tasks += worker.strand->queue.size(); + _wait_queue.push(worker.strand); + worker.strand = nullptr; + return obtain_strand(worker, lock); + } + return true; +} + +AdaptiveSequencedExecutor::Task::UP +AdaptiveSequencedExecutor::next_task(Worker &worker) +{ + Task::UP task; + Worker *worker_to_wake = nullptr; + auto guard = std::unique_lock(_mutex); + if (exchange_strand(worker, guard)) { + assert(worker.state == Worker::State::RUNNING); + assert(worker.strand != nullptr); + assert(!worker.strand->queue.empty()); + task = std::move(worker.strand->queue.front()); + worker.strand->queue.pop(); + _stats.queueSize.add(--_self.pending_tasks); + worker_to_wake = get_worker_to_wake(guard); + } else { + assert(worker.state == Worker::State::DONE); + assert(worker.strand == nullptr); + } + bool signal_self = maybe_unblock_self(guard); + guard.unlock(); // UNLOCK + if (worker_to_wake != nullptr) { + worker_to_wake->cond.notify_one(); + } + if (signal_self) { + _self.cond.notify_all(); + } + return task; +} + +void +AdaptiveSequencedExecutor::worker_main() +{ + Worker worker; + while (Task::UP my_task = next_task(worker)) { + my_task->run(); + } + _thread_tools->allow_worker_exit.await(); +} + +AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, + size_t max_waiting, size_t max_pending) + : ISequencedTaskExecutor(num_strands), + _thread_tools(std::make_unique<ThreadTools>(*this)), + _mutex(), + _strands(num_strands), + _wait_queue(num_strands), + _worker_stack(num_threads), + _self(), + _stats(), + _cfg(num_threads, max_waiting, max_pending) +{ + _stats.queueSize.add(_self.pending_tasks); + _thread_tools->start(num_threads); +} + +AdaptiveSequencedExecutor::~AdaptiveSequencedExecutor() +{ + sync(); + { + auto guard = std::unique_lock(_mutex); + assert(_self.state == Self::State::OPEN); + _self.state = Self::State::CLOSED; + while (!_worker_stack.empty()) { + Worker *worker = _worker_stack.back(); + _worker_stack.popBack(); + assert(worker->state == Worker::State::BLOCKED); + assert(worker->strand == nullptr); + worker->state = Worker::State::DONE; + worker->cond.notify_one(); + } + _self.cond.notify_all(); + } + _thread_tools->close(); + assert(_wait_queue.empty()); + assert(_worker_stack.empty()); +} + +void +AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) +{ + assert(id.getId() < _strands.size()); + Strand &strand = _strands[id.getId()]; + auto guard = std::unique_lock(_mutex); + maybe_block_self(guard); + assert(_self.state != Self::State::CLOSED); + strand.queue.push(std::move(task)); + _stats.queueSize.add(++_self.pending_tasks); + ++_stats.acceptedTasks; + if (strand.state == Strand::State::WAITING) { + ++_self.waiting_tasks; + } else if (strand.state == Strand::State::IDLE) { + if (_worker_stack.size() < _cfg.num_threads) { + strand.state = Strand::State::WAITING; + _wait_queue.push(&strand); + _self.waiting_tasks += strand.queue.size(); + } else { + strand.state = Strand::State::ACTIVE; + assert(_wait_queue.empty()); + Worker *worker = _worker_stack.back(); + _worker_stack.popBack(); + assert(worker->state == Worker::State::BLOCKED); + assert(worker->strand == nullptr); + worker->state = Worker::State::RUNNING; + worker->strand = &strand; + guard.unlock(); // UNLOCK + worker->cond.notify_one(); + } + } +} + +void +AdaptiveSequencedExecutor::sync() +{ + vespalib::CountDownLatch latch(_strands.size()); + for (size_t i = 0; i < _strands.size(); ++i) { + execute(ExecutorId(i), [&](){ latch.countDown(); }); + } + latch.await(); +} + +void +AdaptiveSequencedExecutor::setTaskLimit(uint32_t task_limit) +{ + auto guard = std::unique_lock(_mutex); + _cfg.set_max_pending(task_limit); + bool signal_self = maybe_unblock_self(guard); + guard.unlock(); // UNLOCK + if (signal_self) { + _self.cond.notify_all(); + } +} + +AdaptiveSequencedExecutor::Stats +AdaptiveSequencedExecutor::getStats() +{ + auto guard = std::lock_guard(_mutex); + Stats stats = _stats; + _stats = Stats(); + _stats.queueSize.add(_self.pending_tasks); + return stats; +} + +} diff --git a/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.h b/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.h new file mode 100644 index 00000000000..3abc095e9df --- /dev/null +++ b/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.h @@ -0,0 +1,126 @@ +// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "isequencedtaskexecutor.h" +#include <vespa/vespalib/util/arrayqueue.hpp> +#include <vespa/vespalib/util/gate.h> +#include <vespa/fastos/thread.h> +#include <mutex> +#include <condition_variable> +#include <cassert> + +namespace search { + +/** + * Sequenced executor that balances the number of active threads in + * order to optimize for throughput over latency by minimizing the + * number of critical-path wakeups. + **/ +class AdaptiveSequencedExecutor : public ISequencedTaskExecutor +{ +private: + using Stats = vespalib::ExecutorStats; + using Task = vespalib::Executor::Task; + + /** + * Values used to configure the executor. + **/ + struct Config { + size_t num_threads; + size_t max_waiting; + size_t max_pending; + size_t wakeup_limit; + void set_max_pending(size_t max_pending_in) { + max_pending = std::max(1uL, max_pending_in); + wakeup_limit = std::max(1uL, size_t(max_pending * 0.9)); + assert(wakeup_limit > 0); + assert(wakeup_limit <= max_pending); + } + Config(size_t num_threads_in, size_t max_waiting_in, size_t max_pending_in) + : num_threads(num_threads_in), max_waiting(max_waiting_in), max_pending(1000), wakeup_limit(900) + { + assert(num_threads > 0); + set_max_pending(max_pending_in); + } + }; + + /** + * Tasks that need to be sequenced are handled by a single strand. + **/ + struct Strand { + enum class State { IDLE, WAITING, ACTIVE }; + State state; + vespalib::ArrayQueue<Task::UP> queue; + Strand(); + ~Strand(); + }; + + /** + * The state of a single worker thread. + **/ + struct Worker { + enum class State { RUNNING, BLOCKED, DONE }; + std::condition_variable cond; + State state; + Strand *strand; + Worker(); + ~Worker(); + }; + + /** + * State related to the executor itself. + **/ + struct Self { + enum class State { OPEN, BLOCKED, CLOSED }; + std::condition_variable cond; + State state; + size_t waiting_tasks; + size_t pending_tasks; + Self(); + ~Self(); + }; + + /** + * Stuff related to worker thread startup and shutdown. + **/ + struct ThreadTools : FastOS_Runnable { + static constexpr size_t STACK_SIZE = (256 * 1024); + AdaptiveSequencedExecutor &parent; + std::unique_ptr<FastOS_ThreadPool> pool; + vespalib::Gate allow_worker_exit; + ThreadTools(AdaptiveSequencedExecutor &parent_in); + ~ThreadTools(); + void Run(FastOS_ThreadInterface *, void *) override; + void start(size_t num_threads); + void close(); + }; + + std::unique_ptr<ThreadTools> _thread_tools; + std::mutex _mutex; + std::vector<Strand> _strands; + vespalib::ArrayQueue<Strand*> _wait_queue; + vespalib::ArrayQueue<Worker*> _worker_stack; + Self _self; + Stats _stats; + Config _cfg; + + void maybe_block_self(std::unique_lock<std::mutex> &lock); + bool maybe_unblock_self(const std::unique_lock<std::mutex> &lock); + + Worker *get_worker_to_wake(const std::unique_lock<std::mutex> &lock); + bool obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock); + bool exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock); + Task::UP next_task(Worker &worker); + void worker_main(); +public: + AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, + size_t max_waiting, size_t max_pending); + ~AdaptiveSequencedExecutor() override; + void executeTask(ExecutorId id, Task::UP task) override; + void sync() override; + void setTaskLimit(uint32_t task_limit) override; + vespalib::ExecutorStats getStats() override; +}; + +} diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp index 4c501defeea..a93eb1ff4bc 100644 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp @@ -39,7 +39,7 @@ void ForegroundTaskExecutor::setTaskLimit(uint32_t) { } vespalib::ExecutorStats ForegroundTaskExecutor::getStats() { - return vespalib::ExecutorStats(0, _accepted.load(std::memory_order_relaxed), 0); + return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0); } } // namespace search diff --git a/searchlib/src/vespa/searchlib/features/dotproductfeature.cpp b/searchlib/src/vespa/searchlib/features/dotproductfeature.cpp index ec31bcb5117..a8737a19eec 100644 --- a/searchlib/src/vespa/searchlib/features/dotproductfeature.cpp +++ b/searchlib/src/vespa/searchlib/features/dotproductfeature.cpp @@ -224,25 +224,27 @@ private: template <typename A> class SingleDotProductExecutorByValue final : public fef::FeatureExecutor { public: - SingleDotProductExecutorByValue(const A * attribute, multivalue::WeightedValue<typename A::BaseType> keyValue) + SingleDotProductExecutorByValue(const A * attribute, typename A::BaseType key, feature_t value) : _attribute(attribute), - _keyValue(keyValue) + _key(key), + _value(value) {} void execute(uint32_t docId) override { const multivalue::WeightedValue<typename A::BaseType> *values(nullptr); uint32_t sz = _attribute->getRawValues(docId, values); for (size_t i = 0; i < sz; ++i) { - if (values[i].value() == _keyValue.value()) { - outputs().set_number(0, values[i].weight()*_keyValue.weight()); + if (values[i].value() == _key) { + outputs().set_number(0, values[i].weight() * _value); return; } } outputs().set_number(0, 0); } private: - const A * _attribute; - multivalue::WeightedValue<typename A::BaseType> _keyValue; + const A * _attribute; + typename A::BaseType _key; + feature_t _value; }; } @@ -628,9 +630,9 @@ size_t extractSize(const dotproduct::wset::IntegerVectorT<T> & v) { } template<typename T> -multivalue::WeightedValue<T> extractElem(const dotproduct::wset::IntegerVectorT<T> & v, size_t idx) { +std::pair<T, feature_t> extractElem(const dotproduct::wset::IntegerVectorT<T> & v, size_t idx) { const auto & pair = v.getVector()[idx]; - return multivalue::WeightedValue<T>(pair.first, pair.second); + return std::pair<T, feature_t>(pair.first, pair.second); } template<typename T> @@ -639,7 +641,7 @@ size_t extractSize(const std::unique_ptr<dotproduct::wset::IntegerVectorT<T>> & } template<typename T> -multivalue::WeightedValue<T> extractElem(const std::unique_ptr<dotproduct::wset::IntegerVectorT<T>> & v, size_t idx) { +std::pair<T, feature_t> extractElem(const std::unique_ptr<dotproduct::wset::IntegerVectorT<T>> & v, size_t idx) { return extractElem(*v, idx); } @@ -656,7 +658,8 @@ createForDirectWSetImpl(const IAttributeVector * attribute, V && vector, vespali auto * exactA = dynamic_cast<const ExactA *>(iattr); if (exactA != nullptr) { if (extractSize(vector) == 1) { - return stash.create<SingleDotProductExecutorByValue<ExactA>>(exactA, extractElem(vector, 0ul)); + auto elem = extractElem(vector, 0ul); + return stash.create<SingleDotProductExecutorByValue<ExactA>>(exactA, elem.first, elem.second); } return stash.create<DotProductExecutor<ExactA>>(exactA, std::forward<V>(vector)); } diff --git a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt index 7090158c773..0f106f693f8 100644 --- a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt @@ -9,11 +9,15 @@ vespa_add_library(searchlib_tensor OBJECT generic_tensor_attribute.cpp generic_tensor_attribute_saver.cpp generic_tensor_store.cpp + hnsw_graph.cpp hnsw_index.cpp + hnsw_index_loader.cpp + hnsw_index_saver.cpp imported_tensor_attribute_vector.cpp imported_tensor_attribute_vector_read_guard.cpp inv_log_level_generator.cpp nearest_neighbor_index.cpp + nearest_neighbor_index_saver.cpp tensor_attribute.cpp tensor_store.cpp DEPENDS diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp index 627f7f0dfa9..68ce0c1bb00 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp @@ -3,16 +3,19 @@ #include "dense_tensor_attribute.h" #include "dense_tensor_attribute_saver.h" #include "nearest_neighbor_index.h" +#include "nearest_neighbor_index_saver.h" #include "tensor_attribute.hpp" #include <vespa/eval/tensor/dense/mutable_dense_tensor_view.h> #include <vespa/eval/tensor/tensor.h> #include <vespa/fastlib/io/bufferedfile.h> +#include <vespa/searchlib/attribute/load_utils.h> #include <vespa/searchlib/attribute/readerbase.h> #include <vespa/vespalib/data/slime/inserter.h> #include <vespa/log/log.h> LOG_SETUP(".searchlib.tensor.dense_tensor_attribute"); +using search::attribute::LoadUtils; using vespalib::eval::ValueType; using vespalib::slime::ObjectInserter; using vespalib::tensor::MutableDenseTensorView; @@ -148,6 +151,8 @@ DenseTensorAttribute::onLoad() if (!tensorReader.hasData()) { return false; } + bool has_index_file = LoadUtils::file_exists(*this, DenseTensorAttributeSaver::index_file_suffix()); + setCreateSerialNum(tensorReader.getCreateSerialNum()); assert(tensorReader.getVersion() == DENSE_TENSOR_ATTRIBUTE_VERSION); assert(getConfig().tensorType().to_spec() == @@ -160,7 +165,7 @@ DenseTensorAttribute::onLoad() auto raw = _denseTensorStore.allocRawBuffer(); tensorReader.readTensor(raw.data, _denseTensorStore.getBufSize()); _refVector.push_back(raw.ref); - if (_index) { + if (_index && !has_index_file) { // This ensures that get_vector() (via getTensor()) is able to find the newly added tensor. setCommittedDocIdLimit(lid + 1); _index->add_document(lid); @@ -171,6 +176,12 @@ DenseTensorAttribute::onLoad() } setNumDocs(numDocs); setCommittedDocIdLimit(numDocs); + if (_index && has_index_file) { + auto buffer = LoadUtils::loadFile(*this, DenseTensorAttributeSaver::index_file_suffix()); + if (!_index->load(*buffer)) { + return false; + } + } return true; } @@ -180,11 +191,13 @@ DenseTensorAttribute::onInitSave(vespalib::stringref fileName) { vespalib::GenerationHandler::Guard guard(getGenerationHandler(). takeGuard()); + auto index_saver = (_index ? _index->make_saver() : std::unique_ptr<NearestNeighborIndexSaver>()); return std::make_unique<DenseTensorAttributeSaver> (std::move(guard), this->createAttributeHeader(fileName), getRefCopy(), - _denseTensorStore); + _denseTensorStore, + std::move(index_saver)); } void diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute_saver.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute_saver.cpp index d78adab81b5..fd8d6162f01 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute_saver.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute_saver.cpp @@ -1,20 +1,19 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "dense_tensor_attribute_saver.h" -#include <vespa/vespalib/util/bufferwriter.h> #include "dense_tensor_store.h" +#include "nearest_neighbor_index_saver.h" +#include <vespa/vespalib/util/bufferwriter.h> #include <vespa/searchlib/attribute/iattributesavetarget.h> using vespalib::GenerationHandler; -namespace search { - -namespace tensor { +namespace search::tensor { namespace { -static const uint8_t tensorIsNotPresent = 0; -static const uint8_t tensorIsPresent = 1; +constexpr uint8_t tensorIsNotPresent = 0; +constexpr uint8_t tensorIsPresent = 1; } @@ -22,42 +21,60 @@ DenseTensorAttributeSaver:: DenseTensorAttributeSaver(GenerationHandler::Guard &&guard, const attribute::AttributeHeader &header, RefCopyVector &&refs, - const DenseTensorStore &tensorStore) + const DenseTensorStore &tensorStore, + IndexSaverUP index_saver) : AttributeSaver(std::move(guard), header), _refs(std::move(refs)), - _tensorStore(tensorStore) + _tensorStore(tensorStore), + _index_saver(std::move(index_saver)) { } +DenseTensorAttributeSaver::~DenseTensorAttributeSaver() = default; -DenseTensorAttributeSaver::~DenseTensorAttributeSaver() +vespalib::string +DenseTensorAttributeSaver::index_file_suffix() { + return "nnidx"; } - bool DenseTensorAttributeSaver::onSave(IAttributeSaveTarget &saveTarget) { - std::unique_ptr<BufferWriter> - datWriter(saveTarget.datWriter().allocBufferWriter()); + if (_index_saver) { + if (!saveTarget.setup_writer(index_file_suffix(), "Binary data file for nearest neighbor index")) { + return false; + } + } + + auto dat_writer = saveTarget.datWriter().allocBufferWriter(); + save_tensor_store(*dat_writer); + + if (_index_saver) { + auto index_writer = saveTarget.get_writer(index_file_suffix()).allocBufferWriter(); + // Note: Implementation of save() is responsible to call BufferWriter::flush(). + _index_saver->save(*index_writer); + } + return true; +} + +void +DenseTensorAttributeSaver::save_tensor_store(BufferWriter& writer) const +{ const uint32_t docIdLimit(_refs.size()); const uint32_t cellSize = _tensorStore.getCellSize(); for (uint32_t lid = 0; lid < docIdLimit; ++lid) { if (_refs[lid].valid()) { auto raw = _tensorStore.getRawBuffer(_refs[lid]); - datWriter->write(&tensorIsPresent, sizeof(tensorIsPresent)); + writer.write(&tensorIsPresent, sizeof(tensorIsPresent)); size_t numCells = _tensorStore.getNumCells(); size_t rawLen = numCells * cellSize; - datWriter->write(static_cast<const char *>(raw), rawLen); + writer.write(static_cast<const char *>(raw), rawLen); } else { - datWriter->write(&tensorIsNotPresent, sizeof(tensorIsNotPresent)); + writer.write(&tensorIsNotPresent, sizeof(tensorIsNotPresent)); } } - datWriter->flush(); - return true; + writer.flush(); } - -} // namespace search::tensor - -} // namespace search +} diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute_saver.h b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute_saver.h index 1f6596e82f5..895e2951cea 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute_saver.h +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute_saver.h @@ -5,28 +5,41 @@ #include "tensor_attribute.h" #include <vespa/searchlib/attribute/attributesaver.h> +namespace search { class BufferWriter; } + namespace search::tensor { class DenseTensorStore; +class NearestNeighborIndexSaver; -/* - * Class for saving a tensor attribute. +/** + * Class for saving a dense tensor attribute. + * Will also save the nearest neighbor index if existing. */ -class DenseTensorAttributeSaver : public AttributeSaver -{ +class DenseTensorAttributeSaver : public AttributeSaver { public: using RefCopyVector = TensorAttribute::RefCopyVector; private: + using GenerationHandler = vespalib::GenerationHandler; + using IndexSaverUP = std::unique_ptr<NearestNeighborIndexSaver>; + RefCopyVector _refs; const DenseTensorStore &_tensorStore; - using GenerationHandler = vespalib::GenerationHandler; + IndexSaverUP _index_saver; bool onSave(IAttributeSaveTarget &saveTarget) override; + void save_tensor_store(BufferWriter& writer) const; + public: - DenseTensorAttributeSaver(GenerationHandler::Guard &&guard, const attribute::AttributeHeader &header, - RefCopyVector &&refs, const DenseTensorStore &tensorStore); + DenseTensorAttributeSaver(GenerationHandler::Guard &&guard, + const attribute::AttributeHeader &header, + RefCopyVector &&refs, + const DenseTensorStore &tensorStore, + IndexSaverUP index_saver); ~DenseTensorAttributeSaver() override; + + static vespalib::string index_file_suffix(); }; } diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp new file mode 100644 index 00000000000..6f902a30861 --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.cpp @@ -0,0 +1,72 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "hnsw_graph.h" +#include "hnsw_index.h" +#include <vespa/vespalib/datastore/array_store.hpp> +#include <vespa/vespalib/util/rcuvector.hpp> + +namespace search::tensor { + +HnswGraph::HnswGraph() + : node_refs(), + nodes(HnswIndex::make_default_node_store_config()), + links(HnswIndex::make_default_link_store_config()), + entry_docid(0), // Note that docid 0 is reserved and never used + entry_level(-1) +{} + +HnswGraph::~HnswGraph() {} + +void +HnswGraph::make_node_for_document(uint32_t docid, uint32_t num_levels) +{ + node_refs.ensure_size(docid + 1, AtomicEntryRef()); + // A document cannot be added twice. + assert(!node_refs[docid].load_acquire().valid()); + // Note: The level array instance lives as long as the document is present in the index. + vespalib::Array<AtomicEntryRef> levels(num_levels, AtomicEntryRef()); + auto node_ref = nodes.add(levels); + node_refs[docid].store_release(node_ref); +} + +void +HnswGraph::remove_node_for_document(uint32_t docid) +{ + auto node_ref = node_refs[docid].load_acquire(); + nodes.remove(node_ref); + search::datastore::EntryRef invalid; + node_refs[docid].store_release(invalid); +} + +void +HnswGraph::set_link_array(uint32_t docid, uint32_t level, const LinkArrayRef& new_links) +{ + auto new_links_ref = links.add(new_links); + auto node_ref = node_refs[docid].load_acquire(); + assert(node_ref.valid()); + auto levels = nodes.get_writable(node_ref); + auto old_links_ref = levels[level].load_acquire(); + levels[level].store_release(new_links_ref); + links.remove(old_links_ref); +} + +std::vector<uint32_t> +HnswGraph::level_histogram() const +{ + std::vector<uint32_t> result; + size_t num_nodes = node_refs.size(); + for (size_t i = 0; i < num_nodes; ++i) { + uint32_t levels = 0; + auto node_ref = node_refs[i].load_acquire(); + if (node_ref.valid()) { + levels = nodes.get(node_ref).size(); + } + while (result.size() <= levels) { + result.push_back(0); + } + ++result[levels]; + } + return result; +} + +} // namespace diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h new file mode 100644 index 00000000000..233b9087af7 --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_graph.h @@ -0,0 +1,76 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/datastore/array_store.h> +#include <vespa/vespalib/datastore/atomic_entry_ref.h> +#include <vespa/vespalib/datastore/entryref.h> +#include <vespa/vespalib/util/rcuvector.h> + +namespace search::tensor { + +/** + * Stroage of a hierarchical navigable small world graph (HNSW) + * that is used for approximate K-nearest neighbor search. + */ +struct HnswGraph { + using AtomicEntryRef = search::datastore::AtomicEntryRef; + + // This uses 10 bits for buffer id -> 1024 buffers. + // As we have very short arrays we get less fragmentation with fewer and larger buffers. + using EntryRefType = search::datastore::EntryRefT<22>; + + // Provides mapping from document id -> node reference. + // The reference is used to lookup the node data in NodeStore. + using NodeRefVector = vespalib::RcuVector<AtomicEntryRef>; + + // This stores the level arrays for all nodes. + // Each node consists of an array of levels (from level 0 to n) where each entry is a reference to the link array at that level. + using NodeStore = search::datastore::ArrayStore<AtomicEntryRef, EntryRefType>; + using StoreConfig = search::datastore::ArrayStoreConfig; + using LevelArrayRef = NodeStore::ConstArrayRef; + + // This stores all link arrays. + // A link array consists of the document ids of the nodes a particular node is linked to. + using LinkStore = search::datastore::ArrayStore<uint32_t, EntryRefType>; + using LinkArrayRef = LinkStore::ConstArrayRef; + + NodeRefVector node_refs; + NodeStore nodes; + LinkStore links; + uint32_t entry_docid; + int32_t entry_level; + + HnswGraph(); + + ~HnswGraph(); + + void make_node_for_document(uint32_t docid, uint32_t num_levels); + + void remove_node_for_document(uint32_t docid); + + LevelArrayRef get_level_array(uint32_t docid) const { + auto node_ref = node_refs[docid].load_acquire(); + assert(node_ref.valid()); + return nodes.get(node_ref); + } + + LinkArrayRef get_link_array(uint32_t docid, uint32_t level) const { + auto levels = get_level_array(docid); + assert(level < levels.size()); + return links.get(levels[level].load_acquire()); + } + + void set_link_array(uint32_t docid, uint32_t level, const LinkArrayRef& new_links); + + void set_entry_node(uint32_t docid, int32_t level) { + entry_docid = docid; + entry_level = level; + } + + size_t size() const { return node_refs.size(); } + + std::vector<uint32_t> level_histogram() const; +}; + +} diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp index 988264c0455..b08d862ae6d 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.cpp @@ -2,6 +2,8 @@ #include "distance_function.h" #include "hnsw_index.h" +#include "hnsw_index_loader.h" +#include "hnsw_index_saver.h" #include "random_level_generator.h" #include <vespa/searchlib/util/state_explorer_utils.h> #include <vespa/eval/tensor/dense/typed_cells.h> @@ -66,54 +68,6 @@ HnswIndex::max_links_for_level(uint32_t level) const return (level == 0) ? _cfg.max_links_at_level_0() : _cfg.max_links_on_inserts(); } -void -HnswIndex::make_node_for_document(uint32_t docid, uint32_t num_levels) -{ - _node_refs.ensure_size(docid + 1, AtomicEntryRef()); - // A document cannot be added twice. - assert(!_node_refs[docid].load_acquire().valid()); - - // Note: The level array instance lives as long as the document is present in the index. - LevelArray levels(num_levels, AtomicEntryRef()); - auto node_ref = _nodes.add(levels); - _node_refs[docid].store_release(node_ref); -} - -void -HnswIndex::remove_node_for_document(uint32_t docid) -{ - auto node_ref = _node_refs[docid].load_acquire(); - _nodes.remove(node_ref); - EntryRef invalid; - _node_refs[docid].store_release(invalid); -} - -HnswIndex::LevelArrayRef -HnswIndex::get_level_array(uint32_t docid) const -{ - auto node_ref = _node_refs[docid].load_acquire(); - return _nodes.get(node_ref); -} - -HnswIndex::LinkArrayRef -HnswIndex::get_link_array(uint32_t docid, uint32_t level) const -{ - auto levels = get_level_array(docid); - assert(level < levels.size()); - return _links.get(levels[level].load_acquire()); -} - -void -HnswIndex::set_link_array(uint32_t docid, uint32_t level, const LinkArrayRef& links) -{ - auto new_links_ref = _links.add(links); - auto node_ref = _node_refs[docid].load_acquire(); - auto levels = _nodes.get_writable(node_ref); - auto old_links_ref = levels[level].load_acquire(); - levels[level].store_release(new_links_ref); - _links.remove(old_links_ref); -} - bool HnswIndex::have_closer_distance(HnswCandidate candidate, const LinkArrayRef& result) const { @@ -182,7 +136,7 @@ HnswIndex::select_neighbors(const HnswCandidateVector& neighbors, uint32_t max_l void HnswIndex::shrink_if_needed(uint32_t docid, uint32_t level) { - auto old_links = get_link_array(docid, level); + auto old_links = _graph.get_link_array(docid, level); uint32_t max_links = max_links_for_level(level); if (old_links.size() > max_links) { HnswCandidateVector neighbors; @@ -191,7 +145,7 @@ HnswIndex::shrink_if_needed(uint32_t docid, uint32_t level) neighbors.emplace_back(neighbor_docid, dist); } auto split = select_neighbors(neighbors, max_links); - set_link_array(docid, level, split.used); + _graph.set_link_array(docid, level, split.used); for (uint32_t removed_docid : split.unused) { remove_link_to(removed_docid, docid, level); } @@ -201,9 +155,9 @@ HnswIndex::shrink_if_needed(uint32_t docid, uint32_t level) void HnswIndex::connect_new_node(uint32_t docid, const LinkArrayRef &neighbors, uint32_t level) { - set_link_array(docid, level, neighbors); + _graph.set_link_array(docid, level, neighbors); for (uint32_t neighbor_docid : neighbors) { - auto old_links = get_link_array(neighbor_docid, level); + auto old_links = _graph.get_link_array(neighbor_docid, level); add_link_to(neighbor_docid, level, old_links, docid); } for (uint32_t neighbor_docid : neighbors) { @@ -215,11 +169,11 @@ void HnswIndex::remove_link_to(uint32_t remove_from, uint32_t remove_id, uint32_t level) { LinkArray new_links; - auto old_links = get_link_array(remove_from, level); + auto old_links = _graph.get_link_array(remove_from, level); for (uint32_t id : old_links) { if (id != remove_id) new_links.push_back(id); } - set_link_array(remove_from, level, new_links); + _graph.set_link_array(remove_from, level, new_links); } @@ -244,7 +198,7 @@ HnswIndex::find_nearest_in_layer(const TypedCells& input, const HnswCandidate& e bool keep_searching = true; while (keep_searching) { keep_searching = false; - for (uint32_t neighbor_docid : get_link_array(nearest.docid, level)) { + for (uint32_t neighbor_docid : _graph.get_link_array(nearest.docid, level)) { double dist = calc_distance(input, neighbor_docid); if (dist < nearest.distance) { nearest = HnswCandidate(neighbor_docid, dist); @@ -259,7 +213,7 @@ void HnswIndex::search_layer(const TypedCells& input, uint32_t neighbors_to_find, FurthestPriQ& best_neighbors, uint32_t level) const { NearestPriQ candidates; - uint32_t doc_id_limit = _node_refs.size(); + uint32_t doc_id_limit = _graph.node_refs.size(); auto visited = _visited_set_pool.get(doc_id_limit); for (const auto &entry : best_neighbors.peek()) { assert(entry.docid < doc_id_limit); @@ -274,7 +228,7 @@ HnswIndex::search_layer(const TypedCells& input, uint32_t neighbors_to_find, Fur break; } candidates.pop(); - for (uint32_t neighbor_docid : get_link_array(cand.docid, level)) { + for (uint32_t neighbor_docid : _graph.get_link_array(cand.docid, level)) { if ((neighbor_docid >= doc_id_limit) || visited.is_marked(neighbor_docid)) { continue; } @@ -294,15 +248,12 @@ HnswIndex::search_layer(const TypedCells& input, uint32_t neighbors_to_find, Fur HnswIndex::HnswIndex(const DocVectorAccess& vectors, DistanceFunction::UP distance_func, RandomLevelGenerator::UP level_generator, const Config& cfg) - : _vectors(vectors), + : + _graph(), + _vectors(vectors), _distance_func(std::move(distance_func)), _level_generator(std::move(level_generator)), - _cfg(cfg), - _node_refs(), - _nodes(make_default_node_store_config()), - _links(make_default_link_store_config()), - _entry_docid(0), // Note that docid 0 is reserved and never used - _entry_level(-1) + _cfg(cfg) { } @@ -314,16 +265,16 @@ HnswIndex::add_document(uint32_t docid) auto input = get_vector(docid); // TODO: Add capping on num_levels int level = _level_generator->max_level(); - make_node_for_document(docid, level + 1); - if (_entry_docid == 0) { - _entry_docid = docid; - _entry_level = level; + _graph.make_node_for_document(docid, level + 1); + uint32_t entry_docid = get_entry_docid(); + if (entry_docid == 0) { + _graph.set_entry_node(docid, level); return; } - int search_level = _entry_level; - double entry_dist = calc_distance(input, _entry_docid); - HnswCandidate entry_point(_entry_docid, entry_dist); + int search_level = get_entry_level(); + double entry_dist = calc_distance(input, entry_docid); + HnswCandidate entry_point(entry_docid, entry_dist); while (search_level > level) { entry_point = find_nearest_in_layer(input, entry_point, search_level); --search_level; @@ -331,7 +282,7 @@ HnswIndex::add_document(uint32_t docid) FurthestPriQ best_neighbors; best_neighbors.push(entry_point); - search_level = std::min(level, _entry_level); + search_level = std::min(level, search_level); // Insert the added document in each level it should exist in. while (search_level >= 0) { @@ -341,9 +292,8 @@ HnswIndex::add_document(uint32_t docid) connect_new_node(docid, neighbors.used, search_level); --search_level; } - if (level > _entry_level) { - _entry_docid = docid; - _entry_level = level; + if (level > get_entry_level()) { + _graph.set_entry_node(docid, level); } } @@ -353,7 +303,7 @@ HnswIndex::mutual_reconnect(const LinkArrayRef &cluster, uint32_t level) std::vector<PairDist> pairs; for (uint32_t i = 0; i + 1 < cluster.size(); ++i) { uint32_t n_id_1 = cluster[i]; - LinkArrayRef n_list_1 = get_link_array(n_id_1, level); + LinkArrayRef n_list_1 = _graph.get_link_array(n_id_1, level); for (uint32_t j = i + 1; j < cluster.size(); ++j) { uint32_t n_id_2 = cluster[j]; if (has_link_to(n_list_1, n_id_2)) continue; @@ -362,10 +312,10 @@ HnswIndex::mutual_reconnect(const LinkArrayRef &cluster, uint32_t level) } std::sort(pairs.begin(), pairs.end()); for (const PairDist & pair : pairs) { - LinkArrayRef old_links_1 = get_link_array(pair.id_first, level); + LinkArrayRef old_links_1 = _graph.get_link_array(pair.id_first, level); if (old_links_1.size() >= _cfg.max_links_on_inserts()) continue; - LinkArrayRef old_links_2 = get_link_array(pair.id_second, level); + LinkArrayRef old_links_2 = _graph.get_link_array(pair.id_second, level); if (old_links_2.size() >= _cfg.max_links_on_inserts()) continue; add_link_to(pair.id_first, level, old_links_1, pair.id_second); @@ -376,27 +326,25 @@ HnswIndex::mutual_reconnect(const LinkArrayRef &cluster, uint32_t level) void HnswIndex::remove_document(uint32_t docid) { - bool need_new_entrypoint = (docid == _entry_docid); + bool need_new_entrypoint = (docid == get_entry_docid()); LinkArray empty; - LevelArrayRef node_levels = get_level_array(docid); + LevelArrayRef node_levels = _graph.get_level_array(docid); for (int level = node_levels.size(); level-- > 0; ) { - LinkArrayRef my_links = get_link_array(docid, level); + LinkArrayRef my_links = _graph.get_link_array(docid, level); for (uint32_t neighbor_id : my_links) { if (need_new_entrypoint) { - _entry_docid = neighbor_id; - _entry_level = level; + _graph.set_entry_node(neighbor_id, level); need_new_entrypoint = false; } remove_link_to(neighbor_id, docid, level); } mutual_reconnect(my_links, level); - set_link_array(docid, level, empty); + _graph.set_link_array(docid, level, empty); } if (need_new_entrypoint) { - _entry_docid = 0; - _entry_level = -1; + _graph.set_entry_node(0, -1); } - remove_node_for_document(docid); + _graph.remove_node_for_document(docid); } void @@ -404,26 +352,26 @@ HnswIndex::transfer_hold_lists(generation_t current_gen) { // Note: RcuVector transfers hold lists as part of reallocation based on current generation. // We need to set the next generation here, as it is incremented on a higher level right after this call. - _node_refs.setGeneration(current_gen + 1); - _nodes.transferHoldLists(current_gen); - _links.transferHoldLists(current_gen); + _graph.node_refs.setGeneration(current_gen + 1); + _graph.nodes.transferHoldLists(current_gen); + _graph.links.transferHoldLists(current_gen); } void HnswIndex::trim_hold_lists(generation_t first_used_gen) { - _node_refs.removeOldGenerations(first_used_gen); - _nodes.trimHoldLists(first_used_gen); - _links.trimHoldLists(first_used_gen); + _graph.node_refs.removeOldGenerations(first_used_gen); + _graph.nodes.trimHoldLists(first_used_gen); + _graph.links.trimHoldLists(first_used_gen); } vespalib::MemoryUsage HnswIndex::memory_usage() const { vespalib::MemoryUsage result; - result.merge(_node_refs.getMemoryUsage()); - result.merge(_nodes.getMemoryUsage()); - result.merge(_links.getMemoryUsage()); + result.merge(_graph.node_refs.getMemoryUsage()); + result.merge(_graph.nodes.getMemoryUsage()); + result.merge(_graph.links.getMemoryUsage()); result.merge(_visited_set_pool.memory_usage()); return result; } @@ -433,6 +381,34 @@ HnswIndex::get_state(const vespalib::slime::Inserter& inserter) const { auto& object = inserter.insertObject(); StateExplorerUtils::memory_usage_to_slime(memory_usage(), object.setObject("memory_usage")); + object.setLong("nodes", _graph.size()); + auto& histogram_array = object.setArray("level_histogram"); + auto level_histogram = _graph.level_histogram(); + for (uint32_t hist_val : level_histogram) { + histogram_array.addLong(hist_val); + } + uint32_t reachable = count_reachable_nodes(); + uint32_t unreachable = _graph.size() - reachable; + if (level_histogram.size() > 0) { + unreachable -= level_histogram[0]; + } + object.setLong("unreachable_nodes", unreachable); + object.setLong("entry_docid", _graph.entry_docid); + object.setLong("entry_level", _graph.entry_level); +} + +std::unique_ptr<NearestNeighborIndexSaver> +HnswIndex::make_saver() const +{ + return std::make_unique<HnswIndexSaver>(_graph); +} + +bool +HnswIndex::load(const fileutil::LoadedBuffer& buf) +{ + assert(get_entry_docid() == 0); // cannot load after index has data + HnswIndexLoader loader(_graph); + return loader.load(buf); } struct NeighborsByDocId { @@ -463,12 +439,13 @@ FurthestPriQ HnswIndex::top_k_candidates(const TypedCells &vector, uint32_t k) const { FurthestPriQ best_neighbors; - if (_entry_level < 0) { + if (get_entry_level() < 0) { return best_neighbors; } - double entry_dist = calc_distance(vector, _entry_docid); - HnswCandidate entry_point(_entry_docid, entry_dist); - int search_level = _entry_level; + uint32_t entry_docid = get_entry_docid(); + int search_level = get_entry_level(); + double entry_dist = calc_distance(vector, entry_docid); + HnswCandidate entry_point(entry_docid, entry_dist); while (search_level > 0) { entry_point = find_nearest_in_layer(vector, entry_point, search_level); --search_level; @@ -481,14 +458,14 @@ HnswIndex::top_k_candidates(const TypedCells &vector, uint32_t k) const HnswNode HnswIndex::get_node(uint32_t docid) const { - auto node_ref = _node_refs[docid].load_acquire(); + auto node_ref = _graph.node_refs[docid].load_acquire(); if (!node_ref.valid()) { return HnswNode(); } - auto levels = _nodes.get(node_ref); + auto levels = _graph.nodes.get(node_ref); HnswNode::LevelArray result; for (const auto& links_ref : levels) { - auto links = _links.get(links_ref.load_acquire()); + auto links = _graph.links.get(links_ref.load_acquire()); HnswNode::LinkArray result_links(links.begin(), links.end()); std::sort(result_links.begin(), result_links.end()); result.push_back(result_links); @@ -501,14 +478,13 @@ HnswIndex::set_node(uint32_t docid, const HnswNode &node) { size_t num_levels = node.size(); assert(num_levels > 0); - make_node_for_document(docid, num_levels); + _graph.make_node_for_document(docid, num_levels); for (size_t level = 0; level < num_levels; ++level) { connect_new_node(docid, node.level(level), level); } int max_level = num_levels - 1; - if (_entry_level < max_level) { - _entry_docid = docid; - _entry_level = max_level; + if (get_entry_level() < max_level) { + _graph.set_entry_node(docid, max_level); } } @@ -516,15 +492,15 @@ bool HnswIndex::check_link_symmetry() const { bool all_sym = true; - for (size_t docid = 0; docid < _node_refs.size(); ++docid) { - auto node_ref = _node_refs[docid].load_acquire(); + for (size_t docid = 0; docid < _graph.node_refs.size(); ++docid) { + auto node_ref = _graph.node_refs[docid].load_acquire(); if (node_ref.valid()) { - auto levels = _nodes.get(node_ref); + auto levels = _graph.nodes.get(node_ref); uint32_t level = 0; for (const auto& links_ref : levels) { - auto links = _links.get(links_ref.load_acquire()); + auto links = _graph.links.get(links_ref.load_acquire()); for (auto neighbor_docid : links) { - auto neighbor_links = get_link_array(neighbor_docid, level); + auto neighbor_links = _graph.get_link_array(neighbor_docid, level); if (! has_link_to(neighbor_links, docid)) { all_sym = false; } @@ -536,4 +512,31 @@ HnswIndex::check_link_symmetry() const return all_sym; } +uint32_t +HnswIndex::count_reachable_nodes() const +{ + int search_level = get_entry_level(); + if (search_level < 0) { + return 0; + } + auto visited = _visited_set_pool.get(_graph.size()); + uint32_t entry_id = get_entry_docid(); + LinkArray found_links; + found_links.push_back(entry_id); + visited.mark(entry_id); + while (search_level >= 0) { + for (uint32_t idx = 0; idx < found_links.size(); ++idx) { + uint32_t docid = found_links[idx]; + auto neighbors = _graph.get_link_array(docid, search_level); + for (uint32_t neighbor : neighbors) { + if (visited.is_marked(neighbor)) continue; + visited.mark(neighbor); + found_links.push_back(neighbor); + } + } + --search_level; + } + return found_links.size(); +} + } // namespace diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h index 130c012effe..95001853710 100644 --- a/searchlib/src/vespa/searchlib/tensor/hnsw_index.h +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index.h @@ -8,6 +8,7 @@ #include "hnsw_node.h" #include "nearest_neighbor_index.h" #include "random_level_generator.h" +#include "hnsw_graph.h" #include <vespa/eval/tensor/dense/typed_cells.h> #include <vespa/searchlib/common/bitvector.h> #include <vespa/vespalib/datastore/array_store.h> @@ -57,54 +58,30 @@ public: }; protected: - using AtomicEntryRef = search::datastore::AtomicEntryRef; + using AtomicEntryRef = HnswGraph::AtomicEntryRef; + using NodeStore = HnswGraph::NodeStore; - // This uses 10 bits for buffer id -> 1024 buffers. - // As we have very short arrays we get less fragmentation with fewer and larger buffers. - using EntryRefType = search::datastore::EntryRefT<22>; - - // Provides mapping from document id -> node reference. - // The reference is used to lookup the node data in NodeStore. - using NodeRefVector = vespalib::RcuVector<AtomicEntryRef>; + using LinkStore = HnswGraph::LinkStore; + using LinkArrayRef = HnswGraph::LinkArrayRef; + using LinkArray = vespalib::Array<uint32_t>; - // This stores the level arrays for all nodes. - // Each node consists of an array of levels (from level 0 to n) where each entry is a reference to the link array at that level. - using NodeStore = search::datastore::ArrayStore<AtomicEntryRef, EntryRefType>; - using LevelArrayRef = NodeStore::ConstArrayRef; + using LevelArrayRef = HnswGraph::LevelArrayRef; using LevelArray = vespalib::Array<AtomicEntryRef>; - // This stores all link arrays. - // A link array consists of the document ids of the nodes a particular node is linked to. - using LinkStore = search::datastore::ArrayStore<uint32_t, EntryRefType>; - using LinkArrayRef = LinkStore::ConstArrayRef; - using LinkArray = vespalib::Array<uint32_t>; - using TypedCells = vespalib::tensor::TypedCells; + HnswGraph _graph; const DocVectorAccess& _vectors; DistanceFunction::UP _distance_func; RandomLevelGenerator::UP _level_generator; Config _cfg; - NodeRefVector _node_refs; - NodeStore _nodes; - LinkStore _links; mutable vespalib::ReusableSetPool _visited_set_pool; - uint32_t _entry_docid; - int _entry_level; - - static search::datastore::ArrayStoreConfig make_default_node_store_config(); - static search::datastore::ArrayStoreConfig make_default_link_store_config(); uint32_t max_links_for_level(uint32_t level) const; - void make_node_for_document(uint32_t docid, uint32_t num_levels); - void remove_node_for_document(uint32_t docid); - LevelArrayRef get_level_array(uint32_t docid) const; - LinkArrayRef get_link_array(uint32_t docid, uint32_t level) const; - void set_link_array(uint32_t docid, uint32_t level, const LinkArrayRef& links); void add_link_to(uint32_t docid, uint32_t level, const LinkArrayRef& old_links, uint32_t new_link) { LinkArray new_links(old_links.begin(), old_links.end()); new_links.push_back(new_link); - set_link_array(docid, level, new_links); + _graph.set_link_array(docid, level, new_links); } /** @@ -155,18 +132,25 @@ public: vespalib::MemoryUsage memory_usage() const override; void get_state(const vespalib::slime::Inserter& inserter) const override; + std::unique_ptr<NearestNeighborIndexSaver> make_saver() const override; + bool load(const fileutil::LoadedBuffer& buf) override; + std::vector<Neighbor> find_top_k(uint32_t k, TypedCells vector, uint32_t explore_k) const override; const DistanceFunction *distance_function() const override { return _distance_func.get(); } FurthestPriQ top_k_candidates(const TypedCells &vector, uint32_t k) const; - uint32_t get_entry_docid() const { return _entry_docid; } - uint32_t get_entry_level() const { return _entry_level; } + uint32_t get_entry_docid() const { return _graph.entry_docid; } + int32_t get_entry_level() const { return _graph.entry_level; } // Should only be used by unit tests. HnswNode get_node(uint32_t docid) const; void set_node(uint32_t docid, const HnswNode &node); bool check_link_symmetry() const; + uint32_t count_reachable_nodes() const; + + static search::datastore::ArrayStoreConfig make_default_node_store_config(); + static search::datastore::ArrayStoreConfig make_default_link_store_config(); }; } diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp new file mode 100644 index 00000000000..f02ead86a8d --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.cpp @@ -0,0 +1,47 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "hnsw_index_loader.h" +#include "hnsw_graph.h" +#include <vespa/searchlib/util/fileutil.h> + +namespace search::tensor { + +HnswIndexLoader::~HnswIndexLoader() {} + +HnswIndexLoader::HnswIndexLoader(HnswGraph &graph) + : _graph(graph), _ptr(nullptr), _end(nullptr), _failed(false) +{ +} + +bool +HnswIndexLoader::load(const fileutil::LoadedBuffer& buf) +{ + size_t num_readable = buf.size(sizeof(uint32_t)); + _ptr = static_cast<const uint32_t *>(buf.buffer()); + _end = _ptr + num_readable; + uint32_t entry_docid = next_int(); + int32_t entry_level = next_int(); + uint32_t num_nodes = next_int(); + std::vector<uint32_t> link_array; + for (uint32_t docid = 0; docid < num_nodes; ++docid) { + uint32_t num_levels = next_int(); + if (num_levels > 0) { + _graph.make_node_for_document(docid, num_levels); + for (uint32_t level = 0; level < num_levels; ++level) { + uint32_t num_links = next_int(); + link_array.clear(); + while (num_links-- > 0) { + link_array.push_back(next_int()); + } + _graph.set_link_array(docid, level, link_array); + } + } + } + if (_failed) return false; + _graph.node_refs.ensure_size(num_nodes); + _graph.set_entry_node(entry_docid, entry_level); + return true; +} + + +} diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h new file mode 100644 index 00000000000..174f66b95ec --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index_loader.h @@ -0,0 +1,35 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <cstdint> + +namespace search::fileutil { class LoadedBuffer; } + +namespace search::tensor { + +class HnswGraph; + +/** + * Implements loading of HNSW graph structure from binary format. + **/ +class HnswIndexLoader { +public: + HnswIndexLoader(HnswGraph &graph); + ~HnswIndexLoader(); + bool load(const fileutil::LoadedBuffer& buf); +private: + HnswGraph &_graph; + const uint32_t *_ptr; + const uint32_t *_end; + bool _failed; + uint32_t next_int() { + if (__builtin_expect((_ptr == _end), false)) { + _failed = true; + return 0; + } + return *_ptr++; + } +}; + +} diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index_saver.cpp b/searchlib/src/vespa/searchlib/tensor/hnsw_index_saver.cpp new file mode 100644 index 00000000000..acff30f8cbf --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index_saver.cpp @@ -0,0 +1,57 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "hnsw_index_saver.h" +#include "hnsw_graph.h" +#include <vespa/vespalib/util/bufferwriter.h> + +namespace search::tensor { + +HnswIndexSaver::~HnswIndexSaver() {} + +HnswIndexSaver::HnswIndexSaver(const HnswGraph &graph) + : _graph_links(graph.links), _meta_data() +{ + _meta_data.entry_docid = graph.entry_docid; + _meta_data.entry_level = graph.entry_level; + size_t num_nodes = graph.node_refs.size(); + _meta_data.nodes.reserve(num_nodes); + for (size_t i = 0; i < num_nodes; ++i) { + LevelVector node; + auto node_ref = graph.node_refs[i].load_acquire(); + if (node_ref.valid()) { + auto levels = graph.nodes.get(node_ref); + for (const auto& links_ref : levels) { + auto level = links_ref.load_acquire(); + node.push_back(level); + } + } + _meta_data.nodes.emplace_back(std::move(node)); + } +} + +void +HnswIndexSaver::save(BufferWriter& writer) const +{ + writer.write(&_meta_data.entry_docid, sizeof(uint32_t)); + writer.write(&_meta_data.entry_level, sizeof(int32_t)); + uint32_t num_nodes = _meta_data.nodes.size(); + writer.write(&num_nodes, sizeof(uint32_t)); + for (const auto &node : _meta_data.nodes) { + uint32_t num_levels = node.size(); + writer.write(&num_levels, sizeof(uint32_t)); + for (auto links_ref : node) { + if (links_ref.valid()) { + vespalib::ConstArrayRef<uint32_t> link_array = _graph_links.get(links_ref); + uint32_t num_links = link_array.size(); + writer.write(&num_links, sizeof(uint32_t)); + writer.write(link_array.cbegin(), sizeof(uint32_t)*num_links); + } else { + uint32_t num_links = 0; + writer.write(&num_links, sizeof(uint32_t)); + } + } + } + writer.flush(); +} + +} diff --git a/searchlib/src/vespa/searchlib/tensor/hnsw_index_saver.h b/searchlib/src/vespa/searchlib/tensor/hnsw_index_saver.h new file mode 100644 index 00000000000..d1d8e0db19d --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/hnsw_index_saver.h @@ -0,0 +1,37 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "nearest_neighbor_index_saver.h" +#include "hnsw_graph.h" +#include <vespa/vespalib/datastore/entryref.h> +#include <vector> + +namespace search::tensor { + +/** + * Implements saving of HNSW graph structure in binary format. + * The constructor takes a snapshot of all meta-data, but + * the links will be fetched from the graph in the save() + * method. + **/ +class HnswIndexSaver : public NearestNeighborIndexSaver { +public: + using LevelVector = std::vector<search::datastore::EntryRef>; + + HnswIndexSaver(const HnswGraph &graph); + ~HnswIndexSaver(); + void save(BufferWriter& writer) const override; + +private: + struct MetaData { + uint32_t entry_docid; + int32_t entry_level; + std::vector<LevelVector> nodes; + MetaData() : entry_docid(0), entry_level(-1), nodes() {} + }; + const HnswGraph::LinkStore &_graph_links; + MetaData _meta_data; +}; + +} diff --git a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h index e7302028996..aca2ce2af66 100644 --- a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h +++ b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index.h @@ -2,17 +2,22 @@ #pragma once -#include <cstdint> -#include <vector> +#include "distance_function.h" #include <vespa/eval/tensor/dense/typed_cells.h> #include <vespa/vespalib/util/generationhandler.h> #include <vespa/vespalib/util/memoryusage.h> -#include "distance_function.h" +#include <cstdint> +#include <memory> +#include <vector> namespace vespalib::slime { struct Inserter; } +namespace search::fileutil { class LoadedBuffer; } + namespace search::tensor { +class NearestNeighborIndexSaver; + /** * Interface for an index that is used for (approximate) nearest neighbor search. */ @@ -35,6 +40,15 @@ public: virtual vespalib::MemoryUsage memory_usage() const = 0; virtual void get_state(const vespalib::slime::Inserter& inserter) const = 0; + /** + * Creates a saver that is used to save the index to binary form. + * + * This function is always called by the attribute write thread, + * and the caller ensures that an attribute read guard is held during the lifetime of the saver. + */ + virtual std::unique_ptr<NearestNeighborIndexSaver> make_saver() const = 0; + virtual bool load(const fileutil::LoadedBuffer& buf) = 0; + virtual std::vector<Neighbor> find_top_k(uint32_t k, vespalib::tensor::TypedCells vector, uint32_t explore_k) const = 0; diff --git a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index_saver.cpp b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index_saver.cpp new file mode 100644 index 00000000000..4b293488737 --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index_saver.cpp @@ -0,0 +1,3 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "nearest_neighbor_index_saver.h" diff --git a/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index_saver.h b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index_saver.h new file mode 100644 index 00000000000..99d8960ae10 --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/nearest_neighbor_index_saver.h @@ -0,0 +1,33 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace search { class BufferWriter; } + +namespace search::tensor { + +/** + * Interface that is used to save a nearest neighbor index to binary form. + * + * An instance of this interface must hold a snapshot of the index from the + * point in time the instance was created, and then save this to binary form in the save() function. + * + * The instance is always created by the attribute write thread, + * and the caller ensures that an attribute read guard is held during the lifetime of the saver. + * Data that might change later must be copied in the constructor. + * + * A flush thread is calling save() at a later point in time. + */ +class NearestNeighborIndexSaver { +public: + virtual ~NearestNeighborIndexSaver() {} + + /** + * Saves the index in binary form using the given writer. + * + * It is the responsibility of the implementer to call BufferWriter::flush() at the end. + */ + virtual void save(BufferWriter& writer) const = 0; +}; + +} |