diff options
author | Arne H Juul <arnej27959@users.noreply.github.com> | 2020-12-01 15:02:51 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-01 15:02:51 +0100 |
commit | 8d9680d2101c76229190598547362ab2760d3d2f (patch) | |
tree | 45d4dd185eda20e678cc6ab78b04fc557f929d43 /searchlib | |
parent | 80ed20529925faa4e2bcece3f22ea7b8f2b49f7f (diff) | |
parent | 4ac10dad24980734a161533c36b232cfc5d3a2f9 (diff) |
Merge pull request #15484 from vespa-engine/arnej/add-serialized_fast_value_attribute
Arnej/add serialized fast value attribute
Diffstat (limited to 'searchlib')
11 files changed, 700 insertions, 8 deletions
diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp index daa85c91b2c..bf46a2cc7d0 100644 --- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp +++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp @@ -16,7 +16,7 @@ #include <vespa/searchlib/tensor/nearest_neighbor_index.h> #include <vespa/searchlib/tensor/nearest_neighbor_index_factory.h> #include <vespa/searchlib/tensor/nearest_neighbor_index_saver.h> -#include <vespa/searchlib/tensor/serialized_tensor_attribute.h> +#include <vespa/searchlib/tensor/serialized_fast_value_attribute.h> #include <vespa/searchlib/tensor/tensor_attribute.h> #include <vespa/searchlib/test/directory_handler.h> #include <vespa/searchlib/util/fileutil.h> @@ -40,7 +40,7 @@ using search::tensor::DefaultNearestNeighborIndexFactory; using search::tensor::DenseTensorAttribute; using search::tensor::DirectTensorAttribute; using search::tensor::DocVectorAccess; -using search::tensor::SerializedTensorAttribute; +using search::tensor::SerializedFastValueAttribute; using search::tensor::HnswIndex; using search::tensor::HnswNode; using search::tensor::NearestNeighborIndex; @@ -344,7 +344,7 @@ struct Fixture { } else if (_traits.use_direct_tensor_attribute) { return std::make_shared<DirectTensorAttribute>(_name, _cfg); } else { - return std::make_shared<SerializedTensorAttribute>(_name, _cfg); + return std::make_shared<SerializedFastValueAttribute>(_name, _cfg); } } diff --git a/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp b/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp index 148d18f79ff..29033944d4b 100644 --- a/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp +++ b/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp @@ -7,8 +7,10 @@ #include "singlenumericattribute.hpp" #include "singlestringattribute.h" #include "singleboolattribute.h" +#include <vespa/eval/eval/engine_or_factory.h> #include <vespa/searchlib/tensor/dense_tensor_attribute.h> #include <vespa/searchlib/tensor/serialized_tensor_attribute.h> +#include <vespa/searchlib/tensor/serialized_fast_value_attribute.h> namespace search { @@ -45,6 +47,8 @@ AttributeFactory::createSingleStd(stringref name, const Config & info) case BasicType::TENSOR: if (info.tensorType().is_dense()) { return std::make_shared<tensor::DenseTensorAttribute>(name, info); + } else if (vespalib::eval::EngineOrFactory::get().is_factory()) { + return std::make_shared<tensor::SerializedFastValueAttribute>(name, info); } else { return std::make_shared<tensor::SerializedTensorAttribute>(name, info); } diff --git a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt index fac6d015a5f..79b18a57a34 100644 --- a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt @@ -25,5 +25,8 @@ vespa_add_library(searchlib_tensor OBJECT tensor_attribute.cpp tensor_deserialize.cpp tensor_store.cpp + serialized_fast_value_attribute.cpp + streamed_value_saver.cpp + streamed_value_store.cpp DEPENDS ) diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp new file mode 100644 index 00000000000..6e1fb1a0a2f --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp @@ -0,0 +1,234 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "serialized_fast_value_attribute.h" +#include "streamed_value_saver.h" +#include <vespa/eval/eval/value.h> +#include <vespa/eval/eval/fast_value.hpp> +#include <vespa/eval/streamed/streamed_value_utils.h> +#include <vespa/fastlib/io/bufferedfile.h> +#include <vespa/searchlib/attribute/readerbase.h> +#include <vespa/searchlib/util/fileutil.h> +#include <vespa/vespalib/util/rcuvector.hpp> +#include <vespa/log/log.h> + +LOG_SETUP(".searchlib.tensor.serialized_fast_value_attribute"); + +#include "blob_sequence_reader.h" +#include "tensor_attribute.hpp" + +using namespace vespalib; +using namespace vespalib::eval; + +namespace search::tensor { + +namespace { + +struct ValueBlock : LabelBlock { + TypedCells cells; +}; + +class ValueBlockStream { +private: + const StreamedValueStore::DataFromType &_from_type; + LabelBlockStream _label_block_stream; + const char *_cells_ptr; + + size_t dsss() const { return _from_type.dense_subspace_size; } + auto cell_type() const { return _from_type.cell_type; } +public: + ValueBlock next_block() { + auto labels = _label_block_stream.next_block(); + if (labels) { + TypedCells subspace_cells(_cells_ptr, cell_type(), dsss()); + _cells_ptr += CellTypeUtils::mem_size(cell_type(), dsss()); + return ValueBlock{labels, subspace_cells}; + } else { + TypedCells none(nullptr, cell_type(), 0); + return ValueBlock{labels, none}; + } + } + + ValueBlockStream(const StreamedValueStore::DataFromType &from_type, + const StreamedValueStore::StreamedValueData &from_store) + : _from_type(from_type), + _label_block_stream(from_store.num_subspaces, + from_store.labels_buffer, + from_type.num_mapped_dimensions), + _cells_ptr((const char *)from_store.cells_ref.data) + { + _label_block_stream.reset(); + } + + ~ValueBlockStream(); +}; + +ValueBlockStream::~ValueBlockStream() = default; + +void report_problematic_subspace(size_t idx, + const StreamedValueStore::DataFromType &from_type, + const StreamedValueStore::StreamedValueData &from_store) +{ + LOG(error, "PROBLEM: add_mapping returned same index=%zu twice", idx); + FastValueIndex temp_index(from_type.num_mapped_dimensions, + from_store.num_subspaces); + auto from_start = ValueBlockStream(from_type, from_store); + while (auto redo_block = from_start.next_block()) { + if (idx == temp_index.map.add_mapping(redo_block.address)) { + vespalib::string msg = "Block with address[ "; + for (vespalib::stringref ref : redo_block.address) { + msg.append("'").append(ref).append("' "); + } + msg.append("]"); + LOG(error, "%s maps to subspace %zu", msg.c_str(), idx); + } + } +} + +/** + * This Value implementation is almost exactly like FastValue, but + * instead of owning its type and cells it just has a reference to + * data stored elsewhere. + * XXX: we should find a better name for this, and move it + * (together with the helper classes above) to its own file, + * and add associated unit tests. + **/ +class OnlyFastValueIndex : public Value { +private: + const ValueType &_type; + TypedCells _cells; + FastValueIndex my_index; +public: + OnlyFastValueIndex(const ValueType &type, + const StreamedValueStore::DataFromType &from_type, + const StreamedValueStore::StreamedValueData &from_store) + : _type(type), + _cells(from_store.cells_ref), + my_index(from_type.num_mapped_dimensions, + from_store.num_subspaces) + { + assert(_type.cell_type() == _cells.type); + std::vector<vespalib::stringref> address(from_type.num_mapped_dimensions); + auto block_stream = ValueBlockStream(from_type, from_store); + size_t ss = 0; + while (auto block = block_stream.next_block()) { + size_t idx = my_index.map.add_mapping(block.address); + if (idx != ss) { + report_problematic_subspace(idx, from_type, from_store); + } + ++ss; + } + assert(ss == from_store.num_subspaces); + } + + + ~OnlyFastValueIndex(); + + const ValueType &type() const final override { return _type; } + TypedCells cells() const final override { return _cells; } + const Index &index() const final override { return my_index; } + vespalib::MemoryUsage get_memory_usage() const final override { + auto usage = self_memory_usage<OnlyFastValueIndex>(); + usage.merge(my_index.map.estimate_extra_memory_usage()); + return usage; + } +}; + +OnlyFastValueIndex::~OnlyFastValueIndex() = default; + +} + +SerializedFastValueAttribute::SerializedFastValueAttribute(stringref name, const Config &cfg) + : TensorAttribute(name, cfg, _streamedValueStore), + _tensor_type(cfg.tensorType()), + _streamedValueStore(_tensor_type), + _data_from_type(_tensor_type) +{ +} + + +SerializedFastValueAttribute::~SerializedFastValueAttribute() +{ + getGenerationHolder().clearHoldLists(); + _tensorStore.clearHoldLists(); +} + +void +SerializedFastValueAttribute::setTensor(DocId docId, const vespalib::eval::Value &tensor) +{ + checkTensorType(tensor); + EntryRef ref = _streamedValueStore.store_tensor(tensor); + assert(ref.valid()); + setTensorRef(docId, ref); +} + +std::unique_ptr<Value> +SerializedFastValueAttribute::getTensor(DocId docId) const +{ + EntryRef ref; + if (docId < getCommittedDocIdLimit()) { + ref = _refVector[docId]; + } + if (!ref.valid()) { + return {}; + } + if (auto data_from_store = _streamedValueStore.get_tensor_data(ref)) { + return std::make_unique<OnlyFastValueIndex>(_tensor_type, + _data_from_type, + data_from_store); + } + return {}; +} + +bool +SerializedFastValueAttribute::onLoad() +{ + BlobSequenceReader tensorReader(*this); + if (!tensorReader.hasData()) { + return false; + } + setCreateSerialNum(tensorReader.getCreateSerialNum()); + assert(tensorReader.getVersion() == getVersion()); + uint32_t numDocs(tensorReader.getDocIdLimit()); + _refVector.reset(); + _refVector.unsafe_reserve(numDocs); + vespalib::Array<char> buffer(1024); + for (uint32_t lid = 0; lid < numDocs; ++lid) { + uint32_t tensorSize = tensorReader.getNextSize(); + if (tensorSize != 0) { + if (tensorSize > buffer.size()) { + buffer.resize(tensorSize + 1024); + } + tensorReader.readBlob(&buffer[0], tensorSize); + vespalib::nbostream source(&buffer[0], tensorSize); + EntryRef ref = _streamedValueStore.store_encoded_tensor(source); + _refVector.push_back(ref); + } else { + EntryRef invalid; + _refVector.push_back(invalid); + } + } + setNumDocs(numDocs); + setCommittedDocIdLimit(numDocs); + return true; +} + + +std::unique_ptr<AttributeSaver> +SerializedFastValueAttribute::onInitSave(vespalib::stringref fileName) +{ + vespalib::GenerationHandler::Guard guard(getGenerationHandler(). + takeGuard()); + return std::make_unique<StreamedValueSaver> + (std::move(guard), + this->createAttributeHeader(fileName), + getRefCopy(), + _streamedValueStore); +} + +void +SerializedFastValueAttribute::compactWorst() +{ + doCompactWorst<StreamedValueStore::RefType>(); +} + +} diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h new file mode 100644 index 00000000000..a8c1df4913a --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h @@ -0,0 +1,33 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "tensor_attribute.h" +#include "streamed_value_store.h" + +namespace search::tensor { + +/** + * Attribute vector class storing serialized tensors for all documents in memory. + * + * When fetching a tensor with getTensor(docId) the returned Value + * will have a FastValueIndex (constructed on the fly) for its sparse + * mapping, but refer to a common type, while cells() will refer to + * memory in the serialized store without copying. + * + */ +class SerializedFastValueAttribute : public TensorAttribute { + vespalib::eval::ValueType _tensor_type; + StreamedValueStore _streamedValueStore; // data store for serialized tensors + const StreamedValueStore::DataFromType _data_from_type; +public: + SerializedFastValueAttribute(vespalib::stringref baseFileName, const Config &cfg); + virtual ~SerializedFastValueAttribute(); + virtual void setTensor(DocId docId, const vespalib::eval::Value &tensor) override; + virtual std::unique_ptr<vespalib::eval::Value> getTensor(DocId docId) const override; + virtual bool onLoad() override; + virtual std::unique_ptr<AttributeSaver> onInitSave(vespalib::stringref fileName) override; + virtual void compactWorst() override; +}; + +} diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp new file mode 100644 index 00000000000..d4fd681f2cb --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp @@ -0,0 +1,48 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "streamed_value_saver.h" +#include "streamed_value_store.h" + +#include <vespa/searchlib/attribute/iattributesavetarget.h> +#include <vespa/searchlib/util/bufferwriter.h> +#include <vespa/vespalib/objects/nbostream.h> + +using vespalib::GenerationHandler; + +namespace search::tensor { + +StreamedValueSaver:: +StreamedValueSaver(GenerationHandler::Guard &&guard, + const attribute::AttributeHeader &header, + RefCopyVector &&refs, + const StreamedValueStore &tensorStore) + : AttributeSaver(std::move(guard), header), + _refs(std::move(refs)), + _tensorStore(tensorStore) +{ +} + +StreamedValueSaver::~StreamedValueSaver() = default; + +bool +StreamedValueSaver::onSave(IAttributeSaveTarget &saveTarget) +{ + auto datWriter = saveTarget.datWriter().allocBufferWriter(); + const uint32_t docIdLimit(_refs.size()); + vespalib::nbostream stream; + for (uint32_t lid = 0; lid < docIdLimit; ++lid) { + if (_tensorStore.encode_tensor(_refs[lid], stream)) { + uint32_t sz = stream.size(); + datWriter->write(&sz, sizeof(sz)); + datWriter->write(stream.peek(), stream.size()); + stream.clear(); + } else { + uint32_t sz = 0; + datWriter->write(&sz, sizeof(sz)); + } + } + datWriter->flush(); + return true; +} + +} // namespace search::tensor diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h new file mode 100644 index 00000000000..71d56539679 --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h @@ -0,0 +1,35 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchlib/attribute/attributesaver.h> +#include "tensor_attribute.h" + +namespace search::tensor { + +class StreamedValueStore; + +/* + * Class for saving a tensor attribute. + */ +class StreamedValueSaver : public AttributeSaver +{ +public: + using RefCopyVector = TensorAttribute::RefCopyVector; +private: + using GenerationHandler = vespalib::GenerationHandler; + + RefCopyVector _refs; + const StreamedValueStore &_tensorStore; + + bool onSave(IAttributeSaveTarget &saveTarget) override; +public: + StreamedValueSaver(GenerationHandler::Guard &&guard, + const attribute::AttributeHeader &header, + RefCopyVector &&refs, + const StreamedValueStore &tensorStore); + + virtual ~StreamedValueSaver(); +}; + +} // namespace search::tensor diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp new file mode 100644 index 00000000000..ae2e0e7ed10 --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp @@ -0,0 +1,228 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "streamed_value_store.h" +#include "tensor_deserialize.h" +#include <vespa/eval/eval/value.h> +#include <vespa/eval/eval/value_codec.h> +#include <vespa/eval/streamed/streamed_value_builder_factory.h> +#include <vespa/eval/streamed/streamed_value_view.h> +#include <vespa/vespalib/datastore/datastore.hpp> +#include <vespa/vespalib/objects/nbostream.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/log/log.h> + +LOG_SETUP(".searchlib.tensor.streamed_value_store"); + +using vespalib::datastore::Handle; +using namespace vespalib::eval; + +namespace search::tensor { + +namespace { + +constexpr size_t MIN_BUFFER_ARRAYS = 1024; + +struct CellsMemBlock { + uint32_t num; + uint32_t total_sz; + const char *ptr; + CellsMemBlock(TypedCells cells) + : num(cells.size), + total_sz(CellTypeUtils::mem_size(cells.type, num)), + ptr((const char *)cells.data) + {} +}; + +template<typename T> +T *fix_alignment(T *ptr, size_t align) +{ + static_assert(sizeof(T) == 1); + assert((align & (align-1)) == 0); // must be 2^N + size_t ptr_val = (size_t)ptr; + size_t unalign = ptr_val & (align - 1); + if (unalign == 0) { + return ptr; + } else { + return ptr + (align - unalign); + } +} + +} // namespace <unnamed> + +StreamedValueStore::StreamedValueStore(const ValueType &tensor_type) + : TensorStore(_concreteStore), + _concreteStore(), + _bufferType(RefType::align(1), + MIN_BUFFER_ARRAYS, + RefType::offsetSize() / RefType::align(1)), + _tensor_type(tensor_type), + _data_from_type(_tensor_type) +{ + _store.addType(&_bufferType); + _store.initActiveBuffers(); +} + +StreamedValueStore::~StreamedValueStore() +{ + _store.dropBuffers(); +} + +std::pair<const char *, uint32_t> +StreamedValueStore::getRawBuffer(RefType ref) const +{ + if (!ref.valid()) { + return std::make_pair(nullptr, 0u); + } + const char *buf = _store.getEntry<char>(ref); + uint32_t len = *reinterpret_cast<const uint32_t *>(buf); + return std::make_pair(buf + sizeof(uint32_t), len); +} + +Handle<char> +StreamedValueStore::allocRawBuffer(uint32_t size) +{ + if (size == 0) { + return Handle<char>(); + } + size_t extSize = size + sizeof(uint32_t); + size_t bufSize = RefType::align(extSize); + auto result = _concreteStore.rawAllocator<char>(_typeId).alloc(bufSize); + *reinterpret_cast<uint32_t *>(result.data) = size; + char *padWritePtr = result.data + extSize; + for (size_t i = extSize; i < bufSize; ++i) { + *padWritePtr++ = 0; + } + // Hide length of buffer (first 4 bytes) from users of the buffer. + return Handle<char>(result.ref, result.data + sizeof(uint32_t)); +} + +void +StreamedValueStore::holdTensor(EntryRef ref) +{ + if (!ref.valid()) { + return; + } + RefType iRef(ref); + const char *buf = _store.getEntry<char>(iRef); + uint32_t len = *reinterpret_cast<const uint32_t *>(buf); + _concreteStore.holdElem(ref, len + sizeof(uint32_t)); +} + +TensorStore::EntryRef +StreamedValueStore::move(EntryRef ref) +{ + if (!ref.valid()) { + return RefType(); + } + auto oldraw = getRawBuffer(ref); + auto newraw = allocRawBuffer(oldraw.second); + memcpy(newraw.data, oldraw.first, oldraw.second); + _concreteStore.holdElem(ref, oldraw.second + sizeof(uint32_t)); + return newraw.ref; +} + +StreamedValueStore::StreamedValueData +StreamedValueStore::get_tensor_data(EntryRef ref) const +{ + StreamedValueData retval; + retval.valid = false; + auto raw = getRawBuffer(ref); + if (raw.second == 0u) { + return retval; + } + vespalib::nbostream source(raw.first, raw.second); + uint32_t num_cells = source.readValue<uint32_t>(); + { + uint32_t alignment = CellTypeUtils::alignment(_data_from_type.cell_type); + const char *aligned_ptr = fix_alignment(source.peek(), alignment); + size_t adjustment = aligned_ptr - source.peek(); + source.adjustReadPos(adjustment); + } + retval.cells_ref = TypedCells(source.peek(), _data_from_type.cell_type, num_cells); + source.adjustReadPos(CellTypeUtils::mem_size(_data_from_type.cell_type, num_cells)); + retval.num_subspaces = source.readValue<uint32_t>(); + retval.labels_buffer = vespalib::ConstArrayRef<char>(source.peek(), source.size()); + assert(retval.num_subspaces * _data_from_type.dense_subspace_size == num_cells); + retval.valid = true; + return retval; +} + +bool +StreamedValueStore::encode_tensor(EntryRef ref, vespalib::nbostream &target) const +{ + if (auto data = get_tensor_data(ref)) { + StreamedValueView value( + _tensor_type, _data_from_type.num_mapped_dimensions, + data.cells_ref, data.num_subspaces, data.labels_buffer); + vespalib::eval::encode_value(value, target); + return true; + } else { + return false; + } +} + +void +StreamedValueStore::serialize_labels(const Value::Index &index, + vespalib::nbostream &target) const +{ + uint32_t num_subspaces = index.size(); + target << num_subspaces; + uint32_t num_mapped_dims = _data_from_type.num_mapped_dimensions; + std::vector<vespalib::stringref> labels(num_mapped_dims * num_subspaces); + auto view = index.create_view({}); + view->lookup({}); + std::vector<vespalib::stringref> addr(num_mapped_dims); + std::vector<vespalib::stringref *> addr_refs; + for (auto & label : addr) { + addr_refs.push_back(&label); + } + size_t subspace; + for (size_t ss = 0; ss < num_subspaces; ++ss) { + bool ok = view->next_result(addr_refs, subspace); + assert(ok); + size_t idx = subspace * num_mapped_dims; + for (auto label : addr) { + labels[idx++] = label; + } + } + bool ok = view->next_result(addr_refs, subspace); + assert(!ok); + for (auto label : labels) { + target.writeSmallString(label); + } +} + +TensorStore::EntryRef +StreamedValueStore::store_tensor(const Value &tensor) +{ + assert(tensor.type() == _tensor_type); + CellsMemBlock cells_mem(tensor.cells()); + size_t alignment = CellTypeUtils::alignment(_data_from_type.cell_type); + size_t padding = alignment - 1; + vespalib::nbostream stream; + stream << uint32_t(cells_mem.num); + serialize_labels(tensor.index(), stream); + size_t mem_size = stream.size() + cells_mem.total_sz + padding; + auto raw = allocRawBuffer(mem_size); + char *target = raw.data; + memcpy(target, stream.peek(), sizeof(uint32_t)); + stream.adjustReadPos(sizeof(uint32_t)); + target += sizeof(uint32_t); + target = fix_alignment(target, alignment); + memcpy(target, cells_mem.ptr, cells_mem.total_sz); + target += cells_mem.total_sz; + memcpy(target, stream.peek(), stream.size()); + target += stream.size(); + assert(target <= raw.data + mem_size); + return raw.ref; +} + +TensorStore::EntryRef +StreamedValueStore::store_encoded_tensor(vespalib::nbostream &encoded) +{ + const auto &factory = StreamedValueBuilderFactory::get(); + auto val = vespalib::eval::decode_value(encoded, factory); + return store_tensor(*val); +} + +} diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h new file mode 100644 index 00000000000..4e12296916d --- /dev/null +++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h @@ -0,0 +1,98 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "tensor_store.h" +#include <vespa/eval/eval/value_type.h> +#include <vespa/eval/eval/value.h> +#include <vespa/vespalib/objects/nbostream.h> +#include <vespa/vespalib/util/typify.h> + +namespace search::tensor { + +/** + * Class for storing tensors in memory, with a special serialization + * format that can be used directly to make a StreamedValueView. + * + * The tensor type is owned by the store itself and will not be + * serialized at all. + * + * The parameters for serialization (see DataFromType) are: + * - number of mapped dimensions [MD] + * - dense subspace size [DS] + * - size of each cell [CS] - currently 4 (float) or 8 (double) + * - alignment for cells [CA] - currently 4 (float) or 8 (double) + * While the tensor value to be serialized has: + * - number of dense subspaces [ND] + * - labels for dense subspaces, ND * MD strings + * - cell values, ND * DS cells (each either float or double) + * The serialization format looks like: + * + * [bytes] : [format] : [description] + * 4 : n.b.o. uint32_ t : num cells = ND * DS + * 1-7 : (none) : padding to cell alignment CA + * CS * ND * DS : native float or double : cells + * 4 : n.b.o. uint32_t : number of subspaces = ND + * (depends) : n.b.o. strings : labels + * + * Here, n.b.o. means network byte order, or more precisely + * it's the format vespalib::nbostream uses for the given data type, + * including strings (where exact format depends on the string length). + * Note that the only unpredictably-sized data (the labels) are kept + * last. + * If we ever make a "hbostream" which uses host byte order, we + * could switch to that instead since these data are only kept in + * memory. + */ +class StreamedValueStore : public TensorStore { +public: + using RefType = vespalib::datastore::AlignedEntryRefT<22, 3>; + using DataStoreType = vespalib::datastore::DataStoreT<RefType>; + + struct StreamedValueData { + bool valid; + vespalib::eval::TypedCells cells_ref; + size_t num_subspaces; + vespalib::ConstArrayRef<char> labels_buffer; + operator bool() const { return valid; } + }; + + struct DataFromType { + uint32_t num_mapped_dimensions; + uint32_t dense_subspace_size; + vespalib::eval::CellType cell_type; + + DataFromType(const vespalib::eval::ValueType& type) + : num_mapped_dimensions(type.count_mapped_dimensions()), + dense_subspace_size(type.dense_subspace_size()), + cell_type(type.cell_type()) + {} + }; + +private: + DataStoreType _concreteStore; + vespalib::datastore::BufferType<char> _bufferType; + vespalib::eval::ValueType _tensor_type; + DataFromType _data_from_type; + + void serialize_labels(const vespalib::eval::Value::Index &index, + vespalib::nbostream &target) const; + + std::pair<const char *, uint32_t> getRawBuffer(RefType ref) const; + vespalib::datastore::Handle<char> allocRawBuffer(uint32_t size); +public: + StreamedValueStore(const vespalib::eval::ValueType &tensor_type); + virtual ~StreamedValueStore(); + + virtual void holdTensor(EntryRef ref) override; + virtual EntryRef move(EntryRef ref) override; + + StreamedValueData get_tensor_data(EntryRef ref) const; + bool encode_tensor(EntryRef ref, vespalib::nbostream &target) const; + + EntryRef store_tensor(const vespalib::eval::Value &tensor); + EntryRef store_encoded_tensor(vespalib::nbostream &encoded); +}; + + +} diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp index 83988a3af11..35be27bc03b 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp +++ b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp @@ -1,9 +1,9 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "tensor_deserialize.h" #include <vespa/document/util/serializableexceptions.h> #include <vespa/eval/eval/engine_or_factory.h> #include <vespa/eval/eval/value.h> -#include <vespa/vespalib/objects/nbostream.h> using document::DeserializeException; using vespalib::eval::EngineOrFactory; @@ -11,14 +11,19 @@ using vespalib::eval::Value; namespace search::tensor { -std::unique_ptr<Value> deserialize_tensor(const void *data, size_t size) +std::unique_ptr<Value> deserialize_tensor(vespalib::nbostream &buffer) { - vespalib::nbostream wrapStream(data, size); - auto tensor = EngineOrFactory::get().decode(wrapStream); - if (wrapStream.size() != 0) { + auto tensor = EngineOrFactory::get().decode(buffer); + if (buffer.size() != 0) { throw DeserializeException("Leftover bytes deserializing tensor attribute value.", VESPA_STRLOC); } return tensor; } +std::unique_ptr<Value> deserialize_tensor(const void *data, size_t size) +{ + vespalib::nbostream wrapStream(data, size); + return deserialize_tensor(wrapStream); +} + } // namespace diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h index 18e166543d6..6f9521c1355 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h +++ b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h @@ -1,10 +1,14 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/eval/eval/value.h> +#include <vespa/vespalib/objects/nbostream.h> namespace search::tensor { extern std::unique_ptr<vespalib::eval::Value> deserialize_tensor(const void *data, size_t size); +extern std::unique_ptr<vespalib::eval::Value> +deserialize_tensor(vespalib::nbostream &stream); + } // namespace |