// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "streamed_value_store.h" #include #include #include #include #include #include #include #include #include #include #include LOG_SETUP(".searchlib.tensor.streamed_value_store"); using vespalib::datastore::Handle; using vespalib::datastore::EntryRef; using namespace vespalib::eval; using vespalib::ConstArrayRef; using vespalib::MemoryUsage; using vespalib::string_id; using vespalib::StringIdVector; namespace search::tensor { //----------------------------------------------------------------------------- namespace { template void each_subspace(const Value &value, size_t num_mapped, size_t dense_size, F f) { size_t subspace; std::vector addr(num_mapped); std::vector refs; refs.reserve(addr.size()); for (string_id &label: addr) { refs.push_back(&label); } auto cells = value.cells().typify(); auto view = value.index().create_view({}); view->lookup({}); while (view->next_result(refs, subspace)) { size_t offset = subspace * dense_size; f(ConstArrayRef(addr), ConstArrayRef(cells.begin() + offset, dense_size)); } } using TensorEntry = StreamedValueStore::TensorEntry; struct CreateTensorEntry { template static TensorEntry::SP invoke(const Value &value, size_t num_mapped, size_t dense_size) { using EntryImpl = StreamedValueStore::TensorEntryImpl; return std::make_shared(value, num_mapped, dense_size); } }; struct MyFastValueView final : Value { const ValueType &my_type; FastValueIndex my_index; TypedCells my_cells; MyFastValueView(const ValueType &type_ref, const StringIdVector &handle_view, TypedCells cells, size_t num_mapped, size_t num_spaces) : my_type(type_ref), my_index(num_mapped, handle_view, num_spaces), my_cells(cells) { const StringIdVector &labels = handle_view; for (size_t i = 0; i < num_spaces; ++i) { ConstArrayRef addr(&labels[i * num_mapped], num_mapped); my_index.map.add_mapping(FastAddrMap::hash_labels(addr)); } assert(my_index.map.size() == num_spaces); } const ValueType &type() const override { return my_type; } const Value::Index &index() const override { return my_index; } TypedCells cells() const override { return my_cells; } MemoryUsage get_memory_usage() const override { MemoryUsage usage = self_memory_usage(); usage.merge(my_index.map.estimate_extra_memory_usage()); return usage; } }; } // //----------------------------------------------------------------------------- StreamedValueStore::TensorEntry::~TensorEntry() = default; StreamedValueStore::TensorEntry::SP StreamedValueStore::TensorEntry::create_shared_entry(const Value &value) { size_t num_mapped = value.type().count_mapped_dimensions(); size_t dense_size = value.type().dense_subspace_size(); return vespalib::typify_invoke<1,TypifyCellType,CreateTensorEntry>(value.type().cell_type(), value, num_mapped, dense_size); } template StreamedValueStore::TensorEntryImpl::TensorEntryImpl(const Value &value, size_t num_mapped, size_t dense_size) : handles(), cells() { handles.reserve(num_mapped * value.index().size()); cells.reserve(dense_size * value.index().size()); auto store_subspace = [&](auto addr, auto data) { for (string_id label: addr) { handles.push_back(label); } for (CT entry: data) { cells.push_back(entry); } }; each_subspace(value, num_mapped, dense_size, store_subspace); } template Value::UP StreamedValueStore::TensorEntryImpl::create_fast_value_view(const ValueType &type_ref) const { size_t num_mapped = type_ref.count_mapped_dimensions(); size_t dense_size = type_ref.dense_subspace_size(); size_t num_spaces = cells.size() / dense_size; assert(dense_size * num_spaces == cells.size()); assert(num_mapped * num_spaces == handles.view().size()); return std::make_unique(type_ref, handles.view(), TypedCells(cells), num_mapped, num_spaces); } template void StreamedValueStore::TensorEntryImpl::encode_value(const ValueType &type, vespalib::nbostream &target) const { size_t num_mapped = type.count_mapped_dimensions(); size_t dense_size = type.dense_subspace_size(); size_t num_spaces = cells.size() / dense_size; assert(dense_size * num_spaces == cells.size()); assert(num_mapped * num_spaces == handles.view().size()); StreamedValueView my_value(type, num_mapped, TypedCells(cells), num_spaces, handles.view()); ::vespalib::eval::encode_value(my_value, target); } template MemoryUsage StreamedValueStore::TensorEntryImpl::get_memory_usage() const { MemoryUsage usage = self_memory_usage>(); usage.merge(vector_extra_memory_usage(handles.view())); usage.merge(vector_extra_memory_usage(cells)); return usage; } template StreamedValueStore::TensorEntryImpl::~TensorEntryImpl() = default; //----------------------------------------------------------------------------- constexpr size_t MIN_BUFFER_ARRAYS = 8_Ki; StreamedValueStore::TensorBufferType::TensorBufferType() noexcept : ParentType(1, MIN_BUFFER_ARRAYS, TensorStoreType::RefType::offsetSize()) { } void StreamedValueStore::TensorBufferType::cleanHold(void* buffer, size_t offset, ElemCount num_elems, CleanContext clean_ctx) { TensorEntry::SP* elem = static_cast(buffer) + offset; const auto& empty = empty_entry(); for (size_t i = 0; i < num_elems; ++i) { clean_ctx.extraBytesCleaned((*elem)->get_memory_usage().allocatedBytes()); *elem = empty; ++elem; } } StreamedValueStore::StreamedValueStore(const ValueType &tensor_type) : TensorStore(_concrete_store), _concrete_store(std::make_unique()), _tensor_type(tensor_type) { _concrete_store.enableFreeLists(); } StreamedValueStore::~StreamedValueStore() = default; EntryRef StreamedValueStore::add_entry(TensorEntry::SP tensor) { auto ref = _concrete_store.addEntry(tensor); auto& state = _concrete_store.getBufferState(RefType(ref).bufferId()); state.incExtraUsedBytes(tensor->get_memory_usage().allocatedBytes()); return ref; } const StreamedValueStore::TensorEntry * StreamedValueStore::get_tensor_entry(EntryRef ref) const { if (!ref.valid()) { return nullptr; } const auto& entry = _concrete_store.getEntry(ref); assert(entry); return entry.get(); } void StreamedValueStore::holdTensor(EntryRef ref) { if (!ref.valid()) { return; } const auto& tensor = _concrete_store.getEntry(ref); assert(tensor); _concrete_store.holdElem(ref, 1, tensor->get_memory_usage().allocatedBytes()); } TensorStore::EntryRef StreamedValueStore::move(EntryRef ref) { if (!ref.valid()) { return EntryRef(); } const auto& old_tensor = _concrete_store.getEntry(ref); assert(old_tensor); auto new_ref = add_entry(old_tensor); _concrete_store.holdElem(ref, 1, old_tensor->get_memory_usage().allocatedBytes()); return new_ref; } bool StreamedValueStore::encode_tensor(EntryRef ref, vespalib::nbostream &target) const { if (const auto * entry = get_tensor_entry(ref)) { entry->encode_value(_tensor_type, target); return true; } else { return false; } } TensorStore::EntryRef StreamedValueStore::store_tensor(const Value &tensor) { assert(tensor.type() == _tensor_type); return add_entry(TensorEntry::create_shared_entry(tensor)); } TensorStore::EntryRef StreamedValueStore::store_encoded_tensor(vespalib::nbostream &encoded) { const auto &factory = StreamedValueBuilderFactory::get(); auto val = vespalib::eval::decode_value(encoded, factory); return store_tensor(*val); } }