diff options
author | Arne H Juul <arnej27959@users.noreply.github.com> | 2021-01-08 08:12:13 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-08 08:12:13 +0100 |
commit | b3de117aa2002b9dda6829bfb730d4ed5ebb5af0 (patch) | |
tree | 8b09fe1a425b8db0516883a960269498f1034eba /searchlib | |
parent | e253914031bb803e59f9dc0de8a626c781520043 (diff) | |
parent | d2ac897f41951ec86082b419b22e06369322f04c (diff) |
Merge pull request #15913 from vespa-engine/havardpe/use-string-ids-as-tensor-labels
Havardpe/use string ids as tensor labels
Diffstat (limited to 'searchlib')
5 files changed, 225 insertions, 328 deletions
diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp index e1bd47af358..7b597af417d 100644 --- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp +++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp @@ -563,7 +563,7 @@ void Fixture::testCompaction() { if ((_traits.use_dense_tensor_attribute && _denseTensors) || - _traits.use_direct_tensor_attribute) + ! _traits.use_dense_tensor_attribute) { LOG(info, "Skipping compaction test for tensor '%s' which is using free-lists", _cfg.tensorType().to_spec().c_str()); return; diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp index 6e1fb1a0a2f..260ffa1a388 100644 --- a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp @@ -3,8 +3,7 @@ #include "serialized_fast_value_attribute.h" #include "streamed_value_saver.h" #include <vespa/eval/eval/value.h> -#include <vespa/eval/eval/fast_value.hpp> -#include <vespa/eval/streamed/streamed_value_utils.h> +#include <vespa/eval/eval/fast_value.h> #include <vespa/fastlib/io/bufferedfile.h> #include <vespa/searchlib/attribute/readerbase.h> #include <vespa/searchlib/util/fileutil.h> @@ -21,127 +20,10 @@ using namespace vespalib::eval; namespace search::tensor { -namespace { - -struct ValueBlock : LabelBlock { - TypedCells cells; -}; - -class ValueBlockStream { -private: - const StreamedValueStore::DataFromType &_from_type; - LabelBlockStream _label_block_stream; - const char *_cells_ptr; - - size_t dsss() const { return _from_type.dense_subspace_size; } - auto cell_type() const { return _from_type.cell_type; } -public: - ValueBlock next_block() { - auto labels = _label_block_stream.next_block(); - if (labels) { - TypedCells subspace_cells(_cells_ptr, cell_type(), dsss()); - _cells_ptr += CellTypeUtils::mem_size(cell_type(), dsss()); - return ValueBlock{labels, subspace_cells}; - } else { - TypedCells none(nullptr, cell_type(), 0); - return ValueBlock{labels, none}; - } - } - - ValueBlockStream(const StreamedValueStore::DataFromType &from_type, - const StreamedValueStore::StreamedValueData &from_store) - : _from_type(from_type), - _label_block_stream(from_store.num_subspaces, - from_store.labels_buffer, - from_type.num_mapped_dimensions), - _cells_ptr((const char *)from_store.cells_ref.data) - { - _label_block_stream.reset(); - } - - ~ValueBlockStream(); -}; - -ValueBlockStream::~ValueBlockStream() = default; - -void report_problematic_subspace(size_t idx, - const StreamedValueStore::DataFromType &from_type, - const StreamedValueStore::StreamedValueData &from_store) -{ - LOG(error, "PROBLEM: add_mapping returned same index=%zu twice", idx); - FastValueIndex temp_index(from_type.num_mapped_dimensions, - from_store.num_subspaces); - auto from_start = ValueBlockStream(from_type, from_store); - while (auto redo_block = from_start.next_block()) { - if (idx == temp_index.map.add_mapping(redo_block.address)) { - vespalib::string msg = "Block with address[ "; - for (vespalib::stringref ref : redo_block.address) { - msg.append("'").append(ref).append("' "); - } - msg.append("]"); - LOG(error, "%s maps to subspace %zu", msg.c_str(), idx); - } - } -} - -/** - * This Value implementation is almost exactly like FastValue, but - * instead of owning its type and cells it just has a reference to - * data stored elsewhere. - * XXX: we should find a better name for this, and move it - * (together with the helper classes above) to its own file, - * and add associated unit tests. - **/ -class OnlyFastValueIndex : public Value { -private: - const ValueType &_type; - TypedCells _cells; - FastValueIndex my_index; -public: - OnlyFastValueIndex(const ValueType &type, - const StreamedValueStore::DataFromType &from_type, - const StreamedValueStore::StreamedValueData &from_store) - : _type(type), - _cells(from_store.cells_ref), - my_index(from_type.num_mapped_dimensions, - from_store.num_subspaces) - { - assert(_type.cell_type() == _cells.type); - std::vector<vespalib::stringref> address(from_type.num_mapped_dimensions); - auto block_stream = ValueBlockStream(from_type, from_store); - size_t ss = 0; - while (auto block = block_stream.next_block()) { - size_t idx = my_index.map.add_mapping(block.address); - if (idx != ss) { - report_problematic_subspace(idx, from_type, from_store); - } - ++ss; - } - assert(ss == from_store.num_subspaces); - } - - - ~OnlyFastValueIndex(); - - const ValueType &type() const final override { return _type; } - TypedCells cells() const final override { return _cells; } - const Index &index() const final override { return my_index; } - vespalib::MemoryUsage get_memory_usage() const final override { - auto usage = self_memory_usage<OnlyFastValueIndex>(); - usage.merge(my_index.map.estimate_extra_memory_usage()); - return usage; - } -}; - -OnlyFastValueIndex::~OnlyFastValueIndex() = default; - -} - SerializedFastValueAttribute::SerializedFastValueAttribute(stringref name, const Config &cfg) : TensorAttribute(name, cfg, _streamedValueStore), _tensor_type(cfg.tensorType()), - _streamedValueStore(_tensor_type), - _data_from_type(_tensor_type) + _streamedValueStore(_tensor_type) { } @@ -171,10 +53,8 @@ SerializedFastValueAttribute::getTensor(DocId docId) const if (!ref.valid()) { return {}; } - if (auto data_from_store = _streamedValueStore.get_tensor_data(ref)) { - return std::make_unique<OnlyFastValueIndex>(_tensor_type, - _data_from_type, - data_from_store); + if (const auto * ptr = _streamedValueStore.get_tensor_entry(ref)) { + return ptr->create_fast_value_view(_tensor_type); } return {}; } diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h index a8c1df4913a..cc559d9b758 100644 --- a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h +++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h @@ -19,7 +19,6 @@ namespace search::tensor { class SerializedFastValueAttribute : public TensorAttribute { vespalib::eval::ValueType _tensor_type; StreamedValueStore _streamedValueStore; // data store for serialized tensors - const StreamedValueStore::DataFromType _data_from_type; public: SerializedFastValueAttribute(vespalib::stringref baseFileName, const Config &cfg); virtual ~SerializedFastValueAttribute(); diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp index c4579880409..ef4b711b86f 100644 --- a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp +++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp @@ -1,99 +1,204 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "streamed_value_store.h" -#include "tensor_deserialize.h" #include <vespa/eval/eval/value.h> #include <vespa/eval/eval/value_codec.h> +#include <vespa/eval/eval/fast_value.hpp> #include <vespa/eval/streamed/streamed_value_builder_factory.h> #include <vespa/eval/streamed/streamed_value_view.h> #include <vespa/vespalib/datastore/datastore.hpp> #include <vespa/vespalib/objects/nbostream.h> +#include <vespa/vespalib/util/typify.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/log/log.h> LOG_SETUP(".searchlib.tensor.streamed_value_store"); using vespalib::datastore::Handle; +using vespalib::datastore::EntryRef; using namespace vespalib::eval; +using vespalib::ConstArrayRef; +using vespalib::MemoryUsage; namespace search::tensor { +//----------------------------------------------------------------------------- + namespace { -constexpr size_t MIN_BUFFER_ARRAYS = 1024; - -struct CellsMemBlock { - uint32_t num; - uint32_t total_sz; - const char *ptr; - CellsMemBlock(TypedCells cells) - : num(cells.size), - total_sz(CellTypeUtils::mem_size(cells.type, num)), - ptr((const char *)cells.data) - {} +template <typename CT, typename F> +void each_subspace(const Value &value, size_t num_mapped, size_t dense_size, F f) { + size_t subspace; + std::vector<label_t> addr(num_mapped); + std::vector<label_t*> refs; + refs.reserve(addr.size()); + for (label_t &label: addr) { + refs.push_back(&label); + } + auto cells = value.cells().typify<CT>(); + auto view = value.index().create_view({}); + view->lookup({}); + while (view->next_result(refs, subspace)) { + size_t offset = subspace * dense_size; + f(ConstArrayRef<label_t>(addr), ConstArrayRef<CT>(cells.begin() + offset, dense_size)); + } +} + +using TensorEntry = StreamedValueStore::TensorEntry; + +struct CreateTensorEntry { + template <typename CT> + static TensorEntry::SP invoke(const Value &value, size_t num_mapped, size_t dense_size) { + using EntryImpl = StreamedValueStore::TensorEntryImpl<CT>; + return std::make_shared<EntryImpl>(value, num_mapped, dense_size); + } }; -template<typename T> -void check_alignment(T *ptr, size_t align) +using HandleView = vespalib::SharedStringRepo::HandleView; + +struct MyFastValueView final : Value { + const ValueType &my_type; + FastValueIndex my_index; + TypedCells my_cells; + MyFastValueView(const ValueType &type_ref, HandleView handle_view, TypedCells cells, size_t num_mapped, size_t num_spaces) + : my_type(type_ref), + my_index(num_mapped, handle_view, num_spaces), + my_cells(cells) + { + const std::vector<label_t> &labels = handle_view.handles(); + for (size_t i = 0; i < num_spaces; ++i) { + ConstArrayRef<label_t> addr(&labels[i * num_mapped], num_mapped); + my_index.map.add_mapping(FastAddrMap::hash_labels(addr)); + } + assert(my_index.map.size() == num_spaces); + } + const ValueType &type() const override { return my_type; } + const Value::Index &index() const override { return my_index; } + TypedCells cells() const override { return my_cells; } + MemoryUsage get_memory_usage() const override { + MemoryUsage usage = self_memory_usage<MyFastValueView>(); + usage.merge(my_index.map.estimate_extra_memory_usage()); + return usage; + } +}; + +} // <unnamed> + +//----------------------------------------------------------------------------- + +StreamedValueStore::TensorEntry::~TensorEntry() = default; + +StreamedValueStore::TensorEntry::SP +StreamedValueStore::TensorEntry::create_shared_entry(const Value &value) { - static_assert(sizeof(T) == 1); - size_t ptr_val = (size_t)ptr; - size_t unalign = ptr_val & (align - 1); - assert(unalign == 0); + size_t num_mapped = value.type().count_mapped_dimensions(); + size_t dense_size = value.type().dense_subspace_size(); + return vespalib::typify_invoke<1,TypifyCellType,CreateTensorEntry>(value.type().cell_type(), value, num_mapped, dense_size); } -} // namespace <unnamed> +template <typename CT> +StreamedValueStore::TensorEntryImpl<CT>::TensorEntryImpl(const Value &value, size_t num_mapped, size_t dense_size) + : handles(num_mapped * value.index().size()), + cells() +{ + cells.reserve(dense_size * value.index().size()); + auto store_subspace = [&](auto addr, auto data) { + for (label_t label: addr) { + handles.add(label); + } + for (CT entry: data) { + cells.push_back(entry); + } + }; + each_subspace<CT>(value, num_mapped, dense_size, store_subspace); +} -StreamedValueStore::StreamedValueStore(const ValueType &tensor_type) - : TensorStore(_concreteStore), - _concreteStore(), - _bufferType(RefType::align(1), - MIN_BUFFER_ARRAYS, - RefType::offsetSize() / RefType::align(1)), - _tensor_type(tensor_type), - _data_from_type(_tensor_type) +template <typename CT> +Value::UP +StreamedValueStore::TensorEntryImpl<CT>::create_fast_value_view(const ValueType &type_ref) const { - _store.addType(&_bufferType); - _store.initActiveBuffers(); - size_t align = CellTypeUtils::alignment(_data_from_type.cell_type); - // max alignment we can handle is 8: - assert(align <= 8); - // alignment must be a power of two: - assert((align & (align-1)) == 0); + size_t num_mapped = type_ref.count_mapped_dimensions(); + size_t dense_size = type_ref.dense_subspace_size(); + size_t num_spaces = cells.size() / dense_size; + assert(dense_size * num_spaces == cells.size()); + assert(num_mapped * num_spaces == handles.view().handles().size()); + return std::make_unique<MyFastValueView>(type_ref, handles.view(), TypedCells(cells), num_mapped, num_spaces); } -StreamedValueStore::~StreamedValueStore() +template <typename CT> +void +StreamedValueStore::TensorEntryImpl<CT>::encode_value(const ValueType &type, vespalib::nbostream &target) const { - _store.dropBuffers(); + size_t num_mapped = type.count_mapped_dimensions(); + size_t dense_size = type.dense_subspace_size(); + size_t num_spaces = cells.size() / dense_size; + assert(dense_size * num_spaces == cells.size()); + assert(num_mapped * num_spaces == handles.view().handles().size()); + StreamedValueView my_value(type, num_mapped, TypedCells(cells), num_spaces, handles.view().handles()); + ::vespalib::eval::encode_value(my_value, target); } -std::pair<const char *, uint32_t> -StreamedValueStore::getRawBuffer(RefType ref) const +template <typename CT> +MemoryUsage +StreamedValueStore::TensorEntryImpl<CT>::get_memory_usage() const +{ + MemoryUsage usage = self_memory_usage<TensorEntryImpl<CT>>(); + usage.merge(vector_extra_memory_usage(handles.view().handles())); + usage.merge(vector_extra_memory_usage(cells)); + return usage; +} + +template <typename CT> +StreamedValueStore::TensorEntryImpl<CT>::~TensorEntryImpl() = default; + +//----------------------------------------------------------------------------- + +constexpr size_t MIN_BUFFER_ARRAYS = 8192; + +StreamedValueStore::TensorBufferType::TensorBufferType() + : ParentType(1, MIN_BUFFER_ARRAYS, TensorStoreType::RefType::offsetSize()) { - if (!ref.valid()) { - return std::make_pair(nullptr, 0u); - } - const char *buf = _store.getEntry<char>(ref); - uint32_t len = *reinterpret_cast<const uint32_t *>(buf); - return std::make_pair(buf + sizeof(uint32_t), len); } -Handle<char> -StreamedValueStore::allocRawBuffer(uint32_t size) +void +StreamedValueStore::TensorBufferType::cleanHold(void* buffer, size_t offset, size_t num_elems, CleanContext clean_ctx) { - if (size == 0) { - return Handle<char>(); + TensorEntry::SP* elem = static_cast<TensorEntry::SP*>(buffer) + offset; + for (size_t i = 0; i < num_elems; ++i) { + clean_ctx.extraBytesCleaned((*elem)->get_memory_usage().allocatedBytes()); + *elem = _emptyEntry; + ++elem; } - size_t extSize = size + sizeof(uint32_t); - size_t bufSize = RefType::align(extSize); - auto result = _concreteStore.rawAllocator<char>(_typeId).alloc(bufSize); - *reinterpret_cast<uint32_t *>(result.data) = size; - char *padWritePtr = result.data + extSize; - for (size_t i = extSize; i < bufSize; ++i) { - *padWritePtr++ = 0; +} + +StreamedValueStore::StreamedValueStore(const ValueType &tensor_type) + : TensorStore(_concrete_store), + _concrete_store(), + _tensor_type(tensor_type) +{ + _concrete_store.enableFreeLists(); +} + +StreamedValueStore::~StreamedValueStore() = default; + +EntryRef +StreamedValueStore::add_entry(TensorEntry::SP tensor) +{ + auto ref = _concrete_store.addEntry(tensor); + auto& state = _concrete_store.getBufferState(RefType(ref).bufferId()); + state.incExtraUsedBytes(tensor->get_memory_usage().allocatedBytes()); + return ref; +} + +const StreamedValueStore::TensorEntry * +StreamedValueStore::get_tensor_entry(EntryRef ref) const +{ + if (!ref.valid()) { + return nullptr; } - // Hide length of buffer (first 4 bytes) from users of the buffer. - return Handle<char>(result.ref, result.data + sizeof(uint32_t)); + const auto& entry = _concrete_store.getEntry(ref); + assert(entry); + return entry.get(); } void @@ -102,111 +207,40 @@ StreamedValueStore::holdTensor(EntryRef ref) if (!ref.valid()) { return; } - RefType iRef(ref); - const char *buf = _store.getEntry<char>(iRef); - uint32_t len = *reinterpret_cast<const uint32_t *>(buf); - _concreteStore.holdElem(ref, len + sizeof(uint32_t)); + const auto& tensor = _concrete_store.getEntry(ref); + assert(tensor); + _concrete_store.holdElem(ref, 1, tensor->get_memory_usage().allocatedBytes()); } TensorStore::EntryRef StreamedValueStore::move(EntryRef ref) { if (!ref.valid()) { - return RefType(); + return EntryRef(); } - auto oldraw = getRawBuffer(ref); - auto newraw = allocRawBuffer(oldraw.second); - memcpy(newraw.data, oldraw.first, oldraw.second); - _concreteStore.holdElem(ref, oldraw.second + sizeof(uint32_t)); - return newraw.ref; -} - -StreamedValueStore::StreamedValueData -StreamedValueStore::get_tensor_data(EntryRef ref) const -{ - StreamedValueData retval; - retval.valid = false; - auto raw = getRawBuffer(ref); - if (raw.second == 0u) { - return retval; - } - vespalib::nbostream source(raw.first, raw.second); - uint32_t num_cells = source.readValue<uint32_t>(); - check_alignment(source.peek(), CellTypeUtils::alignment(_data_from_type.cell_type)); - retval.cells_ref = TypedCells(source.peek(), _data_from_type.cell_type, num_cells); - source.adjustReadPos(CellTypeUtils::mem_size(_data_from_type.cell_type, num_cells)); - assert((num_cells % _data_from_type.dense_subspace_size) == 0); - retval.num_subspaces = num_cells / _data_from_type.dense_subspace_size; - retval.labels_buffer = vespalib::ConstArrayRef<char>(source.peek(), source.size()); - retval.valid = true; - return retval; + const auto& old_tensor = _concrete_store.getEntry(ref); + assert(old_tensor); + auto new_ref = add_entry(old_tensor); + _concrete_store.holdElem(ref, 1, old_tensor->get_memory_usage().allocatedBytes()); + return new_ref; } bool StreamedValueStore::encode_tensor(EntryRef ref, vespalib::nbostream &target) const { - if (auto data = get_tensor_data(ref)) { - StreamedValueView value( - _tensor_type, _data_from_type.num_mapped_dimensions, - data.cells_ref, data.num_subspaces, data.labels_buffer); - vespalib::eval::encode_value(value, target); + if (const auto * entry = get_tensor_entry(ref)) { + entry->encode_value(_tensor_type, target); return true; } else { return false; } } -void -StreamedValueStore::serialize_labels(const Value::Index &index, - vespalib::nbostream &target) const -{ - uint32_t num_subspaces = index.size(); - uint32_t num_mapped_dims = _data_from_type.num_mapped_dimensions; - std::vector<vespalib::stringref> labels(num_mapped_dims * num_subspaces); - auto view = index.create_view({}); - view->lookup({}); - std::vector<vespalib::stringref> addr(num_mapped_dims); - std::vector<vespalib::stringref *> addr_refs; - for (auto & label : addr) { - addr_refs.push_back(&label); - } - size_t subspace; - for (size_t ss = 0; ss < num_subspaces; ++ss) { - bool ok = view->next_result(addr_refs, subspace); - assert(ok); - size_t idx = subspace * num_mapped_dims; - for (auto label : addr) { - labels[idx++] = label; - } - } - bool ok = view->next_result(addr_refs, subspace); - assert(!ok); - for (auto label : labels) { - target.writeSmallString(label); - } -} - TensorStore::EntryRef StreamedValueStore::store_tensor(const Value &tensor) { assert(tensor.type() == _tensor_type); - CellsMemBlock cells_mem(tensor.cells()); - vespalib::nbostream stream; - stream << uint32_t(cells_mem.num); - serialize_labels(tensor.index(), stream); - size_t mem_size = stream.size() + cells_mem.total_sz; - auto raw = allocRawBuffer(mem_size); - char *target = raw.data; - memcpy(target, stream.peek(), sizeof(uint32_t)); - stream.adjustReadPos(sizeof(uint32_t)); - target += sizeof(uint32_t); - check_alignment(target, CellTypeUtils::alignment(_data_from_type.cell_type)); - memcpy(target, cells_mem.ptr, cells_mem.total_sz); - target += cells_mem.total_sz; - memcpy(target, stream.peek(), stream.size()); - target += stream.size(); - assert(target <= raw.data + mem_size); - return raw.ref; + return add_entry(TensorEntry::create_shared_entry(tensor)); } TensorStore::EntryRef diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h index de94dc043d3..3a9d9a0b7b4 100644 --- a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h +++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h @@ -5,87 +5,71 @@ #include "tensor_store.h" #include <vespa/eval/eval/value_type.h> #include <vespa/eval/eval/value.h> +#include <vespa/eval/streamed/streamed_value.h> #include <vespa/vespalib/objects/nbostream.h> -#include <vespa/vespalib/util/typify.h> +#include <vespa/vespalib/util/shared_string_repo.h> namespace search::tensor { /** - * Class for storing tensors in memory, with a special serialization - * format that can be used directly to make a StreamedValueView. - * - * The tensor type is owned by the store itself and will not be - * serialized at all. - * - * The parameters for serialization (see DataFromType) are: - * - number of mapped dimensions [MD] - * - dense subspace size [DS] - * - size of each cell [CS] - currently 4 (float) or 8 (double) - * - alignment for cells - currently 4 (float) or 8 (double) - * While the tensor value to be serialized has: - * - number of dense subspaces [ND] - * - labels for dense subspaces, ND * MD strings - * - cell values, ND * DS cells (each either float or double) - * The serialization format looks like: - * - * [bytes] : [format] : [description] - * 4 : n.b.o. uint32_ t : num cells = ND * DS - * CS * ND * DS : native float or double : cells - * (depends) : n.b.o. strings : ND * MD label strings - * - * Here, n.b.o. means network byte order, or more precisely - * it's the format vespalib::nbostream uses for the given data type, - * including strings (where exact format depends on the string length). - * Note that the only unpredictably-sized data (the labels) are kept - * last. - * If we ever make a "hbostream" which uses host byte order, we - * could switch to that instead since these data are only kept in - * memory. + * Class for StreamedValue tensors in memory. */ class StreamedValueStore : public TensorStore { public: - using RefType = vespalib::datastore::AlignedEntryRefT<22, 3>; - using DataStoreType = vespalib::datastore::DataStoreT<RefType>; + using Value = vespalib::eval::Value; + using ValueType = vespalib::eval::ValueType; + using Handles = vespalib::SharedStringRepo::StrongHandles; + using MemoryUsage = vespalib::MemoryUsage; - struct StreamedValueData { - bool valid; - vespalib::eval::TypedCells cells_ref; - size_t num_subspaces; - vespalib::ConstArrayRef<char> labels_buffer; - operator bool() const { return valid; } + // interface for tensor entries + struct TensorEntry { + using SP = std::shared_ptr<TensorEntry>; + virtual Value::UP create_fast_value_view(const ValueType &type_ref) const = 0; + virtual void encode_value(const ValueType &type, vespalib::nbostream &target) const = 0; + virtual MemoryUsage get_memory_usage() const = 0; + virtual ~TensorEntry(); + static TensorEntry::SP create_shared_entry(const Value &value); }; - struct DataFromType { - uint32_t num_mapped_dimensions; - uint32_t dense_subspace_size; - vespalib::eval::CellType cell_type; - - DataFromType(const vespalib::eval::ValueType& type) - : num_mapped_dimensions(type.count_mapped_dimensions()), - dense_subspace_size(type.dense_subspace_size()), - cell_type(type.cell_type()) - {} + // implementation of tensor entries + template <typename CT> + struct TensorEntryImpl : public TensorEntry { + Handles handles; + std::vector<CT> cells; + TensorEntryImpl(const Value &value, size_t num_mapped, size_t dense_size); + Value::UP create_fast_value_view(const ValueType &type_ref) const override; + void encode_value(const ValueType &type, vespalib::nbostream &target) const override; + MemoryUsage get_memory_usage() const override; + ~TensorEntryImpl() override; }; private: - DataStoreType _concreteStore; - vespalib::datastore::BufferType<char> _bufferType; - vespalib::eval::ValueType _tensor_type; - DataFromType _data_from_type; - - void serialize_labels(const vespalib::eval::Value::Index &index, - vespalib::nbostream &target) const; + // Note: Must use SP (instead of UP) because of fallbackCopy() and initializeReservedElements() in BufferType, + // and implementation of move(). + using TensorStoreType = vespalib::datastore::DataStore<TensorEntry::SP>; - std::pair<const char *, uint32_t> getRawBuffer(RefType ref) const; - vespalib::datastore::Handle<char> allocRawBuffer(uint32_t size); + class TensorBufferType : public vespalib::datastore::BufferType<TensorEntry::SP> { + private: + using ParentType = BufferType<TensorEntry::SP>; + using ParentType::_emptyEntry; + using CleanContext = typename ParentType::CleanContext; + public: + TensorBufferType(); + virtual void cleanHold(void* buffer, size_t offset, size_t num_elems, CleanContext clean_ctx) override; + }; + TensorStoreType _concrete_store; + const vespalib::eval::ValueType _tensor_type; + EntryRef add_entry(TensorEntry::SP tensor); public: StreamedValueStore(const vespalib::eval::ValueType &tensor_type); - virtual ~StreamedValueStore(); + ~StreamedValueStore() override; + + using RefType = TensorStoreType::RefType; - virtual void holdTensor(EntryRef ref) override; - virtual EntryRef move(EntryRef ref) override; + void holdTensor(EntryRef ref) override; + EntryRef move(EntryRef ref) override; - StreamedValueData get_tensor_data(EntryRef ref) const; + const TensorEntry * get_tensor_entry(EntryRef ref) const; bool encode_tensor(EntryRef ref, vespalib::nbostream &target) const; EntryRef store_tensor(const vespalib::eval::Value &tensor); |