summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2022-02-24 10:46:39 +0100
committerGitHub <noreply@github.com>2022-02-24 10:46:39 +0100
commit122aa5b6f4f7c61b8662df6e2115a91af640de33 (patch)
tree22b2b3ee0c2bce5eb1b7107e0de7ad3e77524ad7 /vespalib
parentc1ef92cf92c1c7e2789b9ad9abcb89457cc421ac (diff)
parente3561b6cad8fbb804dc70acc98a6a032e2988997 (diff)
Merge pull request #21343 from vespa-engine/toregge/use-atomic-ops-for-reading-and-writing-data-store-buffer-pointer
Use atomic read with acquire memory ordering when reading data store …
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/vespa/vespalib/datastore/bufferstate.cpp31
-rw-r--r--vespalib/src/vespa/vespalib/datastore/bufferstate.h8
-rw-r--r--vespalib/src/vespa/vespalib/datastore/datastorebase.cpp8
-rw-r--r--vespalib/src/vespa/vespalib/datastore/datastorebase.h22
4 files changed, 34 insertions, 35 deletions
diff --git a/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp b/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp
index df5e350c8dd..e0104c8bb71 100644
--- a/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp
@@ -101,9 +101,9 @@ void
BufferState::onActive(uint32_t bufferId, uint32_t typeId,
BufferTypeBase *typeHandler,
size_t elementsNeeded,
- void *&buffer)
+ std::atomic<void*>& buffer)
{
- assert(buffer == nullptr);
+ assert(buffer.load(std::memory_order_relaxed) == nullptr);
assert(_buffer.get() == nullptr);
assert(_state == FREE);
assert(_typeHandler == nullptr);
@@ -125,15 +125,15 @@ BufferState::onActive(uint32_t bufferId, uint32_t typeId,
auto allocator = typeHandler->get_memory_allocator();
_buffer = (allocator != nullptr) ? Alloc::alloc_with_allocator(allocator) : Alloc::alloc(0, MemoryAllocator::HUGEPAGE_SIZE);
_buffer.create(alloc.bytes).swap(_buffer);
- buffer = _buffer.get();
- assert(buffer != nullptr || alloc.elements == 0u);
+ assert(_buffer.get() != nullptr || alloc.elements == 0u);
+ buffer.store(_buffer.get(), std::memory_order_release);
_allocElems = alloc.elements;
_state = ACTIVE;
_typeHandler = typeHandler;
assert(typeId <= std::numeric_limits<uint16_t>::max());
_typeId = typeId;
_arraySize = _typeHandler->getArraySize();
- typeHandler->onActive(bufferId, &_usedElems, &_deadElems, buffer);
+ typeHandler->onActive(bufferId, &_usedElems, &_deadElems, buffer.load(std::memory_order::relaxed));
}
@@ -161,9 +161,9 @@ BufferState::onHold(uint32_t buffer_id)
void
-BufferState::onFree(void *&buffer)
+BufferState::onFree(std::atomic<void*>& buffer)
{
- assert(buffer == _buffer.get());
+ assert(buffer.load(std::memory_order_relaxed) == _buffer.get());
assert(_state == HOLD);
assert(_typeHandler != nullptr);
assert(_deadElems <= _usedElems);
@@ -171,7 +171,7 @@ BufferState::onFree(void *&buffer)
_typeHandler->destroyElements(buffer, _usedElems);
Alloc::alloc().swap(_buffer);
_typeHandler->onFree(_usedElems);
- buffer = nullptr;
+ buffer.store(nullptr, std::memory_order_release);
_usedElems = 0;
_allocElems = 0;
_deadElems = 0u;
@@ -191,13 +191,13 @@ BufferState::onFree(void *&buffer)
void
-BufferState::dropBuffer(uint32_t buffer_id, void *&buffer)
+BufferState::dropBuffer(uint32_t buffer_id, std::atomic<void*>& buffer)
{
if (_state == FREE) {
- assert(buffer == nullptr);
+ assert(buffer.load(std::memory_order_relaxed) == nullptr);
return;
}
- assert(buffer != nullptr || _allocElems == 0);
+ assert(buffer.load(std::memory_order_relaxed) != nullptr || _allocElems == 0);
if (_state == ACTIVE) {
onHold(buffer_id);
}
@@ -205,7 +205,7 @@ BufferState::dropBuffer(uint32_t buffer_id, void *&buffer)
onFree(buffer);
}
assert(_state == FREE);
- assert(buffer == nullptr);
+ assert(buffer.load(std::memory_order_relaxed) == nullptr);
}
@@ -282,7 +282,7 @@ BufferState::disableElemHoldList()
void
BufferState::fallbackResize(uint32_t bufferId,
size_t elementsNeeded,
- void *&buffer,
+ std::atomic<void*>& buffer,
Alloc &holdBuffer)
{
assert(_state == ACTIVE);
@@ -292,13 +292,12 @@ BufferState::fallbackResize(uint32_t bufferId,
assert(alloc.elements >= _usedElems + elementsNeeded);
assert(alloc.elements > _allocElems);
Alloc newBuffer = _buffer.create(alloc.bytes);
- _typeHandler->fallbackCopy(newBuffer.get(), buffer, _usedElems);
+ _typeHandler->fallbackCopy(newBuffer.get(), buffer.load(std::memory_order_relaxed), _usedElems);
holdBuffer.swap(_buffer);
std::atomic_thread_fence(std::memory_order_release);
_buffer = std::move(newBuffer);
- buffer = _buffer.get();
+ buffer.store(_buffer.get(), std::memory_order_release);
_allocElems = alloc.elements;
- std::atomic_thread_fence(std::memory_order_release);
}
void
diff --git a/vespalib/src/vespa/vespalib/datastore/bufferstate.h b/vespalib/src/vespa/vespalib/datastore/bufferstate.h
index ee06c928f54..7862f58cfe1 100644
--- a/vespalib/src/vespa/vespalib/datastore/bufferstate.h
+++ b/vespalib/src/vespa/vespalib/datastore/bufferstate.h
@@ -94,7 +94,7 @@ public:
* @param buffer Start of allocated buffer return value.
*/
void onActive(uint32_t bufferId, uint32_t typeId, BufferTypeBase *typeHandler,
- size_t elementsNeeded, void *&buffer);
+ size_t elementsNeeded, std::atomic<void*>& buffer);
/**
* Transition from ACTIVE to HOLD state.
@@ -104,7 +104,7 @@ public:
/**
* Transition from HOLD to FREE state.
*/
- void onFree(void *&buffer);
+ void onFree(std::atomic<void*>& buffer);
/**
* Set list of buffer states with nonempty free lists.
@@ -157,7 +157,7 @@ public:
void cleanHold(void *buffer, size_t offset, ElemCount numElems) {
_typeHandler->cleanHold(buffer, offset, numElems, BufferTypeBase::CleanContext(_extraUsedBytes, _extraHoldBytes));
}
- void dropBuffer(uint32_t buffer_id, void *&buffer);
+ void dropBuffer(uint32_t buffer_id, std::atomic<void*>& buffer);
uint32_t getTypeId() const { return _typeId; }
uint32_t getArraySize() const { return _arraySize; }
size_t getDeadElems() const { return _deadElems; }
@@ -167,7 +167,7 @@ public:
bool getCompacting() const { return _compacting; }
void setCompacting() { _compacting = true; }
uint32_t get_used_arrays() const noexcept { return _usedElems / _arraySize; }
- void fallbackResize(uint32_t bufferId, size_t elementsNeeded, void *&buffer, Alloc &holdBuffer);
+ void fallbackResize(uint32_t bufferId, size_t elementsNeeded, std::atomic<void*>& buffer, Alloc &holdBuffer);
bool isActive(uint32_t typeId) const {
return ((_state == ACTIVE) && (_typeId == typeId));
diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
index a4207dd13bd..43e47296029 100644
--- a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
@@ -242,7 +242,7 @@ DataStoreBase::doneHoldBuffer(uint32_t bufferId)
{
assert(_hold_buffer_count > 0);
--_hold_buffer_count;
- _states[bufferId].onFree(_buffers[bufferId].getBuffer());
+ _states[bufferId].onFree(_buffers[bufferId].get_atomic_buffer());
}
void
@@ -265,7 +265,7 @@ DataStoreBase::dropBuffers()
{
uint32_t numBuffers = _buffers.size();
for (uint32_t bufferId = 0; bufferId < numBuffers; ++bufferId) {
- _states[bufferId].dropBuffer(bufferId, _buffers[bufferId].getBuffer());
+ _states[bufferId].dropBuffer(bufferId, _buffers[bufferId].get_atomic_buffer());
}
_genHolder.clearHoldLists();
}
@@ -421,7 +421,7 @@ DataStoreBase::onActive(uint32_t bufferId, uint32_t typeId, size_t elemsNeeded)
state.onActive(bufferId, typeId,
_typeHandlers[typeId],
elemsNeeded,
- _buffers[bufferId].getBuffer());
+ _buffers[bufferId].get_atomic_buffer());
enableFreeList(bufferId);
}
@@ -463,7 +463,7 @@ DataStoreBase::fallbackResize(uint32_t bufferId, size_t elemsNeeded)
size_t oldAllocElems = state.capacity();
size_t elementSize = state.getTypeHandler()->elementSize();
state.fallbackResize(bufferId, elemsNeeded,
- _buffers[bufferId].getBuffer(),
+ _buffers[bufferId].get_atomic_buffer(),
toHoldBuffer);
GenerationHeldBase::UP
hold(new FallbackHold(oldAllocElems * elementSize,
diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.h b/vespalib/src/vespa/vespalib/datastore/datastorebase.h
index e98d9531806..0063f497558 100644
--- a/vespalib/src/vespa/vespalib/datastore/datastorebase.h
+++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.h
@@ -45,15 +45,15 @@ protected:
private:
class BufferAndTypeId {
public:
- using MemPtr = void *;
BufferAndTypeId() : BufferAndTypeId(nullptr, 0) { }
- BufferAndTypeId(MemPtr buffer, uint32_t typeId) : _buffer(buffer), _typeId(typeId) { }
- MemPtr getBuffer() const { return _buffer; }
- MemPtr & getBuffer() { return _buffer; }
+ BufferAndTypeId(void* buffer, uint32_t typeId) : _buffer(buffer), _typeId(typeId) { }
+ std::atomic<void*>& get_atomic_buffer() noexcept { return _buffer; }
+ void* get_buffer_relaxed() noexcept { return _buffer.load(std::memory_order_relaxed); }
+ const void* get_buffer_acquire() const noexcept { return _buffer.load(std::memory_order_acquire); }
uint32_t getTypeId() const { return _typeId; }
void setTypeId(uint32_t typeId) { _typeId = typeId; }
private:
- MemPtr _buffer;
+ std::atomic<void*> _buffer;
uint32_t _typeId;
};
std::vector<BufferAndTypeId> _buffers; // For fast mapping with known types
@@ -62,7 +62,7 @@ protected:
// The primary buffer is used for allocations of new element(s) if no available slots are found in free lists.
std::vector<uint32_t> _primary_buffer_ids;
- void * getBuffer(uint32_t bufferId) { return _buffers[bufferId].getBuffer(); }
+ void* getBuffer(uint32_t bufferId) { return _buffers[bufferId].get_buffer_relaxed(); }
/**
* Hold list at freeze, when knowing how long elements must be held
@@ -185,7 +185,7 @@ protected:
* Get the primary buffer for the given type id.
*/
void* primary_buffer(uint32_t typeId) {
- return _buffers[_primary_buffer_ids[typeId]].getBuffer();
+ return _buffers[_primary_buffer_ids[typeId]].get_buffer_relaxed();
}
/**
@@ -277,22 +277,22 @@ public:
template <typename EntryType, typename RefType>
EntryType *getEntry(RefType ref) {
- return static_cast<EntryType *>(_buffers[ref.bufferId()].getBuffer()) + ref.offset();
+ return static_cast<EntryType *>(_buffers[ref.bufferId()].get_buffer_relaxed()) + ref.offset();
}
template <typename EntryType, typename RefType>
const EntryType *getEntry(RefType ref) const {
- return static_cast<const EntryType *>(_buffers[ref.bufferId()].getBuffer()) + ref.offset();
+ return static_cast<const EntryType *>(_buffers[ref.bufferId()].get_buffer_acquire()) + ref.offset();
}
template <typename EntryType, typename RefType>
EntryType *getEntryArray(RefType ref, size_t arraySize) {
- return static_cast<EntryType *>(_buffers[ref.bufferId()].getBuffer()) + (ref.offset() * arraySize);
+ return static_cast<EntryType *>(_buffers[ref.bufferId()].get_buffer_relaxed()) + (ref.offset() * arraySize);
}
template <typename EntryType, typename RefType>
const EntryType *getEntryArray(RefType ref, size_t arraySize) const {
- return static_cast<const EntryType *>(_buffers[ref.bufferId()].getBuffer()) + (ref.offset() * arraySize);
+ return static_cast<const EntryType *>(_buffers[ref.bufferId()].get_buffer_acquire()) + (ref.offset() * arraySize);
}
void dropBuffers();