author     Arne Juul <arnej@verizonmedia.com>   2020-11-17 20:52:33 +0000
committer  Arne Juul <arnej@verizonmedia.com>   2020-11-26 13:35:08 +0000
commit     1644ca6e82ca22275e8d350724d995682ed87b1f (patch)
tree       3beb80ee15744c3d39483b8dc7c9e5848455c823 /searchlib
parent     01fbc29b61328fdad6c8607b5099ffd3b5cf45a1 (diff)
add SerializedFastValueAttribute
Diffstat (limited to 'searchlib')
-rw-r--r--  searchlib/src/tests/features/tensor/tensor_test.cpp                      |   6
-rw-r--r--  searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp              |   4
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/CMakeLists.txt                      |   3
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp | 210
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h   |  26
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp            |  48
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h              |  35
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp            | 241
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/streamed_value_store.h              |  67
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp              |  15
-rw-r--r--  searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h                |   4
11 files changed, 649 insertions, 10 deletions
diff --git a/searchlib/src/tests/features/tensor/tensor_test.cpp b/searchlib/src/tests/features/tensor/tensor_test.cpp
index 116b4ed2bb5..c0b25eeeb1b 100644
--- a/searchlib/src/tests/features/tensor/tensor_test.cpp
+++ b/searchlib/src/tests/features/tensor/tensor_test.cpp
@@ -164,9 +164,9 @@ TEST_F("require that tensor attribute can be extracted as tensor in attribute fe
ExecFixture("attribute(tensorattr)"))
{
EXPECT_EQUAL(*makeTensor<Value>(TensorSpec("tensor(x{})")
- .add({{"x", "b"}}, 5)
- .add({{"x", "c"}}, 7)
- .add({{"x", "a"}}, 3)), f.execute());
+ .add({{"x", "b"}}, 5)
+ .add({{"x", "c"}}, 7)
+ .add({{"x", "a"}}, 3)), f.execute());
}
TEST_F("require that direct tensor attribute can be extracted in attribute feature",
diff --git a/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp b/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp
index 148d18f79ff..7325ab0d414 100644
--- a/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp
+++ b/searchlib/src/vespa/searchlib/attribute/createsinglestd.cpp
@@ -8,7 +8,7 @@
#include "singlestringattribute.h"
#include "singleboolattribute.h"
#include <vespa/searchlib/tensor/dense_tensor_attribute.h>
-#include <vespa/searchlib/tensor/serialized_tensor_attribute.h>
+#include <vespa/searchlib/tensor/serialized_fast_value_attribute.h>
namespace search {
@@ -46,7 +46,7 @@ AttributeFactory::createSingleStd(stringref name, const Config & info)
if (info.tensorType().is_dense()) {
return std::make_shared<tensor::DenseTensorAttribute>(name, info);
} else {
- return std::make_shared<tensor::SerializedTensorAttribute>(name, info);
+ return std::make_shared<tensor::SerializedFastValueAttribute>(name, info);
}
case BasicType::REFERENCE:
return std::make_shared<attribute::ReferenceAttribute>(name, info);
diff --git a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
index fac6d015a5f..79b18a57a34 100644
--- a/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/tensor/CMakeLists.txt
@@ -25,5 +25,8 @@ vespa_add_library(searchlib_tensor OBJECT
tensor_attribute.cpp
tensor_deserialize.cpp
tensor_store.cpp
+ serialized_fast_value_attribute.cpp
+ streamed_value_saver.cpp
+ streamed_value_store.cpp
DEPENDS
)
diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp
new file mode 100644
index 00000000000..35c6ca42fe3
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp
@@ -0,0 +1,210 @@
+#include "serialized_fast_value_attribute.h"
+#include "streamed_value_saver.h"
+#include <vespa/eval/eval/value.h>
+#include <vespa/eval/eval/fast_value.hpp>
+#include <vespa/eval/streamed/streamed_value_utils.h>
+#include <vespa/fastlib/io/bufferedfile.h>
+#include <vespa/searchlib/attribute/readerbase.h>
+#include <vespa/searchlib/util/fileutil.h>
+#include <vespa/vespalib/util/rcuvector.hpp>
+#include <vespa/log/log.h>
+
+LOG_SETUP(".searchlib.tensor.serialized_fast_value_attribute");
+
+#include "blob_sequence_reader.h"
+#include "tensor_attribute.hpp"
+
+using namespace vespalib;
+using namespace vespalib::eval;
+
+namespace search::tensor {
+
+namespace {
+
+constexpr uint32_t TENSOR_ATTRIBUTE_VERSION = 0;
+
+struct ValueBlock : LabelBlock {
+ TypedCells cells;
+};
+
+class ValueBlockStream {
+private:
+ const StreamedValueStore::DataFromType &_from_type;
+ LabelBlockStream _label_block_stream;
+ const char *_cells_ptr;
+
+ size_t dsss() const { return _from_type.dense_subspace_size; }
+ auto cell_type() const { return _from_type.cell_type; }
+public:
+ ValueBlock next_block() {
+ auto labels = _label_block_stream.next_block();
+ if (labels) {
+ TypedCells subspace_cells(_cells_ptr, cell_type(), dsss());
+ _cells_ptr += CellTypeUtils::mem_size(cell_type(), dsss());
+ return ValueBlock{labels, subspace_cells};
+ } else {
+ TypedCells none(nullptr, cell_type(), 0);
+ return ValueBlock{labels, none};
+ }
+ }
+
+ ValueBlockStream(const StreamedValueStore::DataFromType &from_type,
+ const StreamedValueStore::StreamedValueData &from_store)
+ : _from_type(from_type),
+ _label_block_stream(from_store.num_subspaces,
+ from_store.labels_buffer,
+ from_type.num_mapped_dimensions),
+ _cells_ptr((const char *)from_store.cells_ref.data)
+ {
+ _label_block_stream.reset();
+ }
+
+ ~ValueBlockStream();
+};
+
+ValueBlockStream::~ValueBlockStream() = default;
+
+class OnlyFastValueIndex : public Value {
+private:
+ const ValueType &_type;
+ TypedCells _cells;
+ FastValueIndex my_index;
+public:
+ OnlyFastValueIndex(const ValueType &type,
+ const StreamedValueStore::DataFromType &from_type,
+ const StreamedValueStore::StreamedValueData &from_store)
+ : _type(type),
+ _cells(from_store.cells_ref),
+ my_index(from_type.num_mapped_dimensions,
+ from_store.num_subspaces)
+ {
+ assert(_type.cell_type() == _cells.type);
+ std::vector<vespalib::stringref> address(from_type.num_mapped_dimensions);
+ auto block_stream = ValueBlockStream(from_type, from_store);
+ size_t ss = 0;
+ while (auto block = block_stream.next_block()) {
+ size_t idx = my_index.map.add_mapping(block.address);
+ if (idx != ss) {
+ LOG(error, "add_mapping returned idx=%zu for subspace %zu", idx, ss);
+ }
+ ++ss;
+ }
+ if (ss != from_store.num_subspaces) {
+ LOG(error, "expected %zu subspaces but got %zu", from_store.num_subspaces, ss);
+ abort();
+ }
+ }
+
+ ~OnlyFastValueIndex();
+
+ const ValueType &type() const final override { return _type; }
+ TypedCells cells() const final override { return _cells; }
+ const Index &index() const final override { return my_index; }
+ vespalib::MemoryUsage get_memory_usage() const final override {
+ auto usage = self_memory_usage<OnlyFastValueIndex>();
+ usage.merge(my_index.map.estimate_extra_memory_usage());
+ return usage;
+ }
+};
+
+OnlyFastValueIndex::~OnlyFastValueIndex() = default;
+
+}
+
+SerializedFastValueAttribute::SerializedFastValueAttribute(stringref name, const Config &cfg)
+ : TensorAttribute(name, cfg, _streamedValueStore),
+ _tensor_type(cfg.tensorType()),
+ _streamedValueStore(_tensor_type),
+ _data_from_type(_tensor_type)
+{
+}
+
+
+SerializedFastValueAttribute::~SerializedFastValueAttribute()
+{
+ getGenerationHolder().clearHoldLists();
+ _tensorStore.clearHoldLists();
+}
+
+void
+SerializedFastValueAttribute::setTensor(DocId docId, const vespalib::eval::Value &tensor)
+{
+ EntryRef ref = _streamedValueStore.store_tensor(tensor);
+ setTensorRef(docId, ref);
+ if (!ref.valid()) {
+ checkTensorType(tensor);
+ }
+}
+
+std::unique_ptr<Value>
+SerializedFastValueAttribute::getTensor(DocId docId) const
+{
+ EntryRef ref;
+ if (docId < getCommittedDocIdLimit()) {
+ ref = _refVector[docId];
+ }
+ if (!ref.valid()) {
+ return {};
+ }
+ if (auto data_from_store = _streamedValueStore.get_tensor_data(ref)) {
+ return std::make_unique<OnlyFastValueIndex>(_tensor_type,
+ _data_from_type,
+ data_from_store);
+ }
+ return {};
+}
+
+
+bool
+SerializedFastValueAttribute::onLoad()
+{
+ BlobSequenceReader tensorReader(*this);
+ if (!tensorReader.hasData()) {
+ return false;
+ }
+ setCreateSerialNum(tensorReader.getCreateSerialNum());
+ assert(tensorReader.getVersion() == TENSOR_ATTRIBUTE_VERSION);
+ uint32_t numDocs(tensorReader.getDocIdLimit());
+ _refVector.reset();
+ _refVector.unsafe_reserve(numDocs);
+ vespalib::Array<char> buffer(1024);
+ for (uint32_t lid = 0; lid < numDocs; ++lid) {
+ uint32_t tensorSize = tensorReader.getNextSize();
+ if (tensorSize != 0) {
+ if (tensorSize > buffer.size()) {
+ buffer.resize(tensorSize + 1024);
+ }
+ tensorReader.readBlob(&buffer[0], tensorSize);
+ vespalib::nbostream source(&buffer[0], tensorSize);
+ EntryRef ref = _streamedValueStore.store_encoded_tensor(source);
+ _refVector.push_back(ref);
+ } else {
+ EntryRef invalid;
+ _refVector.push_back(invalid);
+ }
+ }
+ setNumDocs(numDocs);
+ setCommittedDocIdLimit(numDocs);
+ return true;
+}
+
+
+std::unique_ptr<AttributeSaver>
+SerializedFastValueAttribute::onInitSave(vespalib::stringref fileName)
+{
+ vespalib::GenerationHandler::Guard guard(getGenerationHandler().
+ takeGuard());
+ return std::make_unique<StreamedValueSaver>
+ (std::move(guard),
+ this->createAttributeHeader(fileName),
+ getRefCopy(),
+ _streamedValueStore);
+}
+
+void
+SerializedFastValueAttribute::compactWorst()
+{
+ doCompactWorst<StreamedValueStore::RefType>();
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h
new file mode 100644
index 00000000000..a1adaaa5632
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.h
@@ -0,0 +1,26 @@
+
+#pragma once
+
+#include "tensor_attribute.h"
+#include "streamed_value_store.h"
+
+namespace search::tensor {
+
+/**
+ * Attribute vector class storing FastValue-like mixed tensors for all documents in memory.
+ */
+class SerializedFastValueAttribute : public TensorAttribute {
+ vespalib::eval::ValueType _tensor_type;
+ StreamedValueStore _streamedValueStore; // data store for serialized tensors
+ const StreamedValueStore::DataFromType _data_from_type;
+public:
+ SerializedFastValueAttribute(vespalib::stringref baseFileName, const Config &cfg);
+ virtual ~SerializedFastValueAttribute();
+ virtual void setTensor(DocId docId, const vespalib::eval::Value &tensor) override;
+ virtual std::unique_ptr<vespalib::eval::Value> getTensor(DocId docId) const override;
+ virtual bool onLoad() override;
+ virtual std::unique_ptr<AttributeSaver> onInitSave(vespalib::stringref fileName) override;
+ virtual void compactWorst() override;
+};
+
+}
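
For orientation, a minimal usage sketch of the new attribute class (not part of the commit): only the (name, cfg) constructor and setTensor()/getTensor() come from the header above; the helper function and its caller-supplied Config, document id and tensor are assumptions for illustration.

    #include <vespa/searchlib/tensor/serialized_fast_value_attribute.h>
    #include <cassert>

    // Hypothetical helper, not part of the commit: round-trip one document's tensor
    // through an already-configured attribute (doc id assumed to be added/committed).
    void store_and_read_back(search::tensor::SerializedFastValueAttribute &attr,
                             uint32_t docid,
                             const vespalib::eval::Value &tensor)
    {
        attr.setTensor(docid, tensor);      // serializes cells + labels into the StreamedValueStore
        auto copy = attr.getTensor(docid);  // returns a Value wrapping the stored buffer in a fast index
        assert(copy && copy->type() == tensor.type());
    }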
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp
new file mode 100644
index 00000000000..d4fd681f2cb
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.cpp
@@ -0,0 +1,48 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "streamed_value_saver.h"
+#include "streamed_value_store.h"
+
+#include <vespa/searchlib/attribute/iattributesavetarget.h>
+#include <vespa/searchlib/util/bufferwriter.h>
+#include <vespa/vespalib/objects/nbostream.h>
+
+using vespalib::GenerationHandler;
+
+namespace search::tensor {
+
+StreamedValueSaver::
+StreamedValueSaver(GenerationHandler::Guard &&guard,
+ const attribute::AttributeHeader &header,
+ RefCopyVector &&refs,
+ const StreamedValueStore &tensorStore)
+ : AttributeSaver(std::move(guard), header),
+ _refs(std::move(refs)),
+ _tensorStore(tensorStore)
+{
+}
+
+StreamedValueSaver::~StreamedValueSaver() = default;
+
+bool
+StreamedValueSaver::onSave(IAttributeSaveTarget &saveTarget)
+{
+ auto datWriter = saveTarget.datWriter().allocBufferWriter();
+ const uint32_t docIdLimit(_refs.size());
+ vespalib::nbostream stream;
+ for (uint32_t lid = 0; lid < docIdLimit; ++lid) {
+ if (_tensorStore.encode_tensor(_refs[lid], stream)) {
+ uint32_t sz = stream.size();
+ datWriter->write(&sz, sizeof(sz));
+ datWriter->write(stream.peek(), stream.size());
+ stream.clear();
+ } else {
+ uint32_t sz = 0;
+ datWriter->write(&sz, sizeof(sz));
+ }
+ }
+ datWriter->flush();
+ return true;
+}
+
+} // namespace search::tensor
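
The file body written by onSave() above is a plain length-prefixed sequence, one record per local document id. A sketch of that record layout plus a hypothetical size helper follows; the names are illustrative only and not part of the commit.

    // Record layout, one per lid in increasing lid order:
    //   uint32_t size;        // raw bytes written via datWriter->write(&sz, sizeof(sz));
    //                         // 0 means "no tensor for this document"
    //   char     blob[size];  // vespalib::eval::encode_value() output; parsed back on load
    //                         // via StreamedValueStore::store_encoded_tensor()
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Hypothetical helper (not in the commit): total bytes the saver emits for a
    // given set of encoded tensor sizes, following the layout above.
    size_t saved_file_body_size(const std::vector<uint32_t> &encoded_sizes) {
        size_t total = 0;
        for (uint32_t sz : encoded_sizes) {
            total += sizeof(uint32_t) + sz;  // length prefix + blob (blob absent when sz == 0)
        }
        return total;
    }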
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h
new file mode 100644
index 00000000000..71d56539679
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_saver.h
@@ -0,0 +1,35 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchlib/attribute/attributesaver.h>
+#include "tensor_attribute.h"
+
+namespace search::tensor {
+
+class StreamedValueStore;
+
+/*
+ * Class for saving a tensor attribute.
+ */
+class StreamedValueSaver : public AttributeSaver
+{
+public:
+ using RefCopyVector = TensorAttribute::RefCopyVector;
+private:
+ using GenerationHandler = vespalib::GenerationHandler;
+
+ RefCopyVector _refs;
+ const StreamedValueStore &_tensorStore;
+
+ bool onSave(IAttributeSaveTarget &saveTarget) override;
+public:
+ StreamedValueSaver(GenerationHandler::Guard &&guard,
+ const attribute::AttributeHeader &header,
+ RefCopyVector &&refs,
+ const StreamedValueStore &tensorStore);
+
+ virtual ~StreamedValueSaver();
+};
+
+} // namespace search::tensor
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp
new file mode 100644
index 00000000000..0210bb00617
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.cpp
@@ -0,0 +1,241 @@
+
+#include "streamed_value_store.h"
+#include "tensor_deserialize.h"
+#include <vespa/eval/eval/value.h>
+#include <vespa/eval/eval/value_codec.h>
+#include <vespa/eval/streamed/streamed_value_builder_factory.h>
+#include <vespa/eval/streamed/streamed_value_view.h>
+#include <vespa/vespalib/datastore/datastore.hpp>
+#include <vespa/vespalib/objects/nbostream.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/log/log.h>
+
+LOG_SETUP(".searchlib.tensor.streamed_value_store");
+
+using vespalib::datastore::Handle;
+using namespace vespalib::eval;
+
+namespace search::tensor {
+
+namespace {
+
+constexpr size_t MIN_BUFFER_ARRAYS = 1024;
+
+struct CellsMemBlock {
+ uint32_t num;
+ uint32_t total_sz;
+ const char *ptr;
+ CellsMemBlock(TypedCells cells)
+ : num(cells.size),
+ total_sz(CellTypeUtils::mem_size(cells.type, num)),
+ ptr((const char *)cells.data)
+ {}
+};
+
+template<typename T>
+T *fix_alignment(T *ptr, size_t align)
+{
+ static_assert(sizeof(T) == 1);
+ assert((align & (align-1)) == 0); // must be 2^N
+ size_t ptr_val = (size_t)ptr;
+ size_t unalign = ptr_val & (align - 1);
+ if (unalign == 0) {
+ return ptr;
+ } else {
+ return ptr + (align - unalign);
+ }
+}
+
+} // namespace <unnamed>
+
+StreamedValueStore::StreamedValueStore(const ValueType &tensor_type)
+ : TensorStore(_concreteStore),
+ _concreteStore(),
+ _bufferType(RefType::align(1),
+ MIN_BUFFER_ARRAYS,
+ RefType::offsetSize() / RefType::align(1)),
+ _tensor_type(tensor_type),
+ _data_from_type(_tensor_type)
+{
+ _store.addType(&_bufferType);
+ _store.initActiveBuffers();
+}
+
+StreamedValueStore::~StreamedValueStore()
+{
+ _store.dropBuffers();
+}
+
+std::pair<const char *, uint32_t>
+StreamedValueStore::getRawBuffer(RefType ref) const
+{
+ if (!ref.valid()) {
+ return std::make_pair(nullptr, 0u);
+ }
+ const char *buf = _store.getEntry<char>(ref);
+ uint32_t len = *reinterpret_cast<const uint32_t *>(buf);
+ return std::make_pair(buf + sizeof(uint32_t), len);
+}
+
+Handle<char>
+StreamedValueStore::allocRawBuffer(uint32_t size)
+{
+ if (size == 0) {
+ return Handle<char>();
+ }
+ size_t extSize = size + sizeof(uint32_t);
+ size_t bufSize = RefType::align(extSize);
+ auto result = _concreteStore.rawAllocator<char>(_typeId).alloc(bufSize);
+ *reinterpret_cast<uint32_t *>(result.data) = size;
+ char *padWritePtr = result.data + extSize;
+ for (size_t i = extSize; i < bufSize; ++i) {
+ *padWritePtr++ = 0;
+ }
+ // Hide length of buffer (first 4 bytes) from users of the buffer.
+ return Handle<char>(result.ref, result.data + sizeof(uint32_t));
+}
+
+void
+StreamedValueStore::holdTensor(EntryRef ref)
+{
+ if (!ref.valid()) {
+ return;
+ }
+ RefType iRef(ref);
+ const char *buf = _store.getEntry<char>(iRef);
+ uint32_t len = *reinterpret_cast<const uint32_t *>(buf);
+ _concreteStore.holdElem(ref, len + sizeof(uint32_t));
+}
+
+TensorStore::EntryRef
+StreamedValueStore::move(EntryRef ref)
+{
+ if (!ref.valid()) {
+ return RefType();
+ }
+ auto oldraw = getRawBuffer(ref);
+ auto newraw = allocRawBuffer(oldraw.second);
+ memcpy(newraw.data, oldraw.first, oldraw.second);
+ _concreteStore.holdElem(ref, oldraw.second + sizeof(uint32_t));
+ return newraw.ref;
+}
+
+StreamedValueStore::StreamedValueData
+StreamedValueStore::get_tensor_data(EntryRef ref) const
+{
+ StreamedValueData retval;
+ retval.valid = false;
+ auto raw = getRawBuffer(ref);
+ if (raw.second == 0u) {
+ return retval;
+ }
+ vespalib::nbostream_longlivedbuf source(raw.first, raw.second);
+ uint32_t num_cells = source.readValue<uint32_t>();
+ {
+ uint32_t alignment = CellTypeUtils::alignment(_data_from_type.cell_type);
+ const char *aligned_ptr = fix_alignment(source.peek(), alignment);
+ size_t adjustment = aligned_ptr - source.peek();
+ source.adjustReadPos(adjustment);
+ }
+ retval.cells_ref = TypedCells(source.peek(), _data_from_type.cell_type, num_cells);
+ source.adjustReadPos(CellTypeUtils::mem_size(_data_from_type.cell_type, num_cells));
+ retval.num_subspaces = source.readValue<uint32_t>();
+ retval.labels_buffer = vespalib::ConstArrayRef<char>(source.peek(), source.size());
+
+ if (retval.num_subspaces * _data_from_type.dense_subspace_size == num_cells) {
+ retval.valid = true;
+ return retval;
+ }
+ LOG(warning, "inconsistent stored tensor data: "
+ "num_subspaces[%zu] * dense_subspace_size[%u] = %zu != num_cells[%u]",
+ retval.num_subspaces, _data_from_type.dense_subspace_size,
+ retval.num_subspaces * _data_from_type.dense_subspace_size,
+ num_cells);
+ return retval;
+}
+
+bool
+StreamedValueStore::encode_tensor(EntryRef ref, vespalib::nbostream &target) const
+{
+ if (auto data = get_tensor_data(ref)) {
+ StreamedValueView value(
+ _tensor_type, _data_from_type.num_mapped_dimensions,
+ data.cells_ref, data.num_subspaces, data.labels_buffer);
+ vespalib::eval::encode_value(value, target);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void
+StreamedValueStore::my_encode(const Value::Index &index,
+ vespalib::nbostream &target) const
+{
+ uint32_t num_subspaces = index.size();
+ target << num_subspaces;
+ uint32_t num_mapped_dims = _data_from_type.num_mapped_dimensions;
+ std::vector<vespalib::stringref> labels(num_mapped_dims * num_subspaces);
+ auto view = index.create_view({});
+ view->lookup({});
+ std::vector<vespalib::stringref> addr(num_mapped_dims);
+ std::vector<vespalib::stringref *> addr_refs;
+ for (auto & label : addr) {
+ addr_refs.push_back(&label);
+ }
+ size_t subspace;
+ for (size_t ss = 0; ss < num_subspaces; ++ss) {
+ bool ok = view->next_result(addr_refs, subspace);
+ assert(ok);
+ size_t idx = subspace * num_mapped_dims;
+ for (auto label : addr) {
+ labels[idx++] = label;
+ }
+ }
+ bool ok = view->next_result(addr_refs, subspace);
+ assert(!ok);
+ for (auto label : labels) {
+ target.writeSmallString(label);
+ }
+}
+
+TensorStore::EntryRef
+StreamedValueStore::store_tensor(const Value &tensor)
+{
+ if (tensor.type() == _tensor_type) {
+ CellsMemBlock cells_mem(tensor.cells());
+ size_t alignment = CellTypeUtils::alignment(_data_from_type.cell_type);
+ size_t padding = alignment - 1;
+ vespalib::nbostream stream;
+ stream << uint32_t(cells_mem.num);
+ my_encode(tensor.index(), stream);
+ size_t mem_size = stream.size() + cells_mem.total_sz + padding;
+ auto raw = allocRawBuffer(mem_size);
+ char *target = raw.data;
+ memcpy(target, stream.peek(), sizeof(uint32_t));
+ stream.adjustReadPos(sizeof(uint32_t));
+ target += sizeof(uint32_t);
+ target = fix_alignment(target, alignment);
+ memcpy(target, cells_mem.ptr, cells_mem.total_sz);
+ target += cells_mem.total_sz;
+ memcpy(target, stream.peek(), stream.size());
+ target += stream.size();
+ assert(target <= raw.data + mem_size);
+ return raw.ref;
+ } else {
+ LOG(error, "trying to store tensor of type %s in store only allowing %s",
+ tensor.type().to_spec().c_str(), _tensor_type.to_spec().c_str());
+ TensorStore::EntryRef invalid;
+ return invalid;
+ }
+}
+
+TensorStore::EntryRef
+StreamedValueStore::store_encoded_tensor(vespalib::nbostream &encoded)
+{
+ const auto &factory = StreamedValueBuilderFactory::get();
+ auto val = vespalib::eval::decode_value(encoded, factory);
+ return store_tensor(*val);
+}
+
+}
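
Derived from store_tensor() and get_tensor_data() above, a sketch of the per-entry buffer layout inside the data store; the field names are illustrative, not identifiers from the code.

    // Layout of one stored entry (after the hidden uint32_t length prefix that
    // allocRawBuffer() keeps in front of the pointer it hands out):
    //
    //   uint32_t num_cells;       // network byte order (written via nbostream)
    //   char     pad[];           // 0 .. alignment-1 zero bytes, so the cell array starts
    //                             //   on a CellTypeUtils::alignment(cell_type) boundary
    //   cells    [num_cells];     // raw cell values; get_tensor_data() points TypedCells
    //                             //   directly at this range, no copy is made
    //   uint32_t num_subspaces;   // network byte order
    //   labels   [num_subspaces * num_mapped_dimensions];  // nbostream small strings
    //
    // get_tensor_data() rejects the entry (valid == false) unless
    // num_subspaces * dense_subspace_size == num_cells.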
diff --git a/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h
new file mode 100644
index 00000000000..94f1f7a4790
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/tensor/streamed_value_store.h
@@ -0,0 +1,67 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "tensor_store.h"
+#include <vespa/eval/eval/value_type.h>
+#include <vespa/eval/eval/value.h>
+#include <vespa/vespalib/objects/nbostream.h>
+#include <vespa/vespalib/util/typify.h>
+
+namespace search::tensor {
+
+/**
+ * Class for storing serialized tensors in memory
+ */
+class StreamedValueStore : public TensorStore {
+public:
+ using RefType = vespalib::datastore::AlignedEntryRefT<22, 2>;
+ using DataStoreType = vespalib::datastore::DataStoreT<RefType>;
+
+ struct StreamedValueData {
+ bool valid;
+ vespalib::eval::TypedCells cells_ref;
+ size_t num_subspaces;
+ vespalib::ConstArrayRef<char> labels_buffer;
+ operator bool() const { return valid; }
+ };
+
+ struct DataFromType {
+ uint32_t num_mapped_dimensions;
+ uint32_t dense_subspace_size;
+ vespalib::eval::CellType cell_type;
+
+ DataFromType(const vespalib::eval::ValueType& type)
+ : num_mapped_dimensions(type.count_mapped_dimensions()),
+ dense_subspace_size(type.dense_subspace_size()),
+ cell_type(type.cell_type())
+ {}
+ };
+
+private:
+ DataStoreType _concreteStore;
+ vespalib::datastore::BufferType<char> _bufferType;
+ vespalib::eval::ValueType _tensor_type;
+ DataFromType _data_from_type;
+
+ void my_encode(const vespalib::eval::Value::Index &index,
+ vespalib::nbostream &target) const;
+
+ std::pair<const char *, uint32_t> getRawBuffer(RefType ref) const;
+ vespalib::datastore::Handle<char> allocRawBuffer(uint32_t size);
+public:
+ StreamedValueStore(const vespalib::eval::ValueType &tensor_type);
+ virtual ~StreamedValueStore();
+
+ virtual void holdTensor(EntryRef ref) override;
+ virtual EntryRef move(EntryRef ref) override;
+
+ StreamedValueData get_tensor_data(EntryRef ref) const;
+ bool encode_tensor(EntryRef ref, vespalib::nbostream &target) const;
+
+ EntryRef store_tensor(const vespalib::eval::Value &tensor);
+ EntryRef store_encoded_tensor(vespalib::nbostream &encoded);
+};
+
+
+}
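
A worked example (illustrative only) of what DataFromType precomputes for a concrete mixed tensor type, following count_mapped_dimensions(), dense_subspace_size() and cell_type():

    #include <vespa/searchlib/tensor/streamed_value_store.h>

    void data_from_type_example() {
        auto type = vespalib::eval::ValueType::from_spec("tensor<float>(cat{},x[4])");
        [[maybe_unused]] search::tensor::StreamedValueStore::DataFromType info(type);
        // info.num_mapped_dimensions == 1   ("cat" is the only mapped dimension)
        // info.dense_subspace_size   == 4   (each subspace holds the four "x" cells)
        // info.cell_type             == vespalib::eval::CellType::FLOAT
    }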
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp
index 83988a3af11..35be27bc03b 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.cpp
@@ -1,9 +1,9 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "tensor_deserialize.h"
#include <vespa/document/util/serializableexceptions.h>
#include <vespa/eval/eval/engine_or_factory.h>
#include <vespa/eval/eval/value.h>
-#include <vespa/vespalib/objects/nbostream.h>
using document::DeserializeException;
using vespalib::eval::EngineOrFactory;
@@ -11,14 +11,19 @@ using vespalib::eval::Value;
namespace search::tensor {
-std::unique_ptr<Value> deserialize_tensor(const void *data, size_t size)
+std::unique_ptr<Value> deserialize_tensor(vespalib::nbostream &buffer)
{
- vespalib::nbostream wrapStream(data, size);
- auto tensor = EngineOrFactory::get().decode(wrapStream);
- if (wrapStream.size() != 0) {
+ auto tensor = EngineOrFactory::get().decode(buffer);
+ if (buffer.size() != 0) {
throw DeserializeException("Leftover bytes deserializing tensor attribute value.", VESPA_STRLOC);
}
return tensor;
}
+std::unique_ptr<Value> deserialize_tensor(const void *data, size_t size)
+{
+ vespalib::nbostream wrapStream(data, size);
+ return deserialize_tensor(wrapStream);
+}
+
} // namespace
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h
index 18e166543d6..6f9521c1355 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_deserialize.h
@@ -1,10 +1,14 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/eval/eval/value.h>
+#include <vespa/vespalib/objects/nbostream.h>
namespace search::tensor {
extern std::unique_ptr<vespalib::eval::Value>
deserialize_tensor(const void *data, size_t size);
+extern std::unique_ptr<vespalib::eval::Value>
+deserialize_tensor(vespalib::nbostream &stream);
+
} // namespace
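
Finally, a brief sketch of call sites for the two deserialize_tensor overloads declared above; the wrapper functions are hypothetical and only illustrate the intended usage.

    #include <vespa/searchlib/tensor/tensor_deserialize.h>

    // Hypothetical call sites, not part of the commit.
    std::unique_ptr<vespalib::eval::Value> from_raw(const void *data, size_t size) {
        return search::tensor::deserialize_tensor(data, size);  // wraps the bytes in an nbostream internally
    }
    std::unique_ptr<vespalib::eval::Value> from_stream(vespalib::nbostream &buf) {
        return search::tensor::deserialize_tensor(buf);         // throws DeserializeException on leftover bytes
    }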