diff options
author | Geir Storli <geirst@yahooinc.com> | 2022-03-21 16:06:11 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-21 16:06:11 +0100 |
commit | 4a6506e8e721d75d91e1398a876653df1e749d2b (patch) | |
tree | de32fcf0139a337bdac3b87aea49ce61eac714d9 | |
parent | 34cf5db6ceeb838f1850558984cbe5e06d63870f (diff) | |
parent | 439bced1645e7e86b43d5da74c3ac48d9cae9a67 (diff) |
Merge pull request #21759 from vespa-engine/toregge/use-atomic-entry-ref-in-tensor-attribute
Use AtomicEntryRef in tensor attribute.
6 files changed, 33 insertions, 28 deletions
diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp index d376fb020be..a3b197ac499 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp @@ -113,7 +113,7 @@ DenseTensorAttribute::internal_set_tensor(DocId docid, const vespalib::eval::Val void DenseTensorAttribute::consider_remove_from_index(DocId docid) { - if (_index && _refVector[docid].valid()) { + if (_index && _refVector[docid].load_relaxed().valid()) { _index->remove_document(docid); } } @@ -209,7 +209,7 @@ DenseTensorAttribute::getTensor(DocId docId) const { EntryRef ref; if (docId < getCommittedDocIdLimit()) { - ref = _refVector[docId]; + ref = acquire_entry_ref(docId); } if (!ref.valid()) { return {}; @@ -222,7 +222,7 @@ DenseTensorAttribute::extract_cells_ref(DocId docId) const { EntryRef ref; if (docId < getCommittedDocIdLimit()) { - ref = _refVector[docId]; + ref = acquire_entry_ref(docId); } return _denseTensorStore.get_typed_cells(ref); } @@ -370,12 +370,12 @@ DenseTensorAttribute::onLoad(vespalib::Executor *executor) if (reader.is_present()) { auto raw = _denseTensorStore.allocRawBuffer(); reader.readTensor(raw.data, _denseTensorStore.getBufSize()); - _refVector.push_back(raw.ref); + _refVector.push_back(AtomicEntryRef(raw.ref)); if (loader) { loader->load(lid, raw.ref); } } else { - _refVector.push_back(EntryRef()); + _refVector.push_back(AtomicEntryRef()); } } if (loader) { @@ -483,8 +483,7 @@ DenseTensorAttribute::onShrinkLidSpace() vespalib::eval::TypedCells DenseTensorAttribute::get_vector(uint32_t docid) const { - assert(docid < _refVector.size()); - EntryRef ref = _refVector[docid]; + EntryRef ref = acquire_entry_ref(docid); return _denseTensorStore.get_typed_cells(ref); } diff --git a/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp index 0ae55a670da..fbb55bf396c 100644 --- a/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp @@ -50,10 +50,10 @@ DirectTensorAttribute::onLoad(vespalib::Executor *) tensorReader.readBlob(&buffer[0], tensorSize); auto tensor = deserialize_tensor(&buffer[0], tensorSize); EntryRef ref = _direct_store.store_tensor(std::move(tensor)); - _refVector.push_back(ref); + _refVector.push_back(AtomicEntryRef(ref)); } else { EntryRef invalid; - _refVector.push_back(invalid); + _refVector.push_back(AtomicEntryRef(invalid)); } } setNumDocs(numDocs); @@ -82,7 +82,7 @@ DirectTensorAttribute::update_tensor(DocId docId, { EntryRef ref; if (docId < getCommittedDocIdLimit()) { - ref = _refVector[docId]; + ref = _refVector[docId].load_relaxed(); } if (ref.valid()) { auto ptr = _direct_store.get_tensor(ref); @@ -107,7 +107,7 @@ DirectTensorAttribute::getTensor(DocId docId) const { EntryRef ref; if (docId < getCommittedDocIdLimit()) { - ref = _refVector[docId]; + ref = acquire_entry_ref(docId); } if (ref.valid()) { auto ptr = _direct_store.get_tensor(ref); @@ -124,7 +124,7 @@ DirectTensorAttribute::get_tensor_ref(DocId docId) const { EntryRef ref; if (docId < getCommittedDocIdLimit()) { - ref = _refVector[docId]; + ref = acquire_entry_ref(docId); } if (ref.valid()) { auto ptr = _direct_store.get_tensor(ref); diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp index a0ec04b98cb..87537298d1f 100644 --- a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp @@ -45,7 +45,7 @@ SerializedFastValueAttribute::getTensor(DocId docId) const { EntryRef ref; if (docId < getCommittedDocIdLimit()) { - ref = _refVector[docId]; + ref = acquire_entry_ref(docId); } if (!ref.valid()) { return {}; @@ -78,10 +78,10 @@ SerializedFastValueAttribute::onLoad(vespalib::Executor *) tensorReader.readBlob(&buffer[0], tensorSize); vespalib::nbostream source(&buffer[0], tensorSize); EntryRef ref = _streamedValueStore.store_encoded_tensor(source); - _refVector.push_back(ref); + _refVector.push_back(AtomicEntryRef(ref)); } else { EntryRef invalid; - _refVector.push_back(invalid); + _refVector.push_back(AtomicEntryRef(invalid)); } } setNumDocs(numDocs); diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp index 383f819659c..112396fd3fb 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp @@ -72,9 +72,9 @@ TensorAttribute::asTensorAttribute() const uint32_t TensorAttribute::clearDoc(DocId docId) { - EntryRef oldRef(_refVector[docId]); + EntryRef oldRef(_refVector[docId].load_relaxed()); updateUncommittedDocIdLimit(docId); - _refVector[docId] = EntryRef(); + _refVector[docId] = AtomicEntryRef(); if (oldRef.valid()) { _tensorStore.holdTensor(oldRef); return 1u; @@ -125,7 +125,7 @@ bool TensorAttribute::addDoc(DocId &docId) { bool incGen = _refVector.isFull(); - _refVector.push_back(EntryRef()); + _refVector.push_back(AtomicEntryRef()); AttributeVector::incNumDocs(); docId = AttributeVector::getNumDocs() - 1; updateUncommittedDocIdLimit(docId); @@ -155,8 +155,8 @@ TensorAttribute::setTensorRef(DocId docId, EntryRef ref) // TODO: validate if following fence is sufficient. std::atomic_thread_fence(std::memory_order_release); // TODO: Check if refVector must consist of std::atomic<EntryRef> - EntryRef oldRef(_refVector[docId]); - _refVector[docId] = ref; + EntryRef oldRef(_refVector[docId].load_relaxed()); + _refVector[docId].store_release(ref); if (oldRef.valid()) { _tensorStore.holdTensor(oldRef); } @@ -239,10 +239,11 @@ TensorAttribute::clearDocs(DocId lidLow, DocId lidLimit, bool) assert(lidLow <= lidLimit); assert(lidLimit <= this->getNumDocs()); for (DocId lid = lidLow; lid < lidLimit; ++lid) { - EntryRef &ref = _refVector[lid]; + AtomicEntryRef& atomic_ref = _refVector[lid]; + EntryRef ref = atomic_ref.load_relaxed(); if (ref.valid()) { _tensorStore.holdTensor(ref); - ref = EntryRef(); + atomic_ref.store_release(EntryRef()); } } } @@ -268,7 +269,12 @@ TensorAttribute::getRefCopy() const { uint32_t size = getCommittedDocIdLimit(); assert(size <= _refVector.size()); - return RefCopyVector(&_refVector[0], &_refVector[0] + size); + RefCopyVector result; + result.reserve(size); + for (uint32_t i = 0; i < size; ++i) { + result.push_back(_refVector[i].load_relaxed()); + } + return result; } void diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h index 9e2f1aabab1..518caef9dc9 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h +++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h @@ -19,8 +19,9 @@ namespace search::tensor { class TensorAttribute : public NotImplementedAttribute, public ITensorAttribute { protected: + using AtomicEntryRef = vespalib::datastore::AtomicEntryRef; using EntryRef = TensorStore::EntryRef; - using RefVector = vespalib::RcuVectorBase<EntryRef>; + using RefVector = vespalib::RcuVectorBase<AtomicEntryRef>; RefVector _refVector; // docId -> ref in data store for serialized tensor TensorStore &_tensorStore; // data store for serialized tensors @@ -37,6 +38,7 @@ protected: virtual vespalib::MemoryUsage memory_usage() const; void populate_state(vespalib::slime::Cursor& object) const; void populate_address_space_usage(AddressSpaceUsage& usage) const override; + EntryRef acquire_entry_ref(DocId doc_id) const noexcept { return _refVector.acquire_elem_ref(doc_id).load_acquire(); } public: DECLARE_IDENTIFIABLE_ABSTRACT(TensorAttribute); diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.hpp b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.hpp index 1fd0c3c944b..749b0429975 100644 --- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.hpp +++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.hpp @@ -13,13 +13,11 @@ TensorAttribute::doCompactWorst() uint32_t bufferId = _tensorStore.startCompactWorstBuffer(); size_t lidLimit = _refVector.size(); for (uint32_t lid = 0; lid < lidLimit; ++lid) { - RefType ref = _refVector[lid]; + RefType ref = _refVector[lid].load_relaxed(); (void) ref; if (ref.valid() && ref.bufferId() == bufferId) { RefType newRef = _tensorStore.move(ref); - // TODO: validate if following fence is sufficient. - std::atomic_thread_fence(std::memory_order_release); - _refVector[lid] = newRef; + _refVector[lid].store_release(newRef); } } _tensorStore.finishCompactWorstBuffer(bufferId); |