diff options
author | Tor Egge <Tor.Egge@online.no> | 2022-02-24 10:46:39 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-24 10:46:39 +0100 |
commit | 122aa5b6f4f7c61b8662df6e2115a91af640de33 (patch) | |
tree | 22b2b3ee0c2bce5eb1b7107e0de7ad3e77524ad7 /vespalib | |
parent | c1ef92cf92c1c7e2789b9ad9abcb89457cc421ac (diff) | |
parent | e3561b6cad8fbb804dc70acc98a6a032e2988997 (diff) |
Merge pull request #21343 from vespa-engine/toregge/use-atomic-ops-for-reading-and-writing-data-store-buffer-pointer
Use atomic read with acquire memory ordering when reading data store …
Diffstat (limited to 'vespalib')
4 files changed, 34 insertions, 35 deletions
diff --git a/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp b/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp index df5e350c8dd..e0104c8bb71 100644 --- a/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp +++ b/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp @@ -101,9 +101,9 @@ void BufferState::onActive(uint32_t bufferId, uint32_t typeId, BufferTypeBase *typeHandler, size_t elementsNeeded, - void *&buffer) + std::atomic<void*>& buffer) { - assert(buffer == nullptr); + assert(buffer.load(std::memory_order_relaxed) == nullptr); assert(_buffer.get() == nullptr); assert(_state == FREE); assert(_typeHandler == nullptr); @@ -125,15 +125,15 @@ BufferState::onActive(uint32_t bufferId, uint32_t typeId, auto allocator = typeHandler->get_memory_allocator(); _buffer = (allocator != nullptr) ? Alloc::alloc_with_allocator(allocator) : Alloc::alloc(0, MemoryAllocator::HUGEPAGE_SIZE); _buffer.create(alloc.bytes).swap(_buffer); - buffer = _buffer.get(); - assert(buffer != nullptr || alloc.elements == 0u); + assert(_buffer.get() != nullptr || alloc.elements == 0u); + buffer.store(_buffer.get(), std::memory_order_release); _allocElems = alloc.elements; _state = ACTIVE; _typeHandler = typeHandler; assert(typeId <= std::numeric_limits<uint16_t>::max()); _typeId = typeId; _arraySize = _typeHandler->getArraySize(); - typeHandler->onActive(bufferId, &_usedElems, &_deadElems, buffer); + typeHandler->onActive(bufferId, &_usedElems, &_deadElems, buffer.load(std::memory_order::relaxed)); } @@ -161,9 +161,9 @@ BufferState::onHold(uint32_t buffer_id) void -BufferState::onFree(void *&buffer) +BufferState::onFree(std::atomic<void*>& buffer) { - assert(buffer == _buffer.get()); + assert(buffer.load(std::memory_order_relaxed) == _buffer.get()); assert(_state == HOLD); assert(_typeHandler != nullptr); assert(_deadElems <= _usedElems); @@ -171,7 +171,7 @@ BufferState::onFree(void *&buffer) _typeHandler->destroyElements(buffer, _usedElems); Alloc::alloc().swap(_buffer); _typeHandler->onFree(_usedElems); - buffer = nullptr; + buffer.store(nullptr, std::memory_order_release); _usedElems = 0; _allocElems = 0; _deadElems = 0u; @@ -191,13 +191,13 @@ BufferState::onFree(void *&buffer) void -BufferState::dropBuffer(uint32_t buffer_id, void *&buffer) +BufferState::dropBuffer(uint32_t buffer_id, std::atomic<void*>& buffer) { if (_state == FREE) { - assert(buffer == nullptr); + assert(buffer.load(std::memory_order_relaxed) == nullptr); return; } - assert(buffer != nullptr || _allocElems == 0); + assert(buffer.load(std::memory_order_relaxed) != nullptr || _allocElems == 0); if (_state == ACTIVE) { onHold(buffer_id); } @@ -205,7 +205,7 @@ BufferState::dropBuffer(uint32_t buffer_id, void *&buffer) onFree(buffer); } assert(_state == FREE); - assert(buffer == nullptr); + assert(buffer.load(std::memory_order_relaxed) == nullptr); } @@ -282,7 +282,7 @@ BufferState::disableElemHoldList() void BufferState::fallbackResize(uint32_t bufferId, size_t elementsNeeded, - void *&buffer, + std::atomic<void*>& buffer, Alloc &holdBuffer) { assert(_state == ACTIVE); @@ -292,13 +292,12 @@ BufferState::fallbackResize(uint32_t bufferId, assert(alloc.elements >= _usedElems + elementsNeeded); assert(alloc.elements > _allocElems); Alloc newBuffer = _buffer.create(alloc.bytes); - _typeHandler->fallbackCopy(newBuffer.get(), buffer, _usedElems); + _typeHandler->fallbackCopy(newBuffer.get(), buffer.load(std::memory_order_relaxed), _usedElems); holdBuffer.swap(_buffer); std::atomic_thread_fence(std::memory_order_release); _buffer = std::move(newBuffer); - buffer = _buffer.get(); + buffer.store(_buffer.get(), std::memory_order_release); _allocElems = alloc.elements; - std::atomic_thread_fence(std::memory_order_release); } void diff --git a/vespalib/src/vespa/vespalib/datastore/bufferstate.h b/vespalib/src/vespa/vespalib/datastore/bufferstate.h index ee06c928f54..7862f58cfe1 100644 --- a/vespalib/src/vespa/vespalib/datastore/bufferstate.h +++ b/vespalib/src/vespa/vespalib/datastore/bufferstate.h @@ -94,7 +94,7 @@ public: * @param buffer Start of allocated buffer return value. */ void onActive(uint32_t bufferId, uint32_t typeId, BufferTypeBase *typeHandler, - size_t elementsNeeded, void *&buffer); + size_t elementsNeeded, std::atomic<void*>& buffer); /** * Transition from ACTIVE to HOLD state. @@ -104,7 +104,7 @@ public: /** * Transition from HOLD to FREE state. */ - void onFree(void *&buffer); + void onFree(std::atomic<void*>& buffer); /** * Set list of buffer states with nonempty free lists. @@ -157,7 +157,7 @@ public: void cleanHold(void *buffer, size_t offset, ElemCount numElems) { _typeHandler->cleanHold(buffer, offset, numElems, BufferTypeBase::CleanContext(_extraUsedBytes, _extraHoldBytes)); } - void dropBuffer(uint32_t buffer_id, void *&buffer); + void dropBuffer(uint32_t buffer_id, std::atomic<void*>& buffer); uint32_t getTypeId() const { return _typeId; } uint32_t getArraySize() const { return _arraySize; } size_t getDeadElems() const { return _deadElems; } @@ -167,7 +167,7 @@ public: bool getCompacting() const { return _compacting; } void setCompacting() { _compacting = true; } uint32_t get_used_arrays() const noexcept { return _usedElems / _arraySize; } - void fallbackResize(uint32_t bufferId, size_t elementsNeeded, void *&buffer, Alloc &holdBuffer); + void fallbackResize(uint32_t bufferId, size_t elementsNeeded, std::atomic<void*>& buffer, Alloc &holdBuffer); bool isActive(uint32_t typeId) const { return ((_state == ACTIVE) && (_typeId == typeId)); diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp index a4207dd13bd..43e47296029 100644 --- a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp +++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp @@ -242,7 +242,7 @@ DataStoreBase::doneHoldBuffer(uint32_t bufferId) { assert(_hold_buffer_count > 0); --_hold_buffer_count; - _states[bufferId].onFree(_buffers[bufferId].getBuffer()); + _states[bufferId].onFree(_buffers[bufferId].get_atomic_buffer()); } void @@ -265,7 +265,7 @@ DataStoreBase::dropBuffers() { uint32_t numBuffers = _buffers.size(); for (uint32_t bufferId = 0; bufferId < numBuffers; ++bufferId) { - _states[bufferId].dropBuffer(bufferId, _buffers[bufferId].getBuffer()); + _states[bufferId].dropBuffer(bufferId, _buffers[bufferId].get_atomic_buffer()); } _genHolder.clearHoldLists(); } @@ -421,7 +421,7 @@ DataStoreBase::onActive(uint32_t bufferId, uint32_t typeId, size_t elemsNeeded) state.onActive(bufferId, typeId, _typeHandlers[typeId], elemsNeeded, - _buffers[bufferId].getBuffer()); + _buffers[bufferId].get_atomic_buffer()); enableFreeList(bufferId); } @@ -463,7 +463,7 @@ DataStoreBase::fallbackResize(uint32_t bufferId, size_t elemsNeeded) size_t oldAllocElems = state.capacity(); size_t elementSize = state.getTypeHandler()->elementSize(); state.fallbackResize(bufferId, elemsNeeded, - _buffers[bufferId].getBuffer(), + _buffers[bufferId].get_atomic_buffer(), toHoldBuffer); GenerationHeldBase::UP hold(new FallbackHold(oldAllocElems * elementSize, diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.h b/vespalib/src/vespa/vespalib/datastore/datastorebase.h index e98d9531806..0063f497558 100644 --- a/vespalib/src/vespa/vespalib/datastore/datastorebase.h +++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.h @@ -45,15 +45,15 @@ protected: private: class BufferAndTypeId { public: - using MemPtr = void *; BufferAndTypeId() : BufferAndTypeId(nullptr, 0) { } - BufferAndTypeId(MemPtr buffer, uint32_t typeId) : _buffer(buffer), _typeId(typeId) { } - MemPtr getBuffer() const { return _buffer; } - MemPtr & getBuffer() { return _buffer; } + BufferAndTypeId(void* buffer, uint32_t typeId) : _buffer(buffer), _typeId(typeId) { } + std::atomic<void*>& get_atomic_buffer() noexcept { return _buffer; } + void* get_buffer_relaxed() noexcept { return _buffer.load(std::memory_order_relaxed); } + const void* get_buffer_acquire() const noexcept { return _buffer.load(std::memory_order_acquire); } uint32_t getTypeId() const { return _typeId; } void setTypeId(uint32_t typeId) { _typeId = typeId; } private: - MemPtr _buffer; + std::atomic<void*> _buffer; uint32_t _typeId; }; std::vector<BufferAndTypeId> _buffers; // For fast mapping with known types @@ -62,7 +62,7 @@ protected: // The primary buffer is used for allocations of new element(s) if no available slots are found in free lists. std::vector<uint32_t> _primary_buffer_ids; - void * getBuffer(uint32_t bufferId) { return _buffers[bufferId].getBuffer(); } + void* getBuffer(uint32_t bufferId) { return _buffers[bufferId].get_buffer_relaxed(); } /** * Hold list at freeze, when knowing how long elements must be held @@ -185,7 +185,7 @@ protected: * Get the primary buffer for the given type id. */ void* primary_buffer(uint32_t typeId) { - return _buffers[_primary_buffer_ids[typeId]].getBuffer(); + return _buffers[_primary_buffer_ids[typeId]].get_buffer_relaxed(); } /** @@ -277,22 +277,22 @@ public: template <typename EntryType, typename RefType> EntryType *getEntry(RefType ref) { - return static_cast<EntryType *>(_buffers[ref.bufferId()].getBuffer()) + ref.offset(); + return static_cast<EntryType *>(_buffers[ref.bufferId()].get_buffer_relaxed()) + ref.offset(); } template <typename EntryType, typename RefType> const EntryType *getEntry(RefType ref) const { - return static_cast<const EntryType *>(_buffers[ref.bufferId()].getBuffer()) + ref.offset(); + return static_cast<const EntryType *>(_buffers[ref.bufferId()].get_buffer_acquire()) + ref.offset(); } template <typename EntryType, typename RefType> EntryType *getEntryArray(RefType ref, size_t arraySize) { - return static_cast<EntryType *>(_buffers[ref.bufferId()].getBuffer()) + (ref.offset() * arraySize); + return static_cast<EntryType *>(_buffers[ref.bufferId()].get_buffer_relaxed()) + (ref.offset() * arraySize); } template <typename EntryType, typename RefType> const EntryType *getEntryArray(RefType ref, size_t arraySize) const { - return static_cast<const EntryType *>(_buffers[ref.bufferId()].getBuffer()) + (ref.offset() * arraySize); + return static_cast<const EntryType *>(_buffers[ref.bufferId()].get_buffer_acquire()) + (ref.offset() * arraySize); } void dropBuffers(); |