summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2022-03-21 16:06:11 +0100
committerGitHub <noreply@github.com>2022-03-21 16:06:11 +0100
commit4a6506e8e721d75d91e1398a876653df1e749d2b (patch)
treede32fcf0139a337bdac3b87aea49ce61eac714d9
parent34cf5db6ceeb838f1850558984cbe5e06d63870f (diff)
parent439bced1645e7e86b43d5da74c3ac48d9cae9a67 (diff)
Merge pull request #21759 from vespa-engine/toregge/use-atomic-entry-ref-in-tensor-attribute
Use AtomicEntryRef in tensor attribute.
-rw-r--r--searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp13
-rw-r--r--searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp10
-rw-r--r--searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp6
-rw-r--r--searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp22
-rw-r--r--searchlib/src/vespa/searchlib/tensor/tensor_attribute.h4
-rw-r--r--searchlib/src/vespa/searchlib/tensor/tensor_attribute.hpp6
6 files changed, 33 insertions, 28 deletions
diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
index d376fb020be..a3b197ac499 100644
--- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp
@@ -113,7 +113,7 @@ DenseTensorAttribute::internal_set_tensor(DocId docid, const vespalib::eval::Val
void
DenseTensorAttribute::consider_remove_from_index(DocId docid)
{
- if (_index && _refVector[docid].valid()) {
+ if (_index && _refVector[docid].load_relaxed().valid()) {
_index->remove_document(docid);
}
}
@@ -209,7 +209,7 @@ DenseTensorAttribute::getTensor(DocId docId) const
{
EntryRef ref;
if (docId < getCommittedDocIdLimit()) {
- ref = _refVector[docId];
+ ref = acquire_entry_ref(docId);
}
if (!ref.valid()) {
return {};
@@ -222,7 +222,7 @@ DenseTensorAttribute::extract_cells_ref(DocId docId) const
{
EntryRef ref;
if (docId < getCommittedDocIdLimit()) {
- ref = _refVector[docId];
+ ref = acquire_entry_ref(docId);
}
return _denseTensorStore.get_typed_cells(ref);
}
@@ -370,12 +370,12 @@ DenseTensorAttribute::onLoad(vespalib::Executor *executor)
if (reader.is_present()) {
auto raw = _denseTensorStore.allocRawBuffer();
reader.readTensor(raw.data, _denseTensorStore.getBufSize());
- _refVector.push_back(raw.ref);
+ _refVector.push_back(AtomicEntryRef(raw.ref));
if (loader) {
loader->load(lid, raw.ref);
}
} else {
- _refVector.push_back(EntryRef());
+ _refVector.push_back(AtomicEntryRef());
}
}
if (loader) {
@@ -483,8 +483,7 @@ DenseTensorAttribute::onShrinkLidSpace()
vespalib::eval::TypedCells
DenseTensorAttribute::get_vector(uint32_t docid) const
{
- assert(docid < _refVector.size());
- EntryRef ref = _refVector[docid];
+ EntryRef ref = acquire_entry_ref(docid);
return _denseTensorStore.get_typed_cells(ref);
}
diff --git a/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp
index 0ae55a670da..fbb55bf396c 100644
--- a/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/direct_tensor_attribute.cpp
@@ -50,10 +50,10 @@ DirectTensorAttribute::onLoad(vespalib::Executor *)
tensorReader.readBlob(&buffer[0], tensorSize);
auto tensor = deserialize_tensor(&buffer[0], tensorSize);
EntryRef ref = _direct_store.store_tensor(std::move(tensor));
- _refVector.push_back(ref);
+ _refVector.push_back(AtomicEntryRef(ref));
} else {
EntryRef invalid;
- _refVector.push_back(invalid);
+ _refVector.push_back(AtomicEntryRef(invalid));
}
}
setNumDocs(numDocs);
@@ -82,7 +82,7 @@ DirectTensorAttribute::update_tensor(DocId docId,
{
EntryRef ref;
if (docId < getCommittedDocIdLimit()) {
- ref = _refVector[docId];
+ ref = _refVector[docId].load_relaxed();
}
if (ref.valid()) {
auto ptr = _direct_store.get_tensor(ref);
@@ -107,7 +107,7 @@ DirectTensorAttribute::getTensor(DocId docId) const
{
EntryRef ref;
if (docId < getCommittedDocIdLimit()) {
- ref = _refVector[docId];
+ ref = acquire_entry_ref(docId);
}
if (ref.valid()) {
auto ptr = _direct_store.get_tensor(ref);
@@ -124,7 +124,7 @@ DirectTensorAttribute::get_tensor_ref(DocId docId) const
{
EntryRef ref;
if (docId < getCommittedDocIdLimit()) {
- ref = _refVector[docId];
+ ref = acquire_entry_ref(docId);
}
if (ref.valid()) {
auto ptr = _direct_store.get_tensor(ref);
diff --git a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp
index a0ec04b98cb..87537298d1f 100644
--- a/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/serialized_fast_value_attribute.cpp
@@ -45,7 +45,7 @@ SerializedFastValueAttribute::getTensor(DocId docId) const
{
EntryRef ref;
if (docId < getCommittedDocIdLimit()) {
- ref = _refVector[docId];
+ ref = acquire_entry_ref(docId);
}
if (!ref.valid()) {
return {};
@@ -78,10 +78,10 @@ SerializedFastValueAttribute::onLoad(vespalib::Executor *)
tensorReader.readBlob(&buffer[0], tensorSize);
vespalib::nbostream source(&buffer[0], tensorSize);
EntryRef ref = _streamedValueStore.store_encoded_tensor(source);
- _refVector.push_back(ref);
+ _refVector.push_back(AtomicEntryRef(ref));
} else {
EntryRef invalid;
- _refVector.push_back(invalid);
+ _refVector.push_back(AtomicEntryRef(invalid));
}
}
setNumDocs(numDocs);
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp
index 383f819659c..112396fd3fb 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.cpp
@@ -72,9 +72,9 @@ TensorAttribute::asTensorAttribute() const
uint32_t
TensorAttribute::clearDoc(DocId docId)
{
- EntryRef oldRef(_refVector[docId]);
+ EntryRef oldRef(_refVector[docId].load_relaxed());
updateUncommittedDocIdLimit(docId);
- _refVector[docId] = EntryRef();
+ _refVector[docId] = AtomicEntryRef();
if (oldRef.valid()) {
_tensorStore.holdTensor(oldRef);
return 1u;
@@ -125,7 +125,7 @@ bool
TensorAttribute::addDoc(DocId &docId)
{
bool incGen = _refVector.isFull();
- _refVector.push_back(EntryRef());
+ _refVector.push_back(AtomicEntryRef());
AttributeVector::incNumDocs();
docId = AttributeVector::getNumDocs() - 1;
updateUncommittedDocIdLimit(docId);
@@ -155,8 +155,8 @@ TensorAttribute::setTensorRef(DocId docId, EntryRef ref)
// TODO: validate if following fence is sufficient.
std::atomic_thread_fence(std::memory_order_release);
// TODO: Check if refVector must consist of std::atomic<EntryRef>
- EntryRef oldRef(_refVector[docId]);
- _refVector[docId] = ref;
+ EntryRef oldRef(_refVector[docId].load_relaxed());
+ _refVector[docId].store_release(ref);
if (oldRef.valid()) {
_tensorStore.holdTensor(oldRef);
}
@@ -239,10 +239,11 @@ TensorAttribute::clearDocs(DocId lidLow, DocId lidLimit, bool)
assert(lidLow <= lidLimit);
assert(lidLimit <= this->getNumDocs());
for (DocId lid = lidLow; lid < lidLimit; ++lid) {
- EntryRef &ref = _refVector[lid];
+ AtomicEntryRef& atomic_ref = _refVector[lid];
+ EntryRef ref = atomic_ref.load_relaxed();
if (ref.valid()) {
_tensorStore.holdTensor(ref);
- ref = EntryRef();
+ atomic_ref.store_release(EntryRef());
}
}
}
@@ -268,7 +269,12 @@ TensorAttribute::getRefCopy() const
{
uint32_t size = getCommittedDocIdLimit();
assert(size <= _refVector.size());
- return RefCopyVector(&_refVector[0], &_refVector[0] + size);
+ RefCopyVector result;
+ result.reserve(size);
+ for (uint32_t i = 0; i < size; ++i) {
+ result.push_back(_refVector[i].load_relaxed());
+ }
+ return result;
}
void
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h
index 9e2f1aabab1..518caef9dc9 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.h
@@ -19,8 +19,9 @@ namespace search::tensor {
class TensorAttribute : public NotImplementedAttribute, public ITensorAttribute
{
protected:
+ using AtomicEntryRef = vespalib::datastore::AtomicEntryRef;
using EntryRef = TensorStore::EntryRef;
- using RefVector = vespalib::RcuVectorBase<EntryRef>;
+ using RefVector = vespalib::RcuVectorBase<AtomicEntryRef>;
RefVector _refVector; // docId -> ref in data store for serialized tensor
TensorStore &_tensorStore; // data store for serialized tensors
@@ -37,6 +38,7 @@ protected:
virtual vespalib::MemoryUsage memory_usage() const;
void populate_state(vespalib::slime::Cursor& object) const;
void populate_address_space_usage(AddressSpaceUsage& usage) const override;
+ EntryRef acquire_entry_ref(DocId doc_id) const noexcept { return _refVector.acquire_elem_ref(doc_id).load_acquire(); }
public:
DECLARE_IDENTIFIABLE_ABSTRACT(TensorAttribute);
diff --git a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.hpp b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.hpp
index 1fd0c3c944b..749b0429975 100644
--- a/searchlib/src/vespa/searchlib/tensor/tensor_attribute.hpp
+++ b/searchlib/src/vespa/searchlib/tensor/tensor_attribute.hpp
@@ -13,13 +13,11 @@ TensorAttribute::doCompactWorst()
uint32_t bufferId = _tensorStore.startCompactWorstBuffer();
size_t lidLimit = _refVector.size();
for (uint32_t lid = 0; lid < lidLimit; ++lid) {
- RefType ref = _refVector[lid];
+ RefType ref = _refVector[lid].load_relaxed();
(void) ref;
if (ref.valid() && ref.bufferId() == bufferId) {
RefType newRef = _tensorStore.move(ref);
- // TODO: validate if following fence is sufficient.
- std::atomic_thread_fence(std::memory_order_release);
- _refVector[lid] = newRef;
+ _refVector[lid].store_release(newRef);
}
}
_tensorStore.finishCompactWorstBuffer(bufferId);