author     Arne H Juul <arnej27959@users.noreply.github.com>  2020-12-01 15:02:51 +0100
committer  GitHub <noreply@github.com>  2020-12-01 15:02:51 +0100
commit     8d9680d2101c76229190598547362ab2760d3d2f (patch)
tree       45d4dd185eda20e678cc6ab78b04fc557f929d43 /searchlib
parent     80ed20529925faa4e2bcece3f22ea7b8f2b49f7f (diff)
parent     4ac10dad24980734a161533c36b232cfc5d3a2f9 (diff)
Merge pull request #15484 from vespa-engine/arnej/add-serialized_fast_value_attribute
Arnej/add serialized fast value attribute
Diffstat (limited to 'searchlib')
-rw-r--r--  searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp    |   6
-rw-r--r--  searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp               |   4
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/CMakeLists.txt                       |   3
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp  | 234
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h    |  33
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp             |  48
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h               |  35
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp             | 228
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/streamed_value_store.h               |  98
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp               |  15
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h                 |   4
11 files changed, 700 insertions, 8 deletions
diff --git a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
index daa85c91b2c..bf46a2cc7d0 100644
--- a/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
+++ b/searchlib/src/tests/attribute/tensorattribute/tensorattribute_test.cpp
@@ -16,7 +16,7 @@
#include <vespa/searchlib/tensor/nearest_neighbor_index.h>
#include <vespa/searchlib/tensor/nearest_neighbor_index_factory.h>
#include <vespa/searchlib/tensor/nearest_neighbor_index_saver.h>
-#include <vespa/searchlib/tensor/serialized_tensor_attribute.h>
+#include <vespa/searchlib/tensor/serialized_fast_value_attribute.h>
#include <vespa/searchlib/tensor/tensor_attribute.h>
#include <vespa/searchlib/test/directory_handler.h>
#include <vespa/searchlib/util/fileutil.h>
@@ -40,7 +40,7 @@ using search::tensor::DefaultNearestNeighborIndexFactory;
using search::tensor::DenseTensorAttribute;
using search::tensor::DirectTensorAttribute;
using search::tensor::DocVectorAccess;
-using search::tensor::SerializedTensorAttribute;
+using search::tensor::SerializedFastValueAttribute;
using search::tensor::HnswIndex;
using search::tensor::HnswNode;
using search::tensor::NearestNeighborIndex;
@@ -344,7 +344,7 @@ struct Fixture {
} else if (_traits.use_direct_tensor_attribute) {
return std::make_shared<DirectTensorAttribute>(_name, _cfg);
} else {
- return std::make_shared<SerializedTensorAttribute>(_name, _cfg);
+ return std::make_shared<SerializedFastValueAttribute>(_name, _cfg);
}
}
diff --git a/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp b/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp
index 148d18f79ff..29033944d4b 100644
--- a/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp
+++ b/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp
@@ -7,8 +7,10 @@
#include "singlenumericattribute.hpp"
#include "singlestringattribute.h"
#include "singleboolattribute.h"
+#include <vespa/eval/eval/engine_or_factory.h>
#include <vespa/searchlib/tensor/dense_tensor_attribute.h>
#include <vespa/searchlib/tensor/serialized_tensor_attribute.h>
+#include <vespa/searchlib/tensor/serialized_fast_value_attribute.h>
namespace search {
@@ -45,6 +47,8 @@ AttributeFactory::createSingleStd(stringref name, const Config & info)
case BasicType::TENSOR:
if (info.tensorType().is_dense()) {
return std::make_shared<tensor::DenseTensorAttribute>(name, info);
+ } else if (vespalib::eval::EngineOrFactory::get().is_factory()) {
+ return std::make_shared<tensor::SerializedFastValueAttribute>(name, info);
} else {
return std::make_shared<tensor::SerializedTensorAttribute>(name, info);
}
diff --git a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
index fac6d015a5f..79b18a57a34 100644
--- a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
@@ -25,5 +25,8 @@ vespa_add_library(searchlib_tensor OBJECT
tensor_attribute.cpp
tensor_deserialize.cpp
tensor_store.cpp
+ serialized_fast_value_attribute.cpp
+ streamed_value_saver.cpp
+ streamed_value_store.cpp
DEPENDS
)
diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp
new file mode 100644
index 00000000000..6e1fb1a0a2f
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp
@@ -0,0 +1,234 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "serialized_fast_value_attribute.h"
+#include "streamed_value_saver.h"
+#include <vespa/eval/eval/value.h>
+#include <vespa/eval/eval/fast_value.hpp>
+#include <vespa/eval/streamed/streamed_value_utils.h>
+#include <vespa/fastlib/io/bufferedfile.h>
+#include <vespa/searchlib/attribute/readerbase.h>
+#include <vespa/searchlib/util/fileutil.h>
+#include <vespa/vespalib/util/rcuvector.hpp>
+#include <vespa/log/log.h>
+
+LOG_SETUP(".searchlib.tensor.serialized_fast_value_attribute");
+
+#include "blob_sequence_reader.h"
+#include "tensor_attribute.hpp"
+
+using namespace vespalib;
+using namespace vespalib::eval;
+
+namespace search::tensor {
+
+namespace {
+
+struct ValueBlock : LabelBlock {
+ TypedCells cells;
+};
+
+class ValueBlockStream {
+private:
+ const StreamedValueStore::DataFromType &_from_type;
+ LabelBlockStream _label_block_stream;
+ const char *_cells_ptr;
+
+ size_t dsss() const { return _from_type.dense_subspace_size; }
+ auto cell_type() const { return _from_type.cell_type; }
+public:
+ ValueBlock next_block() {
+ auto labels = _label_block_stream.next_block();
+ if (labels) {
+ TypedCells subspace_cells(_cells_ptr, cell_type(), dsss());
+ _cells_ptr += CellTypeUtils::mem_size(cell_type(), dsss());
+ return ValueBlock{labels, subspace_cells};
+ } else {
+ TypedCells none(nullptr, cell_type(), 0);
+ return ValueBlock{labels, none};
+ }
+ }
+
+ ValueBlockStream(const StreamedValueStore::DataFromType &from_type,
+ const StreamedValueStore::StreamedValueData &from_store)
+ : _from_type(from_type),
+ _label_block_stream(from_store.num_subspaces,
+ from_store.labels_buffer,
+ from_type.num_mapped_dimensions),
+ _cells_ptr((const char *)from_store.cells_ref.data)
+ {
+ _label_block_stream.reset();
+ }
+
+ ~ValueBlockStream();
+};
+
+ValueBlockStream::~ValueBlockStream() = default;
+
+void report_problematic_subspace(size_t idx,
+ const StreamedValueStore::DataFromType &from_type,
+ const StreamedValueStore::StreamedValueData &from_store)
+{
+ LOG(error, "PROBLEM: add_mapping returned same index=%zu twice", idx);
+ FastValueIndex temp_index(from_type.num_mapped_dimensions,
+ from_store.num_subspaces);
+ auto from_start = ValueBlockStream(from_type, from_store);
+ while (auto redo_block = from_start.next_block()) {
+ if (idx == temp_index.map.add_mapping(redo_block.address)) {
+ vespalib::string msg = "Block with address[ ";
+ for (vespalib::stringref ref : redo_block.address) {
+ msg.append("'").append(ref).append("' ");
+ }
+ msg.append("]");
+ LOG(error, "%s maps to subspace %zu", msg.c_str(), idx);
+ }
+ }
+}
+
+/**
+ * This Value implementation is almost exactly like FastValue, but
+ * instead of owning its type and cells it just has a reference to
+ * data stored elsewhere.
+ * XXX: we should find a better name for this, and move it
+ * (together with the helper classes above) to its own file,
+ * and add associated unit tests.
+ **/
+class OnlyFastValueIndex : public Value {
+private:
+ const ValueType &_type;
+ TypedCells _cells;
+ FastValueIndex my_index;
+public:
+ OnlyFastValueIndex(const ValueType &type,
+ const StreamedValueStore::DataFromType &from_type,
+ const StreamedValueStore::StreamedValueData &from_store)
+ : _type(type),
+ _cells(from_store.cells_ref),
+ my_index(from_type.num_mapped_dimensions,
+ from_store.num_subspaces)
+ {
+ assert(_type.cell_type() == _cells.type);
+ std::vector<vespalib::stringref> address(from_type.num_mapped_dimensions);
+ auto block_stream = ValueBlockStream(from_type, from_store);
+ size_t ss = 0;
+ while (auto block = block_stream.next_block()) {
+ size_t idx = my_index.map.add_mapping(block.address);
+ if (idx != ss) {
+ report_problematic_subspace(idx, from_type, from_store);
+ }
+ ++ss;
+ }
+ assert(ss == from_store.num_subspaces);
+ }
+
+
+ ~OnlyFastValueIndex();
+
+ const ValueType &type() const final override { return _type; }
+ TypedCells cells() const final override { return _cells; }
+ const Index &index() const final override { return my_index; }
+ vespalib::MemoryUsage get_memory_usage() const final override {
+ auto usage = self_memory_usage<OnlyFastValueIndex>();
+ usage.merge(my_index.map.estimate_extra_memory_usage());
+ return usage;
+ }
+};
+
+OnlyFastValueIndex::~OnlyFastValueIndex() = default;
+
+}
+
+SerializedFastValueAttribute::SerializedFastValueAttribute(stringref name, const Config &cfg)
+ : TensorAttribute(name, cfg, _streamedValueStore),
+ _tensor_type(cfg.tensorType()),
+ _streamedValueStore(_tensor_type),
+ _data_from_type(_tensor_type)
+{
+}
+
+
+SerializedFastValueAttribute::~SerializedFastValueAttribute()
+{
+ getGenerationHolder().clearHoldLists();
+ _tensorStore.clearHoldLists();
+}
+
+void
+SerializedFastValueAttribute::setTensor(DocId docId, const vespalib::eval::Value &tensor)
+{
+ checkTensorType(tensor);
+ EntryRef ref = _streamedValueStore.store_tensor(tensor);
+ assert(ref.valid());
+ setTensorRef(docId, ref);
+}
+
+std::unique_ptr<Value>
+SerializedFastValueAttribute::getTensor(DocId docId) const
+{
+ EntryRef ref;
+ if (docId < getCommittedDocIdLimit()) {
+ ref = _refVector[docId];
+ }
+ if (!ref.valid()) {
+ return {};
+ }
+ if (auto data_from_store = _streamedValueStore.get_tensor_data(ref)) {
+ return std::make_unique<OnlyFastValueIndex>(_tensor_type,
+ _data_from_type,
+ data_from_store);
+ }
+ return {};
+}
+
+bool
+SerializedFastValueAttribute::onLoad()
+{
+ BlobSequenceReader tensorReader(*this);
+ if (!tensorReader.hasData()) {
+ return false;
+ }
+ setCreateSerialNum(tensorReader.getCreateSerialNum());
+ assert(tensorReader.getVersion() == getVersion());
+ uint32_t numDocs(tensorReader.getDocIdLimit());
+ _refVector.reset();
+ _refVector.unsafe_reserve(numDocs);
+ vespalib::Array<char> buffer(1024);
+ for (uint32_t lid = 0; lid < numDocs; ++lid) {
+ uint32_t tensorSize = tensorReader.getNextSize();
+ if (tensorSize != 0) {
+ if (tensorSize > buffer.size()) {
+ buffer.resize(tensorSize + 1024);
+ }
+ tensorReader.readBlob(&buffer[0], tensorSize);
+ vespalib::nbostream source(&buffer[0], tensorSize);
+ EntryRef ref = _streamedValueStore.store_encoded_tensor(source);
+ _refVector.push_back(ref);
+ } else {
+ EntryRef invalid;
+ _refVector.push_back(invalid);
+ }
+ }
+ setNumDocs(numDocs);
+ setCommittedDocIdLimit(numDocs);
+ return true;
+}
+
+
+std::unique_ptr<AttributeSaver>
+SerializedFastValueAttribute::onInitSave(vespalib::stringref fileName)
+{
+ vespalib::GenerationHandler::Guard guard(getGenerationHandler().
+ takeGuard());
+ return std::make_unique<StreamedValueSaver>
+ (std::move(guard),
+ this->createAttributeHeader(fileName),
+ getRefCopy(),
+ _streamedValueStore);
+}
+
+void
+SerializedFastValueAttribute::compactWorst()
+{
+ doCompactWorst<StreamedValueStore::RefType>();
+}
+
+}
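The OnlyFastValueIndex helper above is a vespalib::eval::Value implementation that references type and cell data owned elsewhere (by the attribute and its StreamedValueStore) and only owns the sparse index it rebuilds from the stored label blocks. A minimal, self-contained sketch of that non-owning pattern follows; the names used here (RefTensorView, SparseIndex) are hypothetical and not part of the vespalib API:

    #include <cstddef>
    #include <map>
    #include <string>
    #include <vector>

    // Owns nothing but the mapping from sparse address to dense subspace number.
    struct SparseIndex {
        std::map<std::string, std::size_t> subspace_of;
    };

    // References type and cells owned by the caller; builds only its own index.
    class RefTensorView {
        const std::string        &_type;   // owned by the attribute, shared by all values
        const std::vector<float> &_cells;  // owned by the serialized store, not copied
        SparseIndex               _index;  // the only data this wrapper owns
    public:
        RefTensorView(const std::string &type,
                      const std::vector<float> &cells,
                      const std::vector<std::string> &addresses)
            : _type(type), _cells(cells)
        {
            for (std::size_t ss = 0; ss < addresses.size(); ++ss) {
                _index.subspace_of.emplace(addresses[ss], ss);
            }
        }
        const std::string        &type()  const { return _type; }
        const std::vector<float> &cells() const { return _cells; }
        const SparseIndex        &index() const { return _index; }
    };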
diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h
new file mode 100644
index 00000000000..a8c1df4913a
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h
@@ -0,0 +1,33 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "tensor_attribute.h"
+#include "streamed_value_store.h"
+
+namespace search::tensor {
+
+/**
+ * Attribute vector class storing serialized tensors for all documents in memory.
+ *
+ * When fetching a tensor with getTensor(docId) the returned Value
+ * will have a FastValueIndex (constructed on the fly) for its sparse
+ * mapping, but refer to a common type, while cells() will refer to
+ * memory in the serialized store without copying.
+ *
+ */
+class SerializedFastValueAttribute : public TensorAttribute {
+ vespalib::eval::ValueType _tensor_type;
+ StreamedValueStore _streamedValueStore; // data store for serialized tensors
+ const StreamedValueStore::DataFromType _data_from_type;
+public:
+ SerializedFastValueAttribute(vespalib::stringref baseFileName, const Config &cfg);
+ virtual ~SerializedFastValueAttribute();
+ virtual void setTensor(DocId docId, const vespalib::eval::Value &tensor) override;
+ virtual std::unique_ptr<vespalib::eval::Value> getTensor(DocId docId) const override;
+ virtual bool onLoad() override;
+ virtual std::unique_ptr<AttributeSaver> onInitSave(vespalib::stringref fileName) override;
+ virtual void compactWorst() override;
+};
+
+}
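Based only on the declarations in this header and the test fixture change above, a hedged usage sketch (Config setup and construction of the tensor value are elided, since the helpers for those are outside this diff; "my_tensor_attr", cfg, docId and tensor are placeholder names):

    // cfg: an attribute Config carrying a non-dense tensor type
    auto attr = std::make_shared<SerializedFastValueAttribute>("my_tensor_attr", cfg);
    // tensor: a vespalib::eval::Value matching that type
    attr->setTensor(docId, *tensor);
    std::unique_ptr<vespalib::eval::Value> value = attr->getTensor(docId);
    // value->cells() points into the attribute's StreamedValueStore (no copy),
    // while its index is a FastValueIndex constructed on the fly.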
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp
new file mode 100644
index 00000000000..d4fd681f2cb
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp
@@ -0,0 +1,48 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "streamed_value_saver.h"
+#include "streamed_value_store.h"
+
+#include <vespa/searchlib/attribute/iattributesavetarget.h>
+#include <vespa/searchlib/util/bufferwriter.h>
+#include <vespa/vespalib/objects/nbostream.h>
+
+using vespalib::GenerationHandler;
+
+namespace search::tensor {
+
+StreamedValueSaver::
+StreamedValueSaver(GenerationHandler::Guard &&guard,
+ const attribute::AttributeHeader &header,
+ RefCopyVector &&refs,
+ const StreamedValueStore &tensorStore)
+ : AttributeSaver(std::move(guard), header),
+ _refs(std::move(refs)),
+ _tensorStore(tensorStore)
+{
+}
+
+StreamedValueSaver::~StreamedValueSaver() = default;
+
+bool
+StreamedValueSaver::onSave(IAttributeSaveTarget &saveTarget)
+{
+ auto datWriter = saveTarget.datWriter().allocBufferWriter();
+ const uint32_t docIdLimit(_refs.size());
+ vespalib::nbostream stream;
+ for (uint32_t lid = 0; lid < docIdLimit; ++lid) {
+ if (_tensorStore.encode_tensor(_refs[lid], stream)) {
+ uint32_t sz = stream.size();
+ datWriter->write(&sz, sizeof(sz));
+ datWriter->write(stream.peek(), stream.size());
+ stream.clear();
+ } else {
+ uint32_t sz = 0;
+ datWriter->write(&sz, sizeof(sz));
+ }
+ }
+ datWriter->flush();
+ return true;
+}
+
+} // namespace search::tensor
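For reference, the record stream that onSave() above produces is, per local document id, a uint32_t size written as raw bytes followed by that many bytes of encoded tensor, with size 0 marking a document that has no tensor; this mirrors what SerializedFastValueAttribute::onLoad reads back. A small, self-contained sketch of a reader for that framing (not Vespa code):

    #include <cstddef>
    #include <cstdint>
    #include <cstring>
    #include <vector>

    struct Record { uint32_t lid; std::vector<char> blob; };

    // data/len: the raw saved payload; docIdLimit: number of records expected
    std::vector<Record> read_records(const char *data, std::size_t len, uint32_t docIdLimit)
    {
        std::vector<Record> out;
        std::size_t pos = 0;
        for (uint32_t lid = 0; lid < docIdLimit && pos + sizeof(uint32_t) <= len; ++lid) {
            uint32_t sz;
            std::memcpy(&sz, data + pos, sizeof(sz));  // size prefix, written as raw bytes
            pos += sizeof(sz);
            if (sz != 0 && pos + sz <= len) {          // size 0 means "no tensor for this lid"
                out.push_back({lid, std::vector<char>(data + pos, data + pos + sz)});
                pos += sz;
            }
        }
        return out;
    }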
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h
new file mode 100644
index 00000000000..71d56539679
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h
@@ -0,0 +1,35 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchlib/attribute/attributesaver.h>
+#include "tensor_attribute.h"
+
+namespace search::tensor {
+
+class StreamedValueStore;
+
+/*
+ * Class for saving a tensor attribute.
+ */
+class StreamedValueSaver : public AttributeSaver
+{
+public:
+ using RefCopyVector = TensorAttribute::RefCopyVector;
+private:
+ using GenerationHandler = vespalib::GenerationHandler;
+
+ RefCopyVector _refs;
+ const StreamedValueStore &_tensorStore;
+
+ bool onSave(IAttributeSaveTarget &saveTarget) override;
+public:
+ StreamedValueSaver(GenerationHandler::Guard &&guard,
+ const attribute::AttributeHeader &header,
+ RefCopyVector &&refs,
+ const StreamedValueStore &tensorStore);
+
+ virtual ~StreamedValueSaver();
+};
+
+} // namespace search::tensor
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp
new file mode 100644
index 00000000000..ae2e0e7ed10
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp
@@ -0,0 +1,228 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "streamed_value_store.h"
+#include "tensor_deserialize.h"
+#include <vespa/eval/eval/value.h>
+#include <vespa/eval/eval/value_codec.h>
+#include <vespa/eval/streamed/streamed_value_builder_factory.h>
+#include <vespa/eval/streamed/streamed_value_view.h>
+#include <vespa/vespalib/datastore/datastore.hpp>
+#include <vespa/vespalib/objects/nbostream.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/log/log.h>
+
+LOG_SETUP(".searchlib.tensor.streamed_value_store");
+
+using vespalib::datastore::Handle;
+using namespace vespalib::eval;
+
+namespace search::tensor {
+
+namespace {
+
+constexpr size_t MIN_BUFFER_ARRAYS = 1024;
+
+struct CellsMemBlock {
+ uint32_t num;
+ uint32_t total_sz;
+ const char *ptr;
+ CellsMemBlock(TypedCells cells)
+ : num(cells.size),
+ total_sz(CellTypeUtils::mem_size(cells.type, num)),
+ ptr((const char *)cells.data)
+ {}
+};
+
+template<typename T>
+T *fix_alignment(T *ptr, size_t align)
+{
+ static_assert(sizeof(T) == 1);
+ assert((align & (align-1)) == 0); // must be 2^N
+ size_t ptr_val = (size_t)ptr;
+ size_t unalign = ptr_val & (align - 1);
+ if (unalign == 0) {
+ return ptr;
+ } else {
+ return ptr + (align - unalign);
+ }
+}
+
+} // namespace <unnamed>
+
+StreamedValueStore::StreamedValueStore(const ValueType &tensor_type)
+ : TensorStore(_concreteStore),
+ _concreteStore(),
+ _bufferType(RefType::align(1),
+ MIN_BUFFER_ARRAYS,
+ RefType::offsetSize() / RefType::align(1)),
+ _tensor_type(tensor_type),
+ _data_from_type(_tensor_type)
+{
+ _store.addType(&_bufferType);
+ _store.initActiveBuffers();
+}
+
+StreamedValueStore::~StreamedValueStore()
+{
+ _store.dropBuffers();
+}
+
+std::pair<const char *, uint32_t>
+StreamedValueStore::getRawBuffer(RefType ref) const
+{
+ if (!ref.valid()) {
+ return std::make_pair(nullptr, 0u);
+ }
+ const char *buf = _store.getEntry<char>(ref);
+ uint32_t len = *reinterpret_cast<const uint32_t *>(buf);
+ return std::make_pair(buf + sizeof(uint32_t), len);
+}
+
+Handle<char>
+StreamedValueStore::allocRawBuffer(uint32_t size)
+{
+ if (size == 0) {
+ return Handle<char>();
+ }
+ size_t extSize = size + sizeof(uint32_t);
+ size_t bufSize = RefType::align(extSize);
+ auto result = _concreteStore.rawAllocator<char>(_typeId).alloc(bufSize);
+ *reinterpret_cast<uint32_t *>(result.data) = size;
+ char *padWritePtr = result.data + extSize;
+ for (size_t i = extSize; i < bufSize; ++i) {
+ *padWritePtr++ = 0;
+ }
+ // Hide length of buffer (first 4 bytes) from users of the buffer.
+ return Handle<char>(result.ref, result.data + sizeof(uint32_t));
+}
+
+void
+StreamedValueStore::holdTensor(EntryRef ref)
+{
+ if (!ref.valid()) {
+ return;
+ }
+ RefType iRef(ref);
+ const char *buf = _store.getEntry<char>(iRef);
+ uint32_t len = *reinterpret_cast<const uint32_t *>(buf);
+ _concreteStore.holdElem(ref, len + sizeof(uint32_t));
+}
+
+TensorStore::EntryRef
+StreamedValueStore::move(EntryRef ref)
+{
+ if (!ref.valid()) {
+ return RefType();
+ }
+ auto oldraw = getRawBuffer(ref);
+ auto newraw = allocRawBuffer(oldraw.second);
+ memcpy(newraw.data, oldraw.first, oldraw.second);
+ _concreteStore.holdElem(ref, oldraw.second + sizeof(uint32_t));
+ return newraw.ref;
+}
+
+StreamedValueStore::StreamedValueData
+StreamedValueStore::get_tensor_data(EntryRef ref) const
+{
+ StreamedValueData retval;
+ retval.valid = false;
+ auto raw = getRawBuffer(ref);
+ if (raw.second == 0u) {
+ return retval;
+ }
+ vespalib::nbostream source(raw.first, raw.second);
+ uint32_t num_cells = source.readValue<uint32_t>();
+ {
+ uint32_t alignment = CellTypeUtils::alignment(_data_from_type.cell_type);
+ const char *aligned_ptr = fix_alignment(source.peek(), alignment);
+ size_t adjustment = aligned_ptr - source.peek();
+ source.adjustReadPos(adjustment);
+ }
+ retval.cells_ref = TypedCells(source.peek(), _data_from_type.cell_type, num_cells);
+ source.adjustReadPos(CellTypeUtils::mem_size(_data_from_type.cell_type, num_cells));
+ retval.num_subspaces = source.readValue<uint32_t>();
+ retval.labels_buffer = vespalib::ConstArrayRef<char>(source.peek(), source.size());
+ assert(retval.num_subspaces * _data_from_type.dense_subspace_size == num_cells);
+ retval.valid = true;
+ return retval;
+}
+
+bool
+StreamedValueStore::encode_tensor(EntryRef ref, vespalib::nbostream &target) const
+{
+ if (auto data = get_tensor_data(ref)) {
+ StreamedValueView value(
+ _tensor_type, _data_from_type.num_mapped_dimensions,
+ data.cells_ref, data.num_subspaces, data.labels_buffer);
+ vespalib::eval::encode_value(value, target);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void
+StreamedValueStore::serialize_labels(const Value::Index &index,
+ vespalib::nbostream &target) const
+{
+ uint32_t num_subspaces = index.size();
+ target << num_subspaces;
+ uint32_t num_mapped_dims = _data_from_type.num_mapped_dimensions;
+ std::vector<vespalib::stringref> labels(num_mapped_dims * num_subspaces);
+ auto view = index.create_view({});
+ view->lookup({});
+ std::vector<vespalib::stringref> addr(num_mapped_dims);
+ std::vector<vespalib::stringref *> addr_refs;
+ for (auto & label : addr) {
+ addr_refs.push_back(&label);
+ }
+ size_t subspace;
+ for (size_t ss = 0; ss < num_subspaces; ++ss) {
+ bool ok = view->next_result(addr_refs, subspace);
+ assert(ok);
+ size_t idx = subspace * num_mapped_dims;
+ for (auto label : addr) {
+ labels[idx++] = label;
+ }
+ }
+ bool ok = view->next_result(addr_refs, subspace);
+ assert(!ok);
+ for (auto label : labels) {
+ target.writeSmallString(label);
+ }
+}
+
+TensorStore::EntryRef
+StreamedValueStore::store_tensor(const Value &tensor)
+{
+ assert(tensor.type() == _tensor_type);
+ CellsMemBlock cells_mem(tensor.cells());
+ size_t alignment = CellTypeUtils::alignment(_data_from_type.cell_type);
+ size_t padding = alignment - 1;
+ vespalib::nbostream stream;
+ stream << uint32_t(cells_mem.num);
+ serialize_labels(tensor.index(), stream);
+ size_t mem_size = stream.size() + cells_mem.total_sz + padding;
+ auto raw = allocRawBuffer(mem_size);
+ char *target = raw.data;
+ memcpy(target, stream.peek(), sizeof(uint32_t));
+ stream.adjustReadPos(sizeof(uint32_t));
+ target += sizeof(uint32_t);
+ target = fix_alignment(target, alignment);
+ memcpy(target, cells_mem.ptr, cells_mem.total_sz);
+ target += cells_mem.total_sz;
+ memcpy(target, stream.peek(), stream.size());
+ target += stream.size();
+ assert(target <= raw.data + mem_size);
+ return raw.ref;
+}
+
+TensorStore::EntryRef
+StreamedValueStore::store_encoded_tensor(vespalib::nbostream &encoded)
+{
+ const auto &factory = StreamedValueBuilderFactory::get();
+ auto val = vespalib::eval::decode_value(encoded, factory);
+ return store_tensor(*val);
+}
+
+}
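The fix_alignment() helper above advances a byte pointer to the next multiple of a power-of-two alignment by adding (align - (addr & (align - 1))) when the address is not already aligned. A small, self-contained illustration of that arithmetic (not Vespa code):

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    static const char *align_up(const char *ptr, std::size_t align)
    {
        assert((align & (align - 1)) == 0);          // alignment must be a power of two
        uintptr_t addr = reinterpret_cast<uintptr_t>(ptr);
        std::size_t rem = addr & (align - 1);
        return (rem == 0) ? ptr : ptr + (align - rem);
    }

    int main()
    {
        alignas(8) char buf[32];
        const char *p = buf + 5;                     // 5 bytes into an 8-byte-aligned buffer
        const char *q = align_up(p, 8);
        std::printf("padding inserted: %td bytes\n", q - p);  // prints 3
        return 0;
    }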
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h
new file mode 100644
index 00000000000..4e12296916d
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h
@@ -0,0 +1,98 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "tensor_store.h"
+#include <vespa/eval/eval/value_type.h>
+#include <vespa/eval/eval/value.h>
+#include <vespa/vespalib/objects/nbostream.h>
+#include <vespa/vespalib/util/typify.h>
+
+namespace search::tensor {
+
+/**
+ * Class for storing tensors in memory, with a special serialization
+ * format that can be used directly to make a StreamedValueView.
+ *
+ * The tensor type is owned by the store itself and will not be
+ * serialized at all.
+ *
+ * The parameters for serialization (see DataFromType) are:
+ * - number of mapped dimensions [MD]
+ * - dense subspace size [DS]
+ * - size of each cell [CS] - currently 4 (float) or 8 (double)
+ * - alignment for cells [CA] - currently 4 (float) or 8 (double)
+ * While the tensor value to be serialized has:
+ * - number of dense subspaces [ND]
+ * - labels for dense subspaces, ND * MD strings
+ * - cell values, ND * DS cells (each either float or double)
+ * The serialization format looks like:
+ *
+ * [bytes] : [format] : [description]
+ * 4 : n.b.o. uint32_t : num cells = ND * DS
+ * 1-7 : (none) : padding to cell alignment CA
+ * CS * ND * DS : native float or double : cells
+ * 4 : n.b.o. uint32_t : number of subspaces = ND
+ * (depends) : n.b.o. strings : labels
+ *
+ * Here, n.b.o. means network byte order, or more precisely
+ * it's the format vespalib::nbostream uses for the given data type,
+ * including strings (where exact format depends on the string length).
+ * Note that the only unpredictably-sized data (the labels) are kept
+ * last.
+ * If we ever make a "hbostream" which uses host byte order, we
+ * could switch to that instead since these data are only kept in
+ * memory.
+ */
+class StreamedValueStore : public TensorStore {
+public:
+ using RefType = vespalib::datastore::AlignedEntryRefT<22, 3>;
+ using DataStoreType = vespalib::datastore::DataStoreT<RefType>;
+
+ struct StreamedValueData {
+ bool valid;
+ vespalib::eval::TypedCells cells_ref;
+ size_t num_subspaces;
+ vespalib::ConstArrayRef<char> labels_buffer;
+ operator bool() const { return valid; }
+ };
+
+ struct DataFromType {
+ uint32_t num_mapped_dimensions;
+ uint32_t dense_subspace_size;
+ vespalib::eval::CellType cell_type;
+
+ DataFromType(const vespalib::eval::ValueType& type)
+ : num_mapped_dimensions(type.count_mapped_dimensions()),
+ dense_subspace_size(type.dense_subspace_size()),
+ cell_type(type.cell_type())
+ {}
+ };
+
+private:
+ DataStoreType _concreteStore;
+ vespalib::datastore::BufferType<char> _bufferType;
+ vespalib::eval::ValueType _tensor_type;
+ DataFromType _data_from_type;
+
+ void serialize_labels(const vespalib::eval::Value::Index &index,
+ vespalib::nbostream &target) const;
+
+ std::pair<const char *, uint32_t> getRawBuffer(RefType ref) const;
+ vespalib::datastore::Handle<char> allocRawBuffer(uint32_t size);
+public:
+ StreamedValueStore(const vespalib::eval::ValueType &tensor_type);
+ virtual ~StreamedValueStore();
+
+ virtual void holdTensor(EntryRef ref) override;
+ virtual EntryRef move(EntryRef ref) override;
+
+ StreamedValueData get_tensor_data(EntryRef ref) const;
+ bool encode_tensor(EntryRef ref, vespalib::nbostream &target) const;
+
+ EntryRef store_tensor(const vespalib::eval::Value &tensor);
+ EntryRef store_encoded_tensor(vespalib::nbostream &encoded);
+};
+
+
+}
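As a worked example of the layout documented in this header, take a hypothetical type tensor<float>(x{},y[3]) (the concrete type is assumed purely for illustration): MD = 1 mapped dimension, DS = 3, CS = CA = 4. For a stored value with ND = 2 dense subspaces labelled "a" and "b", the buffer holds:

    4 bytes              : num cells = ND * DS = 6            (n.b.o. uint32_t)
    0-3 bytes            : padding up to the 4-byte cell alignment CA
    CS * ND * DS = 24 B  : the 6 float cells
    4 bytes              : number of subspaces = ND = 2       (n.b.o. uint32_t)
    variable             : the ND * MD = 2 labels "a", "b" as nbostream small strings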
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp
index 83988a3af11..35be27bc03b 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp
@@ -1,9 +1,9 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "tensor_deserialize.h"
#include <vespa/document/util/serializableexceptions.h>
#include <vespa/eval/eval/engine_or_factory.h>
#include <vespa/eval/eval/value.h>
-#include <vespa/vespalib/objects/nbostream.h>
using document::DeserializeException;
using vespalib::eval::EngineOrFactory;
@@ -11,14 +11,19 @@ using vespalib::eval::Value;
namespace search::tensor {
-std::unique_ptr<Value> deserialize_tensor(const void *data, size_t size)
+std::unique_ptr<Value> deserialize_tensor(vespalib::nbostream &buffer)
{
- vespalib::nbostream wrapStream(data, size);
- auto tensor = EngineOrFactory::get().decode(wrapStream);
- if (wrapStream.size() != 0) {
+ auto tensor = EngineOrFactory::get().decode(buffer);
+ if (buffer.size() != 0) {
throw DeserializeException("Leftover bytes deserializing tensor attribute value.", VESPA_STRLOC);
}
return tensor;
}
+std::unique_ptr<Value> deserialize_tensor(const void *data, size_t size)
+{
+ vespalib::nbostream wrapStream(data, size);
+ return deserialize_tensor(wrapStream);
+}
+
} // namespace
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h
index 18e166543d6..6f9521c1355 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h
@@ -1,10 +1,14 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/eval/eval/value.h>
+#include <vespa/vespalib/objects/nbostream.h>
namespace search::tensor {
extern std::unique_ptr<vespalib::eval::Value>
deserialize_tensor(const void *data, size_t size);
+extern std::unique_ptr<vespalib::eval::Value>
+deserialize_tensor(vespalib::nbostream &stream);
+
} // namespace