diff options
author | Tor Egge <Tor.Egge@online.no> | 2022-11-01 11:34:06 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2022-11-01 11:34:06 +0100 |
commit | 48c4a42f24048813700c07a70840189b3f14ab89 (patch) | |
tree | 70f3f6c2c65f478eeb73dcf94e501c0af25b94e7 /searchlib | |
parent | 88463ea1b79873a63bf0aa6aa2813e7abfa85659 (diff) |
Consolidate tensor attribute loaders.
Diffstat (limited to 'searchlib')
10 files changed, 400 insertions, 300 deletions
diff --git a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt index 1a412b77270..4d98e7d59a8 100644 --- a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt @@ -28,6 +28,7 @@ vespa_add_library(searchlib_tensor OBJECT serialized_fast_value_attribute.cpp small_subspaces_buffer_type.cpp tensor_attribute.cpp + tensor_attribute_loader.cpp tensor_attribute_saver.cpp tensor_buffer_operations.cpp tensor_buffer_store.cpp diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp index 12d5b2864a0..67215a99b71 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp @@ -2,111 +2,26 @@ #include "dense_tensor_attribute.h" #include "nearest_neighbor_index.h" -#include "nearest_neighbor_index_loader.h" -#include "tensor_attribute_constants.h" +#include "tensor_attribute_loader.h" #include "tensor_attribute_saver.h" #include <vespa/eval/eval/value.h> -#include <vespa/fastlib/io/bufferedfile.h> #include <vespa/searchcommon/attribute/config.h> -#include <vespa/searchlib/attribute/load_utils.h> -#include <vespa/searchlib/attribute/readerbase.h> #include <vespa/vespalib/data/slime/inserter.h> -#include <vespa/vespalib/util/cpu_usage.h> -#include <vespa/vespalib/util/lambdatask.h> -#include <vespa/vespalib/util/memory_allocator.h> -#include <vespa/vespalib/util/mmap_file_allocator_factory.h> -#include <vespa/vespalib/util/threadstackexecutor.h> -#include <thread> -#include <vespa/log/log.h> -LOG_SETUP(".searchlib.tensor.dense_tensor_attribute"); - -using search::attribute::LoadUtils; -using vespalib::CpuUsage; using vespalib::eval::Value; -using vespalib::eval::ValueType; using vespalib::slime::ObjectInserter; namespace search::tensor { -namespace { - -constexpr uint32_t LOAD_COMMIT_INTERVAL = 256; 
-const vespalib::string tensorTypeTag("tensortype"); - -class BlobSequenceReader : public ReaderBase -{ -private: - bool _use_index_file; - FileWithHeader _index_file; - -public: - BlobSequenceReader(AttributeVector& attr, bool has_index); - ~BlobSequenceReader(); - bool is_present(); - void readTensor(void *buf, size_t len) { _datFile.file().ReadBuf(buf, len); } - bool use_index_file() const { return _use_index_file; } - FastOS_FileInterface& index_file() { return _index_file.file(); } -}; - -bool -can_use_index_save_file(const search::attribute::Config &config, const search::attribute::AttributeHeader &header) -{ - if (!config.hnsw_index_params().has_value() || !header.get_hnsw_index_params().has_value()) { - return false; - } - const auto &config_params = config.hnsw_index_params().value(); - const auto &header_params = header.get_hnsw_index_params().value(); - if ((config_params.max_links_per_node() != header_params.max_links_per_node()) || - (config_params.distance_metric() != header_params.distance_metric())) { - return false; - } - return true; -} - bool -has_index_file(AttributeVector& attr) -{ - return LoadUtils::file_exists(attr, TensorAttributeSaver::index_file_suffix()); -} - -BlobSequenceReader::BlobSequenceReader(AttributeVector& attr, bool has_index) - : ReaderBase(attr), - _use_index_file(has_index && has_index_file(attr) && - can_use_index_save_file(attr.getConfig(), - search::attribute::AttributeHeader::extractTags(getDatHeader(), attr.getBaseFileName()))), - _index_file(_use_index_file ? 
- attribute::LoadUtils::openFile(attr, TensorAttributeSaver::index_file_suffix()) : - std::unique_ptr<Fast_BufferedFile>()) -{ -} - -BlobSequenceReader::~BlobSequenceReader() = default; - -bool -BlobSequenceReader::is_present() { - unsigned char detect; - _datFile.file().ReadBuf(&detect, sizeof(detect)); - if (detect == tensorIsNotPresent) { - return false; - } - if (detect != tensorIsPresent) { - LOG_ABORT("should not be reached"); - } - return true; -} - -} - -bool -DenseTensorAttribute::tensor_is_unchanged(DocId docid, const vespalib::eval::Value& new_tensor) const +DenseTensorAttribute::tensor_is_unchanged(DocId docid, const Value& new_tensor) const { auto old_tensor = extract_cells_ref(docid); return _comp.equals(old_tensor, new_tensor.cells()); } void -DenseTensorAttribute::internal_set_tensor(DocId docid, const vespalib::eval::Value& tensor) +DenseTensorAttribute::internal_set_tensor(DocId docid, const Value& tensor) { consider_remove_from_index(docid); EntryRef ref = _denseTensorStore.store_tensor(tensor); @@ -180,7 +95,7 @@ DenseTensorAttribute::clearDoc(DocId docId) } void -DenseTensorAttribute::setTensor(DocId docId, const vespalib::eval::Value &tensor) +DenseTensorAttribute::setTensor(DocId docId, const Value& tensor) { checkTensorType(tensor); internal_set_tensor(docId, tensor); @@ -190,7 +105,7 @@ DenseTensorAttribute::setTensor(DocId docId, const vespalib::eval::Value &tensor } std::unique_ptr<PrepareResult> -DenseTensorAttribute::prepare_set_tensor(DocId docid, const vespalib::eval::Value& tensor) const +DenseTensorAttribute::prepare_set_tensor(DocId docid, const Value& tensor) const { checkTensorType(tensor); if (_index) { @@ -205,7 +120,7 @@ DenseTensorAttribute::prepare_set_tensor(DocId docid, const vespalib::eval::Valu } void -DenseTensorAttribute::complete_set_tensor(DocId docid, const vespalib::eval::Value& tensor, +DenseTensorAttribute::complete_set_tensor(DocId docid, const Value& tensor, std::unique_ptr<PrepareResult> prepare_result) { if 
(_index && !prepare_result) { @@ -218,7 +133,7 @@ DenseTensorAttribute::complete_set_tensor(DocId docid, const vespalib::eval::Val } } -std::unique_ptr<vespalib::eval::Value> +std::unique_ptr<Value> DenseTensorAttribute::getTensor(DocId docId) const { EntryRef ref; @@ -237,181 +152,6 @@ DenseTensorAttribute::extract_cells_ref(DocId docId) const } return _denseTensorStore.get_typed_cells(ref); } -namespace { -class Loader { -public: - virtual ~Loader() = default; - virtual void load(uint32_t lid, vespalib::datastore::EntryRef ref) = 0; - virtual void wait_complete() = 0; -}; -} - -/** - * Will load and index documents in parallel. Note that indexing order is not guaranteed, - * but that is inline with the guarantees vespa already has. - */ -class DenseTensorAttribute::ThreadedLoader : public Loader { -public: - ThreadedLoader(DenseTensorAttribute & attr, vespalib::Executor & shared_executor) - : _attr(attr), - _shared_executor(shared_executor), - _queue(MAX_PENDING), - _pending(0) - {} - void load(uint32_t lid, vespalib::datastore::EntryRef ref) override; - void wait_complete() override { - drainUntilPending(0); - } -private: - using Entry = std::pair<uint32_t, std::unique_ptr<PrepareResult>>; - using Queue = vespalib::ArrayQueue<Entry>; - - bool pop(Entry & entry) { - std::unique_lock guard(_mutex); - if (_queue.empty()) return false; - entry = std::move(_queue.front()); - _queue.pop(); - return true; - } - void drainQ() { - Queue queue(MAX_PENDING); - { - std::unique_lock guard(_mutex); - queue.swap(_queue); - } - while (!queue.empty()) { - auto item = std::move(queue.front()); - queue.pop(); - complete(item.first, std::move(item.second)); - } - } - - void complete(uint32_t lid, std::unique_ptr<PrepareResult> prepared) { - _attr.setCommittedDocIdLimit(std::max(_attr.getCommittedDocIdLimit(), lid + 1)); - _attr._index->complete_add_document(lid, std::move(prepared)); - --_pending; - if ((lid % LOAD_COMMIT_INTERVAL) == 0) { - _attr.commit(); - }; - } - void 
drainUntilPending(uint32_t maxPending) { - while (_pending > maxPending) { - { - std::unique_lock guard(_mutex); - while (_queue.empty()) { - _cond.wait(guard); - } - } - drainQ(); - } - } - static constexpr uint32_t MAX_PENDING = 1000; - DenseTensorAttribute & _attr; - vespalib::Executor & _shared_executor; - std::mutex _mutex; - std::condition_variable _cond; - Queue _queue; - uint64_t _pending; // _pending is only modified in forground thread -}; - -void -DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::EntryRef ref) { - Entry item; - while (pop(item)) { - // First process items that are ready to complete - complete(item.first, std::move(item.second)); - } - // Then ensure that there no mor ethan MAX_PENDING inflight - drainUntilPending(MAX_PENDING); - - // Then we can issue a new one - ++_pending; - auto task = vespalib::makeLambdaTask([this, ref, lid]() { - auto prepared = _attr._index->prepare_add_document(lid, _attr._denseTensorStore.get_typed_cells(ref), - _attr.getGenerationHandler().takeGuard()); - std::unique_lock guard(_mutex); - _queue.push(std::make_pair(lid, std::move(prepared))); - if (_queue.size() == 1) { - _cond.notify_all(); - } - }); - _shared_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::SETUP)); -} -class DenseTensorAttribute::ForegroundLoader : public Loader { -public: - ForegroundLoader(DenseTensorAttribute & attr) : _attr(attr) {} - void load(uint32_t lid, vespalib::datastore::EntryRef) override { - // This ensures that get_vector() (via getTensor()) is able to find the newly added tensor. 
- _attr.setCommittedDocIdLimit(lid + 1); - _attr._index->add_document(lid); - if ((lid % LOAD_COMMIT_INTERVAL) == 0) { - _attr.commit(); - } - } - void wait_complete() override { - - } -private: - DenseTensorAttribute & _attr; -}; - -bool -DenseTensorAttribute::onLoad(vespalib::Executor *executor) -{ - BlobSequenceReader reader(*this, _index.get() != nullptr); - if (!reader.hasData()) { - return false; - } - setCreateSerialNum(reader.getCreateSerialNum()); - assert(reader.getVersion() == DENSE_TENSOR_ATTRIBUTE_VERSION); - assert(getConfig().tensorType().to_spec() == - reader.getDatHeader().getTag(tensorTypeTag).asString()); - uint32_t numDocs(reader.getDocIdLimit()); - _refVector.reset(); - _refVector.unsafe_reserve(numDocs); - std::unique_ptr<Loader> loader; - if (_index && !reader.use_index_file()) { - if (executor != nullptr) { - loader = std::make_unique<ThreadedLoader>(*this, *executor); - } else { - loader = std::make_unique<ForegroundLoader>(*this); - } - } - for (uint32_t lid = 0; lid < numDocs; ++lid) { - if (reader.is_present()) { - auto raw = _denseTensorStore.allocRawBuffer(); - reader.readTensor(raw.data, _denseTensorStore.getBufSize()); - _refVector.push_back(AtomicEntryRef(raw.ref)); - if (loader) { - loader->load(lid, raw.ref); - } - } else { - _refVector.push_back(AtomicEntryRef()); - } - } - if (loader) { - loader->wait_complete(); - } - commit(); - setNumDocs(numDocs); - setCommittedDocIdLimit(numDocs); - if (_index && reader.use_index_file()) { - try { - auto index_loader = _index->make_loader(reader.index_file()); - size_t cnt = 0; - while (index_loader->load_next()) { - if ((++cnt % LOAD_COMMIT_INTERVAL) == 0) { - commit(); - } - } - } catch (const std::runtime_error& ex) { - LOG(error, "Exception while loading nearest neighbor index for tensor attribute '%s': %s", - getName().c_str(), ex.what()); - return false; - } - } - return true; -} void DenseTensorAttribute::onCommit() diff --git 
a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h index c75c9288dea..fb9ece45182 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.h @@ -28,8 +28,6 @@ private: vespalib::MemoryUsage update_stat() override; vespalib::MemoryUsage memory_usage() const override; void populate_address_space_usage(AddressSpaceUsage& usage) const override; - class ThreadedLoader; - class ForegroundLoader; public: DenseTensorAttribute(vespalib::stringref baseFileName, const Config& cfg, const NearestNeighborIndexFactory& index_factory = DefaultNearestNeighborIndexFactory()); @@ -42,7 +40,6 @@ public: std::unique_ptr<vespalib::eval::Value> getTensor(DocId docId) const override; vespalib::eval::TypedCells extract_cells_ref(DocId docId) const override; bool supports_extract_cells_ref() const override { return true; } - bool onLoad(vespalib::Executor *executor) override; void onCommit() override; void before_inc_generation(generation_t current_gen) override; void reclaim_memory(generation_t oldest_used_gen) override; diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.cpp index 6e4f2d8ddd9..c1ba8dba382 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.cpp @@ -190,4 +190,10 @@ DenseTensorStore::as_dense() const return this; } +DenseTensorStore* +DenseTensorStore::as_dense() +{ + return this; +} + } diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.h b/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.h index 2e8e280cf11..b23886c3ac3 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.h +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_store.h @@ -71,6 +71,7 @@ public: std::unique_ptr<vespalib::eval::Value> 
get_tensor(EntryRef ref) const override; bool encode_stored_tensor(EntryRef ref, vespalib::nbostream &target) const override; const DenseTensorStore* as_dense() const override; + DenseTensorStore* as_dense() override; vespalib::eval::TypedCells get_typed_cells(EntryRef ref) const { return vespalib::eval::TypedCells(ref.valid() ? getRawBuffer(ref) : &_emptySpace[0], diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp index c3dfe477e00..ddaf0780cab 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp @@ -1,10 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "tensor_attribute.h" -#include "blob_sequence_reader.h" #include "nearest_neighbor_index.h" #include "nearest_neighbor_index_saver.h" #include "tensor_attribute_constants.h" +#include "tensor_attribute_loader.h" #include "tensor_attribute_saver.h" #include <vespa/document/base/exceptions.h> #include <vespa/document/datatype/tensor_data_type.h> @@ -282,36 +282,10 @@ TensorAttribute::getRefCopy() const } bool -TensorAttribute::onLoad(vespalib::Executor*) +TensorAttribute::onLoad(vespalib::Executor* executor) { - BlobSequenceReader tensorReader(*this); - if (!tensorReader.hasData()) { - return false; - } - setCreateSerialNum(tensorReader.getCreateSerialNum()); - assert(tensorReader.getVersion() == getVersion()); - uint32_t numDocs = tensorReader.getDocIdLimit(); - _refVector.reset(); - _refVector.unsafe_reserve(numDocs); - vespalib::Array<char> buffer(1024); - for (uint32_t lid = 0; lid < numDocs; ++lid) { - uint32_t tensorSize = tensorReader.getNextSize(); - if (tensorSize != 0) { - if (tensorSize > buffer.size()) { - buffer.resize(tensorSize + 1024); - } - tensorReader.readBlob(&buffer[0], tensorSize); - vespalib::nbostream source(&buffer[0], tensorSize); - EntryRef ref = 
_tensorStore.store_encoded_tensor(source); - _refVector.push_back(AtomicEntryRef(ref)); - } else { - EntryRef invalid; - _refVector.push_back(AtomicEntryRef(invalid)); - } - } - setNumDocs(numDocs); - setCommittedDocIdLimit(numDocs); - return true; + TensorAttributeLoader loader(*this, getGenerationHandler(), _refVector, _tensorStore, _index.get()); + return loader.on_load(executor); } std::unique_ptr<AttributeSaver> diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.cpp new file mode 100644 index 00000000000..bce72cc4b16 --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.cpp @@ -0,0 +1,329 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "tensor_attribute_loader.h" +#include "blob_sequence_reader.h" +#include "dense_tensor_store.h" +#include "nearest_neighbor_index.h" +#include "nearest_neighbor_index_loader.h" +#include "tensor_attribute_constants.h" +#include "tensor_attribute_saver.h" +#include <vespa/fastlib/io/bufferedfile.h> +#include <vespa/searchcommon/attribute/config.h> +#include <vespa/searchlib/attribute/attribute_header.h> +#include <vespa/searchlib/attribute/load_utils.h> +#include <vespa/searchlib/attribute/readerbase.h> +#include <vespa/vespalib/util/arrayqueue.hpp> +#include <vespa/vespalib/util/cpu_usage.h> +#include <vespa/vespalib/util/lambdatask.h> +#include <mutex> +#include <condition_variable> + +#include <vespa/log/log.h> +LOG_SETUP(".searchlib.tensor.tensor_attribute_loader"); + +using search::attribute::AttributeHeader; +using search::attribute::LoadUtils; +using vespalib::CpuUsage; +using vespalib::datastore::EntryRef; + +namespace search::tensor { + +inline namespace loader { + +constexpr uint32_t LOAD_COMMIT_INTERVAL = 256; +const vespalib::string tensorTypeTag("tensortype"); + +bool +can_use_index_save_file(const search::attribute::Config 
&config, const AttributeHeader& header) +{ + if (!config.hnsw_index_params().has_value() || !header.get_hnsw_index_params().has_value()) { + return false; + } + const auto &config_params = config.hnsw_index_params().value(); + const auto &header_params = header.get_hnsw_index_params().value(); + if ((config_params.max_links_per_node() != header_params.max_links_per_node()) || + (config_params.distance_metric() != header_params.distance_metric())) { + return false; + } + return true; +} + +bool +has_index_file(AttributeVector& attr) +{ + return LoadUtils::file_exists(attr, TensorAttributeSaver::index_file_suffix()); +} + +bool +is_present(uint8_t presence_flag) { + if (presence_flag == tensorIsNotPresent) { + return false; + } + if (presence_flag != tensorIsPresent) { + LOG_ABORT("should not be reached"); + } + return true; +} + +class IndexBuilder { +public: + virtual ~IndexBuilder() = default; + virtual void add(uint32_t lid, EntryRef ref) = 0; + virtual void wait_complete() = 0; +}; + +/** + * Will build nearest neighbor index in parallel. Note that indexing order is not guaranteed, + * but that is in line with the guarantees vespa already has. 
+ */ +class ThreadedIndexBuilder : public IndexBuilder { +public: + ThreadedIndexBuilder(AttributeVector& attr, vespalib::GenerationHandler& generation_handler, TensorStore& store, NearestNeighborIndex& index, vespalib::Executor& shared_executor) + : _attr(attr), + _generation_handler(generation_handler), + _store(store), + _index(index), + _shared_executor(shared_executor), + _queue(MAX_PENDING), + _pending(0) + {} + void add(uint32_t lid, EntryRef ref) override; + void wait_complete() override { + drainUntilPending(0); + } +private: + using Entry = std::pair<uint32_t, std::unique_ptr<PrepareResult>>; + using Queue = vespalib::ArrayQueue<Entry>; + + bool pop(Entry & entry) { + std::unique_lock guard(_mutex); + if (_queue.empty()) return false; + entry = std::move(_queue.front()); + _queue.pop(); + return true; + } + void drainQ() { + Queue queue(MAX_PENDING); + { + std::unique_lock guard(_mutex); + queue.swap(_queue); + } + while (!queue.empty()) { + auto item = std::move(queue.front()); + queue.pop(); + complete(item.first, std::move(item.second)); + } + } + + void complete(uint32_t lid, std::unique_ptr<PrepareResult> prepared) { + _index.complete_add_document(lid, std::move(prepared)); + --_pending; + if ((lid % LOAD_COMMIT_INTERVAL) == 0) { + _attr.commit(); + }; + } + void drainUntilPending(uint32_t maxPending) { + while (_pending > maxPending) { + { + std::unique_lock guard(_mutex); + while (_queue.empty()) { + _cond.wait(guard); + } + } + drainQ(); + } + } + static constexpr uint32_t MAX_PENDING = 1000; + AttributeVector& _attr; + const vespalib::GenerationHandler& _generation_handler; + TensorStore& _store; + NearestNeighborIndex& _index; + vespalib::Executor& _shared_executor; + std::mutex _mutex; + std::condition_variable _cond; + Queue _queue; + uint64_t _pending; // _pending is only modified in foreground thread +}; + +void +ThreadedIndexBuilder::add(uint32_t lid, EntryRef ref) { + Entry item; + while (pop(item)) { + // First process items that are ready 
to complete + complete(item.first, std::move(item.second)); + } + // Then ensure that there are no more than MAX_PENDING in flight + drainUntilPending(MAX_PENDING); + + // Then we can issue a new one + ++_pending; + auto dense_store = _store.as_dense(); + auto task = vespalib::makeLambdaTask([this, ref, lid, dense_store]() { + auto prepared = _index.prepare_add_document(lid, dense_store->get_typed_cells(ref), + _generation_handler.takeGuard()); + std::unique_lock guard(_mutex); + _queue.push(std::make_pair(lid, std::move(prepared))); + if (_queue.size() == 1) { + _cond.notify_all(); + } + }); + _shared_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::SETUP)); +} + +class ForegroundIndexBuilder : public IndexBuilder { +public: + ForegroundIndexBuilder(AttributeVector& attr, NearestNeighborIndex& index) + : _attr(attr), + _index(index) + { + } + void add(uint32_t lid, EntryRef) override { + _index.add_document(lid); + if ((lid % LOAD_COMMIT_INTERVAL) == 0) { + _attr.commit(); + } + } + void wait_complete() override { + + } +private: + AttributeVector& _attr; + NearestNeighborIndex& _index; +}; + +} + +TensorAttributeLoader::TensorAttributeLoader(AttributeVector& attr, GenerationHandler& generation_handler, RefVector& ref_vector, TensorStore& store, NearestNeighborIndex* index) + : _attr(attr), + _generation_handler(generation_handler), + _ref_vector(ref_vector), + _store(store), + _index(index), + _use_index_file(false) +{ +} + +TensorAttributeLoader::~TensorAttributeLoader() = default; + +void +TensorAttributeLoader::load_dense_tensor_store(BlobSequenceReader& reader, uint32_t docid_limit, DenseTensorStore& dense_store) +{ + assert(reader.getVersion() == DENSE_TENSOR_ATTRIBUTE_VERSION); + uint8_t presence_flag = 0; + for (uint32_t lid = 0; lid < docid_limit; ++lid) { + reader.readBlob(&presence_flag, sizeof(presence_flag)); + if (is_present(presence_flag)) { + auto raw = dense_store.allocRawBuffer(); + reader.readBlob(raw.data, 
dense_store.getBufSize()); + _ref_vector.push_back(AtomicEntryRef(raw.ref)); + } else { + _ref_vector.push_back(AtomicEntryRef()); + } + if ((lid % LOAD_COMMIT_INTERVAL) == 0) { + _attr.commit(); + } + } +} + +void +TensorAttributeLoader::load_tensor_store(BlobSequenceReader& reader, uint32_t docid_limit) +{ + assert(reader.getVersion() == TENSOR_ATTRIBUTE_VERSION); + vespalib::Array<char> buffer(1024); + for (uint32_t lid = 0; lid < docid_limit; ++lid) { + uint32_t tensorSize = reader.getNextSize(); + if (tensorSize != 0) { + if (tensorSize > buffer.size()) { + buffer.resize(tensorSize + 1024); + } + reader.readBlob(&buffer[0], tensorSize); + vespalib::nbostream source(&buffer[0], tensorSize); + EntryRef ref = _store.store_encoded_tensor(source); + _ref_vector.push_back(AtomicEntryRef(ref)); + } else { + EntryRef invalid; + _ref_vector.push_back(AtomicEntryRef(invalid)); + } + if ((lid % LOAD_COMMIT_INTERVAL) == 0) { + _attr.commit(); + } + } +} + +void +TensorAttributeLoader::build_index(vespalib::Executor* executor, uint32_t docid_limit) +{ + std::unique_ptr<IndexBuilder> builder; + if (_index && !_use_index_file) { + if (executor != nullptr) { + builder = std::make_unique<ThreadedIndexBuilder>(_attr, _generation_handler, _store, *_index, *executor); + } else { + builder = std::make_unique<ForegroundIndexBuilder>(_attr, *_index); + } + } + if (builder) { + for (uint32_t lid = 0; lid < docid_limit; ++lid) { + auto ref = _ref_vector[lid].load_relaxed(); + if (ref.valid()) { + builder->add(lid, ref); + } + } + builder->wait_complete(); + _attr.commit(); + } +} + +bool +TensorAttributeLoader::load_index() +{ + if (_index && _use_index_file) { + FileWithHeader index_file(LoadUtils::openFile(_attr, TensorAttributeSaver::index_file_suffix())); + try { + auto index_loader = _index->make_loader(index_file.file()); + size_t cnt = 0; + while (index_loader->load_next()) { + if ((++cnt % LOAD_COMMIT_INTERVAL) == 0) { + _attr.commit(); + } + } + _attr.commit(); + } catch 
(const std::runtime_error& ex) { + LOG(error, "Exception while loading nearest neighbor index for tensor attribute '%s': %s", + _attr.getName().c_str(), ex.what()); + return false; + } + } + return true; +} + +bool +TensorAttributeLoader::on_load(vespalib::Executor* executor) +{ + BlobSequenceReader reader(_attr); + if (!reader.hasData()) { + return false; + } + if (_index != nullptr && has_index_file(_attr)) { + auto header = AttributeHeader::extractTags(reader.getDatHeader(), _attr.getBaseFileName()); + _use_index_file = can_use_index_save_file(_attr.getConfig(), header); + } + _attr.setCreateSerialNum(reader.getCreateSerialNum()); + assert(_attr.getConfig().tensorType().to_spec() == + reader.getDatHeader().getTag(tensorTypeTag).asString()); + uint32_t docid_limit(reader.getDocIdLimit()); + _ref_vector.reset(); + _ref_vector.unsafe_reserve(docid_limit); + auto dense_store = _store.as_dense(); + if (dense_store != nullptr) { + load_dense_tensor_store(reader, docid_limit, *dense_store); + } else { + load_tensor_store(reader, docid_limit); + } + _attr.commit(); + _attr.getStatus().setNumDocs(docid_limit); + _attr.setCommittedDocIdLimit(docid_limit); + build_index(executor, docid_limit); + return load_index(); +} + +} diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.h b/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.h new file mode 100644 index 00000000000..125bfd5abba --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute_loader.h @@ -0,0 +1,45 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#pragma once + +#include <vespa/vespalib/datastore/atomic_entry_ref.h> +#include <vespa/vespalib/util/rcuvector.h> + +namespace search { class AttributeVector; } +namespace vespalib { class Executor; } + +namespace search::tensor { + +class BlobSequenceReader; +class DenseTensorStore; +class NearestNeighborIndex; +class TensorStore; + +/** + * Class for loading a tensor attribute. + * Will also load the nearest neighbor index. + */ +class TensorAttributeLoader { + using AtomicEntryRef = vespalib::datastore::AtomicEntryRef; + using GenerationHandler = vespalib::GenerationHandler; + using RefVector = vespalib::RcuVectorBase<AtomicEntryRef>; + AttributeVector& _attr; + GenerationHandler& _generation_handler; + RefVector& _ref_vector; + TensorStore& _store; + NearestNeighborIndex* _index; + bool _use_index_file; + + void load_dense_tensor_store(BlobSequenceReader& reader, uint32_t docid_limit, DenseTensorStore& dense_store); + void load_tensor_store(BlobSequenceReader& reader, uint32_t docid_limit); + void build_index(vespalib::Executor* executor, uint32_t docid_limit); + bool load_index(); + +public: + TensorAttributeLoader(AttributeVector& attr, GenerationHandler& generation_handler, RefVector& ref_vector, TensorStore& store, NearestNeighborIndex* index); + ~TensorAttributeLoader(); + bool on_load(vespalib::Executor* executor); +}; + +} + diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_store.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_store.cpp index 5f07f378465..b88e94fe623 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_store.cpp +++ b/searchlib/src/vespa/searchlib/tensor/tensor_store.cpp @@ -17,4 +17,10 @@ TensorStore::as_dense() const return nullptr; } +DenseTensorStore* +TensorStore::as_dense() +{ + return nullptr; +} + } diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_store.h b/searchlib/src/vespa/searchlib/tensor/tensor_store.h index 11ab4158e8f..9a72d1cfccc 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_store.h 
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_store.h @@ -49,6 +49,7 @@ public: virtual std::unique_ptr<vespalib::eval::Value> get_tensor(EntryRef ref) const = 0; virtual bool encode_stored_tensor(EntryRef ref, vespalib::nbostream& target) const = 0; virtual const DenseTensorStore* as_dense() const; + virtual DenseTensorStore* as_dense(); // Inherit doc from DataStoreBase void reclaim_memory(generation_t oldest_used_gen) { |