diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-08-13 09:25:33 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-08-13 09:25:33 +0000 |
commit | e1893e482f979e56d4bbf39dce8602938d2250d1 (patch) | |
tree | 57ba3885df6a9312f5a46c258885a670e7dcbc68 /searchlib | |
parent | 1f49eabb261f4db4a47ec47d1971d4927cab867f (diff) |
Use the executor for the part that can run in parallel when rebuilding the index on load.
Diffstat (limited to 'searchlib')
-rw-r--r-- | searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp | 33 | ||||
-rw-r--r-- | searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp | 38 |
2 files changed, 64 insertions, 7 deletions
diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp index 605fcb538d6..a49816f1a52 100644 --- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp +++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp @@ -28,6 +28,7 @@ #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/vespalib/util/mmap_file_allocator_factory.h> #include <vespa/searchlib/util/bufferwriter.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/log/log.h> LOG_SETUP("tensorattribute_test"); @@ -146,6 +147,12 @@ public: void expect_adds(const EntryVector &exp_adds) const { EXPECT_EQUAL(exp_adds, _adds); } + void expect_prepare_adds(const EntryVector &exp) const { + EXPECT_EQUAL(exp, _prepare_adds); + } + void expect_complete_adds(const EntryVector &exp) const { + EXPECT_EQUAL(exp, _complete_adds); + } void expect_empty_remove() const { EXPECT_TRUE(_removes.empty()); } @@ -305,6 +312,7 @@ struct Fixture { std::unique_ptr<NearestNeighborIndexFactory> _index_factory; std::shared_ptr<TensorAttribute> _tensorAttr; std::shared_ptr<AttributeVector> _attr; + vespalib::ThreadStackExecutor _executor; bool _denseTensors; FixtureTraits _traits; @@ -317,6 +325,7 @@ struct Fixture { _index_factory(), _tensorAttr(), _attr(), + _executor(1, 0x10000), _denseTensors(false), _traits(traits) { @@ -462,6 +471,13 @@ struct Fixture { EXPECT_TRUE(loadok); } + void loadWithExecutor() { + _tensorAttr = makeAttr(); + _attr = _tensorAttr; + bool loadok = _attr->load(&_executor); + EXPECT_TRUE(loadok); + } + TensorSpec expDenseTensor3() const { return TensorSpec(denseSpec) .add({{"x", 0}, {"y", 1}}, 11) @@ -895,11 +911,28 @@ TEST_F("onLoads() ignores saved nearest neighbor index if not enabled in config" EXPECT_EQUAL(f.as_dense_tensor().nearest_neighbor_index(), nullptr); } +TEST_F("onLoad() uses executor if major index parameters are changed", 
DenseTensorAttributeMockIndex) +{ + f.save_example_tensors_with_mock_index(); + f.set_hnsw_index_params(HnswIndexParams(5, 20, DistanceMetric::Euclidean)); + EXPECT_EQUAL(0ul, f._executor.getStats().acceptedTasks); + f.loadWithExecutor(); + EXPECT_EQUAL(2ul, f._executor.getStats().acceptedTasks); + f.assert_example_tensors(); + auto& index = f.mock_index(); + EXPECT_EQUAL(0, index.get_index_value()); + index.expect_adds({}); + index.expect_prepare_adds({{1, {3, 5}}, {2, {7, 9}}}); + index.expect_complete_adds({{1, {3, 5}}, {2, {7, 9}}}); +} + TEST_F("onLoad() ignores saved nearest neighbor index if major index parameters are changed", DenseTensorAttributeMockIndex) { f.save_example_tensors_with_mock_index(); f.set_hnsw_index_params(HnswIndexParams(5, 20, DistanceMetric::Euclidean)); + EXPECT_EQUAL(0ul, f._executor.getStats().acceptedTasks); f.load(); + EXPECT_EQUAL(0ul, f._executor.getStats().acceptedTasks); f.assert_example_tensors(); auto& index = f.mock_index(); EXPECT_EQUAL(0, index.get_index_value()); diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp index fc1585314f8..8f56a4a416c 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp @@ -12,6 +12,9 @@ #include <vespa/vespalib/data/slime/inserter.h> #include <vespa/vespalib/util/memory_allocator.h> #include <vespa/vespalib/util/mmap_file_allocator_factory.h> +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/lambdatask.h> +#include <thread> #include <vespa/log/log.h> LOG_SETUP(".searchlib.tensor.dense_tensor_attribute"); @@ -220,23 +223,45 @@ DenseTensorAttribute::onLoad(vespalib::Executor *executor) uint32_t numDocs(tensorReader.getDocIdLimit()); _refVector.reset(); _refVector.unsafe_reserve(numDocs); + auto complete_executor = (_index && !use_index_file && executor) + ? 
std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000) + : std::unique_ptr<vespalib::ThreadStackExecutor>(); + std::atomic<uint64_t> pending(0); for (uint32_t lid = 0; lid < numDocs; ++lid) { if (tensorReader.is_present()) { auto raw = _denseTensorStore.allocRawBuffer(); tensorReader.readTensor(raw.data, _denseTensorStore.getBufSize()); _refVector.push_back(raw.ref); if (_index && !use_index_file) { - // This ensures that get_vector() (via getTensor()) is able to find the newly added tensor. - setCommittedDocIdLimit(lid + 1); - _index->add_document(lid); - if ((lid % 256) == 0) { - commit(); + if (executor != nullptr) { + while (pending > 1000) { std::this_thread::yield(); } + ++pending; + executor->execute(vespalib::makeLambdaTask([this, raw, lid, &pending, &complete_executor]() { + auto prepared = _index->prepare_add_document(lid, _denseTensorStore.get_typed_cells(raw.ref), getGenerationHandler().takeGuard()); + complete_executor->execute(vespalib::makeLambdaTask([this, lid, &pending, result=std::move(prepared)]() mutable { + setCommittedDocIdLimit(std::max(getCommittedDocIdLimit(), lid + 1)); + _index->complete_add_document(lid, std::move(result)); + --pending; + if ((lid % 256) == 0) { + commit(); + }; + })); + })); + } else { + // This ensures that get_vector() (via getTensor()) is able to find the newly added tensor. + setCommittedDocIdLimit(lid + 1); + _index->add_document(lid); + if ((lid % 256) == 0) { + commit(); + } } } } else { _refVector.push_back(EntryRef()); } } + while (pending > 0) { std::this_thread::sleep_for(1ms); } + commit(); setNumDocs(numDocs); setCommittedDocIdLimit(numDocs); if (_index && use_index_file) { @@ -252,8 +277,7 @@ DenseTensorAttribute::onLoad(vespalib::Executor *executor) std::unique_ptr<AttributeSaver> DenseTensorAttribute::onInitSave(vespalib::stringref fileName) { - vespalib::GenerationHandler::Guard guard(getGenerationHandler(). 
- takeGuard()); + vespalib::GenerationHandler::Guard guard(getGenerationHandler().takeGuard()); auto index_saver = (_index ? _index->make_saver() : std::unique_ptr<NearestNeighborIndexSaver>()); return std::make_unique<DenseTensorAttributeSaver> (std::move(guard), |