summaryrefslogtreecommitdiffstats
path: root/searchlib/src
diff options
context:
space:
mode:
author: Tor Egge <Tor.Egge@online.no> 2022-11-01 11:34:06 +0100
committer: Tor Egge <Tor.Egge@online.no> 2022-11-01 11:34:06 +0100
commit: 48c4a42f24048813700c07a70840189b3f14ab89 (patch)
tree: 70f3f6c2c65f478eeb73dcf94e501c0af25b94e7 /searchlib/src
parent: 88463ea1b79873a63bf0aa6aa2813e7abfa85659 (diff)
Consolidate tensor attribute loaders.
Diffstat (limited to 'searchlib/src')
-rw-r--r-- searchlib/src/vespa/searchlib/tensor/CMakeLists.txt 1
-rw-r--r-- searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp 274
-rw-r--r-- searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h 3
-rw-r--r-- searchlib/src/vespa/searchlib/tensor/dense_tensor_store.cpp 6
-rw-r--r-- searchlib/src/vespa/searchlib/tensor/dense_tensor_store.h 1
-rw-r--r-- searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp 34
-rw-r--r-- searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.cpp 329
-rw-r--r-- searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.h 45
-rw-r--r-- searchlib/src/vespa/searchlib/tensor/tensor_store.cpp 6
-rw-r--r-- searchlib/src/vespa/searchlib/tensor/tensor_store.h 1
10 files changed, 400 insertions, 300 deletions
diff --git a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
index 1a412b77270..4d98e7d59a8 100644
--- a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
@@ -28,6 +28,7 @@ vespa_add_library(searchlib_tensor OBJECT
serialized_fast_value_attribute.cpp
small_subspaces_buffer_type.cpp
tensor_attribute.cpp
+ tensor_attribute_loader.cpp
tensor_attribute_saver.cpp
tensor_buffer_operations.cpp
tensor_buffer_store.cpp
diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
index 12d5b2864a0..67215a99b71 100644
--- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
@@ -2,111 +2,26 @@
#include "dense_tensor_attribute.h"
#include "nearest_neighbor_index.h"
-#include "nearest_neighbor_index_loader.h"
-#include "tensor_attribute_constants.h"
+#include "tensor_attribute_loader.h"
#include "tensor_attribute_saver.h"
#include <vespa/eval/eval/value.h>
-#include <vespa/fastlib/io/bufferedfile.h>
#include <vespa/searchcommon/attribute/config.h>
-#include <vespa/searchlib/attribute/load_utils.h>
-#include <vespa/searchlib/attribute/readerbase.h>
#include <vespa/vespalib/data/slime/inserter.h>
-#include <vespa/vespalib/util/cpu_usage.h>
-#include <vespa/vespalib/util/lambdatask.h>
-#include <vespa/vespalib/util/memory_allocator.h>
-#include <vespa/vespalib/util/mmap_file_allocator_factory.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
-#include <thread>
-#include <vespa/log/log.h>
-LOG_SETUP(".searchlib.tensor.dense_tensor_attribute");
-
-using search::attribute::LoadUtils;
-using vespalib::CpuUsage;
using vespalib::eval::Value;
-using vespalib::eval::ValueType;
using vespalib::slime::ObjectInserter;
namespace search::tensor {
-namespace {
-
-constexpr uint32_t LOAD_COMMIT_INTERVAL = 256;
-const vespalib::string tensorTypeTag("tensortype");
-
-class BlobSequenceReader : public ReaderBase
-{
-private:
- bool _use_index_file;
- FileWithHeader _index_file;
-
-public:
- BlobSequenceReader(AttributeVector& attr, bool has_index);
- ~BlobSequenceReader();
- bool is_present();
- void readTensor(void *buf, size_t len) { _datFile.file().ReadBuf(buf, len); }
- bool use_index_file() const { return _use_index_file; }
- FastOS_FileInterface& index_file() { return _index_file.file(); }
-};
-
-bool
-can_use_index_save_file(const search::attribute::Config &config, const search::attribute::AttributeHeader &header)
-{
- if (!config.hnsw_index_params().has_value() || !header.get_hnsw_index_params().has_value()) {
- return false;
- }
- const auto &config_params = config.hnsw_index_params().value();
- const auto &header_params = header.get_hnsw_index_params().value();
- if ((config_params.max_links_per_node() != header_params.max_links_per_node()) ||
- (config_params.distance_metric() != header_params.distance_metric())) {
- return false;
- }
- return true;
-}
-
bool
-has_index_file(AttributeVector& attr)
-{
- return LoadUtils::file_exists(attr, TensorAttributeSaver::index_file_suffix());
-}
-
-BlobSequenceReader::BlobSequenceReader(AttributeVector& attr, bool has_index)
- : ReaderBase(attr),
- _use_index_file(has_index && has_index_file(attr) &&
- can_use_index_save_file(attr.getConfig(),
- search::attribute::AttributeHeader::extractTags(getDatHeader(), attr.getBaseFileName()))),
- _index_file(_use_index_file ?
- attribute::LoadUtils::openFile(attr, TensorAttributeSaver::index_file_suffix()) :
- std::unique_ptr<Fast_BufferedFile>())
-{
-}
-
-BlobSequenceReader::~BlobSequenceReader() = default;
-
-bool
-BlobSequenceReader::is_present() {
- unsigned char detect;
- _datFile.file().ReadBuf(&detect, sizeof(detect));
- if (detect == tensorIsNotPresent) {
- return false;
- }
- if (detect != tensorIsPresent) {
- LOG_ABORT("should not be reached");
- }
- return true;
-}
-
-}
-
-bool
-DenseTensorAttribute::tensor_is_unchanged(DocId docid, const vespalib::eval::Value& new_tensor) const
+DenseTensorAttribute::tensor_is_unchanged(DocId docid, const Value& new_tensor) const
{
auto old_tensor = extract_cells_ref(docid);
return _comp.equals(old_tensor, new_tensor.cells());
}
void
-DenseTensorAttribute::internal_set_tensor(DocId docid, const vespalib::eval::Value& tensor)
+DenseTensorAttribute::internal_set_tensor(DocId docid, const Value& tensor)
{
consider_remove_from_index(docid);
EntryRef ref = _denseTensorStore.store_tensor(tensor);
@@ -180,7 +95,7 @@ DenseTensorAttribute::clearDoc(DocId docId)
}
void
-DenseTensorAttribute::setTensor(DocId docId, const vespalib::eval::Value &tensor)
+DenseTensorAttribute::setTensor(DocId docId, const Value& tensor)
{
checkTensorType(tensor);
internal_set_tensor(docId, tensor);
@@ -190,7 +105,7 @@ DenseTensorAttribute::setTensor(DocId docId, const vespalib::eval::Value &tensor
}
std::unique_ptr<PrepareResult>
-DenseTensorAttribute::prepare_set_tensor(DocId docid, const vespalib::eval::Value& tensor) const
+DenseTensorAttribute::prepare_set_tensor(DocId docid, const Value& tensor) const
{
checkTensorType(tensor);
if (_index) {
@@ -205,7 +120,7 @@ DenseTensorAttribute::prepare_set_tensor(DocId docid, const vespalib::eval::Valu
}
void
-DenseTensorAttribute::complete_set_tensor(DocId docid, const vespalib::eval::Value& tensor,
+DenseTensorAttribute::complete_set_tensor(DocId docid, const Value& tensor,
std::unique_ptr<PrepareResult> prepare_result)
{
if (_index && !prepare_result) {
@@ -218,7 +133,7 @@ DenseTensorAttribute::complete_set_tensor(DocId docid, const vespalib::eval::Val
}
}
-std::unique_ptr<vespalib::eval::Value>
+std::unique_ptr<Value>
DenseTensorAttribute::getTensor(DocId docId) const
{
EntryRef ref;
@@ -237,181 +152,6 @@ DenseTensorAttribute::extract_cells_ref(DocId docId) const
}
return _denseTensorStore.get_typed_cells(ref);
}
-namespace {
-class Loader {
-public:
- virtual ~Loader() = default;
- virtual void load(uint32_t lid, vespalib::datastore::EntryRef ref) = 0;
- virtual void wait_complete() = 0;
-};
-}
-
-/**
- * Will load and index documents in parallel. Note that indexing order is not guaranteed,
- * but that is inline with the guarantees vespa already has.
- */
-class DenseTensorAttribute::ThreadedLoader : public Loader {
-public:
- ThreadedLoader(DenseTensorAttribute & attr, vespalib::Executor & shared_executor)
- : _attr(attr),
- _shared_executor(shared_executor),
- _queue(MAX_PENDING),
- _pending(0)
- {}
- void load(uint32_t lid, vespalib::datastore::EntryRef ref) override;
- void wait_complete() override {
- drainUntilPending(0);
- }
-private:
- using Entry = std::pair<uint32_t, std::unique_ptr<PrepareResult>>;
- using Queue = vespalib::ArrayQueue<Entry>;
-
- bool pop(Entry & entry) {
- std::unique_lock guard(_mutex);
- if (_queue.empty()) return false;
- entry = std::move(_queue.front());
- _queue.pop();
- return true;
- }
- void drainQ() {
- Queue queue(MAX_PENDING);
- {
- std::unique_lock guard(_mutex);
- queue.swap(_queue);
- }
- while (!queue.empty()) {
- auto item = std::move(queue.front());
- queue.pop();
- complete(item.first, std::move(item.second));
- }
- }
-
- void complete(uint32_t lid, std::unique_ptr<PrepareResult> prepared) {
- _attr.setCommittedDocIdLimit(std::max(_attr.getCommittedDocIdLimit(), lid + 1));
- _attr._index->complete_add_document(lid, std::move(prepared));
- --_pending;
- if ((lid % LOAD_COMMIT_INTERVAL) == 0) {
- _attr.commit();
- };
- }
- void drainUntilPending(uint32_t maxPending) {
- while (_pending > maxPending) {
- {
- std::unique_lock guard(_mutex);
- while (_queue.empty()) {
- _cond.wait(guard);
- }
- }
- drainQ();
- }
- }
- static constexpr uint32_t MAX_PENDING = 1000;
- DenseTensorAttribute & _attr;
- vespalib::Executor & _shared_executor;
- std::mutex _mutex;
- std::condition_variable _cond;
- Queue _queue;
- uint64_t _pending; // _pending is only modified in forground thread
-};
-
-void
-DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::EntryRef ref) {
- Entry item;
- while (pop(item)) {
- // First process items that are ready to complete
- complete(item.first, std::move(item.second));
- }
- // Then ensure that there no mor ethan MAX_PENDING inflight
- drainUntilPending(MAX_PENDING);
-
- // Then we can issue a new one
- ++_pending;
- auto task = vespalib::makeLambdaTask([this, ref, lid]() {
- auto prepared = _attr._index->prepare_add_document(lid, _attr._denseTensorStore.get_typed_cells(ref),
- _attr.getGenerationHandler().takeGuard());
- std::unique_lock guard(_mutex);
- _queue.push(std::make_pair(lid, std::move(prepared)));
- if (_queue.size() == 1) {
- _cond.notify_all();
- }
- });
- _shared_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::SETUP));
-}
-class DenseTensorAttribute::ForegroundLoader : public Loader {
-public:
- ForegroundLoader(DenseTensorAttribute & attr) : _attr(attr) {}
- void load(uint32_t lid, vespalib::datastore::EntryRef) override {
- // This ensures that get_vector() (via getTensor()) is able to find the newly added tensor.
- _attr.setCommittedDocIdLimit(lid + 1);
- _attr._index->add_document(lid);
- if ((lid % LOAD_COMMIT_INTERVAL) == 0) {
- _attr.commit();
- }
- }
- void wait_complete() override {
-
- }
-private:
- DenseTensorAttribute & _attr;
-};
-
-bool
-DenseTensorAttribute::onLoad(vespalib::Executor *executor)
-{
- BlobSequenceReader reader(*this, _index.get() != nullptr);
- if (!reader.hasData()) {
- return false;
- }
- setCreateSerialNum(reader.getCreateSerialNum());
- assert(reader.getVersion() == DENSE_TENSOR_ATTRIBUTE_VERSION);
- assert(getConfig().tensorType().to_spec() ==
- reader.getDatHeader().getTag(tensorTypeTag).asString());
- uint32_t numDocs(reader.getDocIdLimit());
- _refVector.reset();
- _refVector.unsafe_reserve(numDocs);
- std::unique_ptr<Loader> loader;
- if (_index && !reader.use_index_file()) {
- if (executor != nullptr) {
- loader = std::make_unique<ThreadedLoader>(*this, *executor);
- } else {
- loader = std::make_unique<ForegroundLoader>(*this);
- }
- }
- for (uint32_t lid = 0; lid < numDocs; ++lid) {
- if (reader.is_present()) {
- auto raw = _denseTensorStore.allocRawBuffer();
- reader.readTensor(raw.data, _denseTensorStore.getBufSize());
- _refVector.push_back(AtomicEntryRef(raw.ref));
- if (loader) {
- loader->load(lid, raw.ref);
- }
- } else {
- _refVector.push_back(AtomicEntryRef());
- }
- }
- if (loader) {
- loader->wait_complete();
- }
- commit();
- setNumDocs(numDocs);
- setCommittedDocIdLimit(numDocs);
- if (_index && reader.use_index_file()) {
- try {
- auto index_loader = _index->make_loader(reader.index_file());
- size_t cnt = 0;
- while (index_loader->load_next()) {
- if ((++cnt % LOAD_COMMIT_INTERVAL) == 0) {
- commit();
- }
- }
- } catch (const std::runtime_error& ex) {
- LOG(error, "Exception while loading nearest neighbor index for tensor attribute '%s': %s",
- getName().c_str(), ex.what());
- return false;
- }
- }
- return true;
-}
void
DenseTensorAttribute::onCommit()
diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h
index c75c9288dea..fb9ece45182 100644
--- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h
+++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h
@@ -28,8 +28,6 @@ private:
vespalib::MemoryUsage update_stat() override;
vespalib::MemoryUsage memory_usage() const override;
void populate_address_space_usage(AddressSpaceUsage& usage) const override;
- class ThreadedLoader;
- class ForegroundLoader;
public:
DenseTensorAttribute(vespalib::stringref baseFileName, const Config& cfg,
const NearestNeighborIndexFactory& index_factory = DefaultNearestNeighborIndexFactory());
@@ -42,7 +40,6 @@ public:
std::unique_ptr<vespalib::eval::Value> getTensor(DocId docId) const override;
vespalib::eval::TypedCells extract_cells_ref(DocId docId) const override;
bool supports_extract_cells_ref() const override { return true; }
- bool onLoad(vespalib::Executor *executor) override;
void onCommit() override;
void before_inc_generation(generation_t current_gen) override;
void reclaim_memory(generation_t oldest_used_gen) override;
diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.cpp
index 6e4f2d8ddd9..c1ba8dba382 100644
--- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.cpp
@@ -190,4 +190,10 @@ DenseTensorStore::as_dense() const
return this;
}
+DenseTensorStore*
+DenseTensorStore::as_dense()
+{
+ return this;
+}
+
}
diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.h b/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.h
index 2e8e280cf11..b23886c3ac3 100644
--- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.h
+++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.h
@@ -71,6 +71,7 @@ public:
std::unique_ptr<vespalib::eval::Value> get_tensor(EntryRef ref) const override;
bool encode_stored_tensor(EntryRef ref, vespalib::nbostream &target) const override;
const DenseTensorStore* as_dense() const override;
+ DenseTensorStore* as_dense() override;
vespalib::eval::TypedCells get_typed_cells(EntryRef ref) const {
return vespalib::eval::TypedCells(ref.valid() ? getRawBuffer(ref) : &_emptySpace[0],
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp
index c3dfe477e00..ddaf0780cab 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp
@@ -1,10 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "tensor_attribute.h"
-#include "blob_sequence_reader.h"
#include "nearest_neighbor_index.h"
#include "nearest_neighbor_index_saver.h"
#include "tensor_attribute_constants.h"
+#include "tensor_attribute_loader.h"
#include "tensor_attribute_saver.h"
#include <vespa/document/base/exceptions.h>
#include <vespa/document/datatype/tensor_data_type.h>
@@ -282,36 +282,10 @@ TensorAttribute::getRefCopy() const
}
bool
-TensorAttribute::onLoad(vespalib::Executor*)
+TensorAttribute::onLoad(vespalib::Executor* executor)
{
- BlobSequenceReader tensorReader(*this);
- if (!tensorReader.hasData()) {
- return false;
- }
- setCreateSerialNum(tensorReader.getCreateSerialNum());
- assert(tensorReader.getVersion() == getVersion());
- uint32_t numDocs = tensorReader.getDocIdLimit();
- _refVector.reset();
- _refVector.unsafe_reserve(numDocs);
- vespalib::Array<char> buffer(1024);
- for (uint32_t lid = 0; lid < numDocs; ++lid) {
- uint32_t tensorSize = tensorReader.getNextSize();
- if (tensorSize != 0) {
- if (tensorSize > buffer.size()) {
- buffer.resize(tensorSize + 1024);
- }
- tensorReader.readBlob(&buffer[0], tensorSize);
- vespalib::nbostream source(&buffer[0], tensorSize);
- EntryRef ref = _tensorStore.store_encoded_tensor(source);
- _refVector.push_back(AtomicEntryRef(ref));
- } else {
- EntryRef invalid;
- _refVector.push_back(AtomicEntryRef(invalid));
- }
- }
- setNumDocs(numDocs);
- setCommittedDocIdLimit(numDocs);
- return true;
+ TensorAttributeLoader loader(*this, getGenerationHandler(), _refVector, _tensorStore, _index.get());
+ return loader.on_load(executor);
}
std::unique_ptr<AttributeSaver>
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.cpp
new file mode 100644
index 00000000000..bce72cc4b16
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.cpp
@@ -0,0 +1,329 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "tensor_attribute_loader.h"
+#include "blob_sequence_reader.h"
+#include "dense_tensor_store.h"
+#include "nearest_neighbor_index.h"
+#include "nearest_neighbor_index_loader.h"
+#include "tensor_attribute_constants.h"
+#include "tensor_attribute_saver.h"
+#include <vespa/fastlib/io/bufferedfile.h>
+#include <vespa/searchcommon/attribute/config.h>
+#include <vespa/searchlib/attribute/attribute_header.h>
+#include <vespa/searchlib/attribute/load_utils.h>
+#include <vespa/searchlib/attribute/readerbase.h>
+#include <vespa/vespalib/util/arrayqueue.hpp>
+#include <vespa/vespalib/util/cpu_usage.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <mutex>
+#include <condition_variable>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".searchlib.tensor.tensor_attribute_loader");
+
+using search::attribute::AttributeHeader;
+using search::attribute::LoadUtils;
+using vespalib::CpuUsage;
+using vespalib::datastore::EntryRef;
+
+namespace search::tensor {
+
+inline namespace loader {
+
+constexpr uint32_t LOAD_COMMIT_INTERVAL = 256;
+const vespalib::string tensorTypeTag("tensortype");
+
+bool
+can_use_index_save_file(const search::attribute::Config &config, const AttributeHeader& header)
+{
+ if (!config.hnsw_index_params().has_value() || !header.get_hnsw_index_params().has_value()) {
+ return false;
+ }
+ const auto &config_params = config.hnsw_index_params().value();
+ const auto &header_params = header.get_hnsw_index_params().value();
+ if ((config_params.max_links_per_node() != header_params.max_links_per_node()) ||
+ (config_params.distance_metric() != header_params.distance_metric())) {
+ return false;
+ }
+ return true;
+}
+
+bool
+has_index_file(AttributeVector& attr)
+{
+ return LoadUtils::file_exists(attr, TensorAttributeSaver::index_file_suffix());
+}
+
+bool
+is_present(uint8_t presence_flag) {
+ if (presence_flag == tensorIsNotPresent) {
+ return false;
+ }
+ if (presence_flag != tensorIsPresent) {
+ LOG_ABORT("should not be reached");
+ }
+ return true;
+}
+
+class IndexBuilder {
+public:
+ virtual ~IndexBuilder() = default;
+ virtual void add(uint32_t lid, EntryRef ref) = 0;
+ virtual void wait_complete() = 0;
+};
+
+/**
+ * Will build nearest neighbor index in parallel. Note that indexing order is not guaranteed,
+ * but that is in line with the guarantees Vespa already has.
+ */
+class ThreadedIndexBuilder : public IndexBuilder {
+public:
+ ThreadedIndexBuilder(AttributeVector& attr, vespalib::GenerationHandler& generation_handler, TensorStore& store, NearestNeighborIndex& index, vespalib::Executor& shared_executor)
+ : _attr(attr),
+ _generation_handler(generation_handler),
+ _store(store),
+ _index(index),
+ _shared_executor(shared_executor),
+ _queue(MAX_PENDING),
+ _pending(0)
+ {}
+ void add(uint32_t lid, EntryRef ref) override;
+ void wait_complete() override {
+ drainUntilPending(0);
+ }
+private:
+ using Entry = std::pair<uint32_t, std::unique_ptr<PrepareResult>>;
+ using Queue = vespalib::ArrayQueue<Entry>;
+
+ bool pop(Entry & entry) {
+ std::unique_lock guard(_mutex);
+ if (_queue.empty()) return false;
+ entry = std::move(_queue.front());
+ _queue.pop();
+ return true;
+ }
+ void drainQ() {
+ Queue queue(MAX_PENDING);
+ {
+ std::unique_lock guard(_mutex);
+ queue.swap(_queue);
+ }
+ while (!queue.empty()) {
+ auto item = std::move(queue.front());
+ queue.pop();
+ complete(item.first, std::move(item.second));
+ }
+ }
+
+ void complete(uint32_t lid, std::unique_ptr<PrepareResult> prepared) {
+ _index.complete_add_document(lid, std::move(prepared));
+ --_pending;
+ if ((lid % LOAD_COMMIT_INTERVAL) == 0) {
+ _attr.commit();
+ };
+ }
+ void drainUntilPending(uint32_t maxPending) {
+ while (_pending > maxPending) {
+ {
+ std::unique_lock guard(_mutex);
+ while (_queue.empty()) {
+ _cond.wait(guard);
+ }
+ }
+ drainQ();
+ }
+ }
+ static constexpr uint32_t MAX_PENDING = 1000;
+ AttributeVector& _attr;
+ const vespalib::GenerationHandler& _generation_handler;
+ TensorStore& _store;
+ NearestNeighborIndex& _index;
+ vespalib::Executor& _shared_executor;
+ std::mutex _mutex;
+ std::condition_variable _cond;
+ Queue _queue;
+ uint64_t _pending; // _pending is only modified in foreground thread
+};
+
+void
+ThreadedIndexBuilder::add(uint32_t lid, EntryRef ref) {
+ Entry item;
+ while (pop(item)) {
+ // First process items that are ready to complete
+ complete(item.first, std::move(item.second));
+ }
+ // Then ensure that there are no more than MAX_PENDING in flight
+ drainUntilPending(MAX_PENDING);
+
+ // Then we can issue a new one
+ ++_pending;
+ auto dense_store = _store.as_dense();
+ auto task = vespalib::makeLambdaTask([this, ref, lid, dense_store]() {
+ auto prepared = _index.prepare_add_document(lid, dense_store->get_typed_cells(ref),
+ _generation_handler.takeGuard());
+ std::unique_lock guard(_mutex);
+ _queue.push(std::make_pair(lid, std::move(prepared)));
+ if (_queue.size() == 1) {
+ _cond.notify_all();
+ }
+ });
+ _shared_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::SETUP));
+}
+
+class ForegroundIndexBuilder : public IndexBuilder {
+public:
+ ForegroundIndexBuilder(AttributeVector& attr, NearestNeighborIndex& index)
+ : _attr(attr),
+ _index(index)
+ {
+ }
+ void add(uint32_t lid, EntryRef) override {
+ _index.add_document(lid);
+ if ((lid % LOAD_COMMIT_INTERVAL) == 0) {
+ _attr.commit();
+ }
+ }
+ void wait_complete() override {
+
+ }
+private:
+ AttributeVector& _attr;
+ NearestNeighborIndex& _index;
+};
+
+}
+
+TensorAttributeLoader::TensorAttributeLoader(AttributeVector& attr, GenerationHandler& generation_handler, RefVector& ref_vector, TensorStore& store, NearestNeighborIndex* index)
+ : _attr(attr),
+ _generation_handler(generation_handler),
+ _ref_vector(ref_vector),
+ _store(store),
+ _index(index),
+ _use_index_file(false)
+{
+}
+
+TensorAttributeLoader::~TensorAttributeLoader() = default;
+
+void
+TensorAttributeLoader::load_dense_tensor_store(BlobSequenceReader& reader, uint32_t docid_limit, DenseTensorStore& dense_store)
+{
+ assert(reader.getVersion() == DENSE_TENSOR_ATTRIBUTE_VERSION);
+ uint8_t presence_flag = 0;
+ for (uint32_t lid = 0; lid < docid_limit; ++lid) {
+ reader.readBlob(&presence_flag, sizeof(presence_flag));
+ if (is_present(presence_flag)) {
+ auto raw = dense_store.allocRawBuffer();
+ reader.readBlob(raw.data, dense_store.getBufSize());
+ _ref_vector.push_back(AtomicEntryRef(raw.ref));
+ } else {
+ _ref_vector.push_back(AtomicEntryRef());
+ }
+ if ((lid % LOAD_COMMIT_INTERVAL) == 0) {
+ _attr.commit();
+ }
+ }
+}
+
+void
+TensorAttributeLoader::load_tensor_store(BlobSequenceReader& reader, uint32_t docid_limit)
+{
+ assert(reader.getVersion() == TENSOR_ATTRIBUTE_VERSION);
+ vespalib::Array<char> buffer(1024);
+ for (uint32_t lid = 0; lid < docid_limit; ++lid) {
+ uint32_t tensorSize = reader.getNextSize();
+ if (tensorSize != 0) {
+ if (tensorSize > buffer.size()) {
+ buffer.resize(tensorSize + 1024);
+ }
+ reader.readBlob(&buffer[0], tensorSize);
+ vespalib::nbostream source(&buffer[0], tensorSize);
+ EntryRef ref = _store.store_encoded_tensor(source);
+ _ref_vector.push_back(AtomicEntryRef(ref));
+ } else {
+ EntryRef invalid;
+ _ref_vector.push_back(AtomicEntryRef(invalid));
+ }
+ if ((lid % LOAD_COMMIT_INTERVAL) == 0) {
+ _attr.commit();
+ }
+ }
+}
+
+void
+TensorAttributeLoader::build_index(vespalib::Executor* executor, uint32_t docid_limit)
+{
+ std::unique_ptr<IndexBuilder> builder;
+ if (_index && !_use_index_file) {
+ if (executor != nullptr) {
+ builder = std::make_unique<ThreadedIndexBuilder>(_attr, _generation_handler, _store, *_index, *executor);
+ } else {
+ builder = std::make_unique<ForegroundIndexBuilder>(_attr, *_index);
+ }
+ }
+ if (builder) {
+ for (uint32_t lid = 0; lid < docid_limit; ++lid) {
+ auto ref = _ref_vector[lid].load_relaxed();
+ if (ref.valid()) {
+ builder->add(lid, ref);
+ }
+ }
+ builder->wait_complete();
+ _attr.commit();
+ }
+}
+
+bool
+TensorAttributeLoader::load_index()
+{
+ if (_index && _use_index_file) {
+ FileWithHeader index_file(LoadUtils::openFile(_attr, TensorAttributeSaver::index_file_suffix()));
+ try {
+ auto index_loader = _index->make_loader(index_file.file());
+ size_t cnt = 0;
+ while (index_loader->load_next()) {
+ if ((++cnt % LOAD_COMMIT_INTERVAL) == 0) {
+ _attr.commit();
+ }
+ }
+ _attr.commit();
+ } catch (const std::runtime_error& ex) {
+ LOG(error, "Exception while loading nearest neighbor index for tensor attribute '%s': %s",
+ _attr.getName().c_str(), ex.what());
+ return false;
+ }
+ }
+ return true;
+}
+
+bool
+TensorAttributeLoader::on_load(vespalib::Executor* executor)
+{
+ BlobSequenceReader reader(_attr);
+ if (!reader.hasData()) {
+ return false;
+ }
+ if (_index != nullptr && has_index_file(_attr)) {
+ auto header = AttributeHeader::extractTags(reader.getDatHeader(), _attr.getBaseFileName());
+ _use_index_file = can_use_index_save_file(_attr.getConfig(), header);
+ }
+ _attr.setCreateSerialNum(reader.getCreateSerialNum());
+ assert(_attr.getConfig().tensorType().to_spec() ==
+ reader.getDatHeader().getTag(tensorTypeTag).asString());
+ uint32_t docid_limit(reader.getDocIdLimit());
+ _ref_vector.reset();
+ _ref_vector.unsafe_reserve(docid_limit);
+ auto dense_store = _store.as_dense();
+ if (dense_store != nullptr) {
+ load_dense_tensor_store(reader, docid_limit, *dense_store);
+ } else {
+ load_tensor_store(reader, docid_limit);
+ }
+ _attr.commit();
+ _attr.getStatus().setNumDocs(docid_limit);
+ _attr.setCommittedDocIdLimit(docid_limit);
+ build_index(executor, docid_limit);
+ return load_index();
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.h b/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.h
new file mode 100644
index 00000000000..125bfd5abba
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.h
@@ -0,0 +1,45 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/datastore/atomic_entry_ref.h>
+#include <vespa/vespalib/util/rcuvector.h>
+
+namespace search { class AttributeVector; }
+namespace vespalib { class Executor; }
+
+namespace search::tensor {
+
+class BlobSequenceReader;
+class DenseTensorStore;
+class NearestNeighborIndex;
+class TensorStore;
+
+/**
+ * Class for loading a tensor attribute.
+ * Will also load the nearest neighbor index.
+ */
+class TensorAttributeLoader {
+ using AtomicEntryRef = vespalib::datastore::AtomicEntryRef;
+ using GenerationHandler = vespalib::GenerationHandler;
+ using RefVector = vespalib::RcuVectorBase<AtomicEntryRef>;
+ AttributeVector& _attr;
+ GenerationHandler& _generation_handler;
+ RefVector& _ref_vector;
+ TensorStore& _store;
+ NearestNeighborIndex* _index;
+ bool _use_index_file;
+
+ void load_dense_tensor_store(BlobSequenceReader& reader, uint32_t docid_limit, DenseTensorStore& dense_store);
+ void load_tensor_store(BlobSequenceReader& reader, uint32_t docid_limit);
+ void build_index(vespalib::Executor* executor, uint32_t docid_limit);
+ bool load_index();
+
+public:
+ TensorAttributeLoader(AttributeVector& attr, GenerationHandler& generation_handler, RefVector& ref_vector, TensorStore& store, NearestNeighborIndex* index);
+ ~TensorAttributeLoader();
+ bool on_load(vespalib::Executor* executor);
+};
+
+}
+
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_store.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_store.cpp
index 5f07f378465..b88e94fe623 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_store.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_store.cpp
@@ -17,4 +17,10 @@ TensorStore::as_dense() const
return nullptr;
}
+DenseTensorStore*
+TensorStore::as_dense()
+{
+ return nullptr;
+}
+
}
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_store.h b/searchlib/src/vespa/searchlib/tensor/tensor_store.h
index 11ab4158e8f..9a72d1cfccc 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_store.h
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_store.h
@@ -49,6 +49,7 @@ public:
virtual std::unique_ptr<vespalib::eval::Value> get_tensor(EntryRef ref) const = 0;
virtual bool encode_stored_tensor(EntryRef ref, vespalib::nbostream& target) const = 0;
virtual const DenseTensorStore* as_dense() const;
+ virtual DenseTensorStore* as_dense();
// Inherit doc from DataStoreBase
void reclaim_memory(generation_t oldest_used_gen) {