summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-08-13 09:25:33 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-08-13 09:25:33 +0000
commite1893e482f979e56d4bbf39dce8602938d2250d1 (patch)
tree57ba3885df6a9312f5a46c258885a670e7dcbc68 /searchlib
parent1f49eabb261f4db4a47ec47d1971d4927cab867f (diff)
Use the executor for the part that can be parallell when rebuilding index on load.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp33
-rw-r--r--searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp38
2 files changed, 64 insertions, 7 deletions
diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
index 605fcb538d6..a49816f1a52 100644
--- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
+++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
@@ -28,6 +28,7 @@
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/vespalib/util/mmap_file_allocator_factory.h>
#include <vespa/searchlib/util/bufferwriter.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/log/log.h>
LOG_SETUP("tensorattribute_test");
@@ -146,6 +147,12 @@ public:
void expect_adds(const EntryVector &exp_adds) const {
EXPECT_EQUAL(exp_adds, _adds);
}
+ void expect_prepare_adds(const EntryVector &exp) const {
+ EXPECT_EQUAL(exp, _prepare_adds);
+ }
+ void expect_complete_adds(const EntryVector &exp) const {
+ EXPECT_EQUAL(exp, _complete_adds);
+ }
void expect_empty_remove() const {
EXPECT_TRUE(_removes.empty());
}
@@ -305,6 +312,7 @@ struct Fixture {
std::unique_ptr<NearestNeighborIndexFactory> _index_factory;
std::shared_ptr<TensorAttribute> _tensorAttr;
std::shared_ptr<AttributeVector> _attr;
+ vespalib::ThreadStackExecutor _executor;
bool _denseTensors;
FixtureTraits _traits;
@@ -317,6 +325,7 @@ struct Fixture {
_index_factory(),
_tensorAttr(),
_attr(),
+ _executor(1, 0x10000),
_denseTensors(false),
_traits(traits)
{
@@ -462,6 +471,13 @@ struct Fixture {
EXPECT_TRUE(loadok);
}
+ void loadWithExecutor() {
+ _tensorAttr = makeAttr();
+ _attr = _tensorAttr;
+ bool loadok = _attr->load(&_executor);
+ EXPECT_TRUE(loadok);
+ }
+
TensorSpec expDenseTensor3() const {
return TensorSpec(denseSpec)
.add({{"x", 0}, {"y", 1}}, 11)
@@ -895,11 +911,28 @@ TEST_F("onLoads() ignores saved nearest neighbor index if not enabled in config"
EXPECT_EQUAL(f.as_dense_tensor().nearest_neighbor_index(), nullptr);
}
+TEST_F("onLoad() uses executor if major index parameters are changed", DenseTensorAttributeMockIndex)
+{
+ f.save_example_tensors_with_mock_index();
+ f.set_hnsw_index_params(HnswIndexParams(5, 20, DistanceMetric::Euclidean));
+ EXPECT_EQUAL(0ul, f._executor.getStats().acceptedTasks);
+ f.loadWithExecutor();
+ EXPECT_EQUAL(2ul, f._executor.getStats().acceptedTasks);
+ f.assert_example_tensors();
+ auto& index = f.mock_index();
+ EXPECT_EQUAL(0, index.get_index_value());
+ index.expect_adds({});
+ index.expect_prepare_adds({{1, {3, 5}}, {2, {7, 9}}});
+ index.expect_complete_adds({{1, {3, 5}}, {2, {7, 9}}});
+}
+
TEST_F("onLoad() ignores saved nearest neighbor index if major index parameters are changed", DenseTensorAttributeMockIndex)
{
f.save_example_tensors_with_mock_index();
f.set_hnsw_index_params(HnswIndexParams(5, 20, DistanceMetric::Euclidean));
+ EXPECT_EQUAL(0ul, f._executor.getStats().acceptedTasks);
f.load();
+ EXPECT_EQUAL(0ul, f._executor.getStats().acceptedTasks);
f.assert_example_tensors();
auto& index = f.mock_index();
EXPECT_EQUAL(0, index.get_index_value());
diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
index fc1585314f8..8f56a4a416c 100644
--- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
@@ -12,6 +12,9 @@
#include <vespa/vespalib/data/slime/inserter.h>
#include <vespa/vespalib/util/memory_allocator.h>
#include <vespa/vespalib/util/mmap_file_allocator_factory.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <thread>
#include <vespa/log/log.h>
LOG_SETUP(".searchlib.tensor.dense_tensor_attribute");
@@ -220,23 +223,45 @@ DenseTensorAttribute::onLoad(vespalib::Executor *executor)
uint32_t numDocs(tensorReader.getDocIdLimit());
_refVector.reset();
_refVector.unsafe_reserve(numDocs);
+ auto complete_executor = (_index && !use_index_file && executor)
+ ? std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000)
+ : std::unique_ptr<vespalib::ThreadStackExecutor>();
+ std::atomic<uint64_t> pending(0);
for (uint32_t lid = 0; lid < numDocs; ++lid) {
if (tensorReader.is_present()) {
auto raw = _denseTensorStore.allocRawBuffer();
tensorReader.readTensor(raw.data, _denseTensorStore.getBufSize());
_refVector.push_back(raw.ref);
if (_index && !use_index_file) {
- // This ensures that get_vector() (via getTensor()) is able to find the newly added tensor.
- setCommittedDocIdLimit(lid + 1);
- _index->add_document(lid);
- if ((lid % 256) == 0) {
- commit();
+ if (executor != nullptr) {
+ while (pending > 1000) { std::this_thread::yield(); }
+ ++pending;
+ executor->execute(vespalib::makeLambdaTask([this, raw, lid, &pending, &complete_executor]() {
+ auto prepared = _index->prepare_add_document(lid, _denseTensorStore.get_typed_cells(raw.ref), getGenerationHandler().takeGuard());
+ complete_executor->execute(vespalib::makeLambdaTask([this, lid, &pending, result=std::move(prepared)]() mutable {
+ setCommittedDocIdLimit(std::max(getCommittedDocIdLimit(), lid + 1));
+ _index->complete_add_document(lid, std::move(result));
+ --pending;
+ if ((lid % 256) == 0) {
+ commit();
+ };
+ }));
+ }));
+ } else {
+ // This ensures that get_vector() (via getTensor()) is able to find the newly added tensor.
+ setCommittedDocIdLimit(lid + 1);
+ _index->add_document(lid);
+ if ((lid % 256) == 0) {
+ commit();
+ }
}
}
} else {
_refVector.push_back(EntryRef());
}
}
+ while (pending > 0) { std::this_thread::sleep_for(1ms); }
+ commit();
setNumDocs(numDocs);
setCommittedDocIdLimit(numDocs);
if (_index && use_index_file) {
@@ -252,8 +277,7 @@ DenseTensorAttribute::onLoad(vespalib::Executor *executor)
std::unique_ptr<AttributeSaver>
DenseTensorAttribute::onInitSave(vespalib::stringref fileName)
{
- vespalib::GenerationHandler::Guard guard(getGenerationHandler().
- takeGuard());
+ vespalib::GenerationHandler::Guard guard(getGenerationHandler().takeGuard());
auto index_saver = (_index ? _index->make_saver() : std::unique_ptr<NearestNeighborIndexSaver>());
return std::make_unique<DenseTensorAttributeSaver>
(std::move(guard),