author    Tor Egge <Tor.Egge@online.no>  2022-02-23 13:58:08 +0100
committer Tor Egge <Tor.Egge@online.no>  2022-02-23 13:58:08 +0100
commit    d5aa2d5cc8a6a904f5b03f0c51545bdfd1529f62 (patch)
tree      d10086533b4053f19a80e17926773258405a4850 /vespalib
parent    70710b4e95fbf655775ca6baf2bc9cdef5b15d6f (diff)
Use atomic read with acquire memory ordering when reading data store buffer pointer in reader thread.
Use atomic write with release memory ordering when updating data store buffer pointer in writer thread.
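The commit message describes the standard release/acquire publication pattern: the writer thread finishes writing through the buffer, then publishes the pointer with a release store, and a reader thread that loads the pointer with acquire ordering is guaranteed to also observe the writes that preceded the publication. A minimal sketch of the pairing; the names (Buffer, publish, read_payload) are illustrative and not taken from the Vespa sources:

#include <atomic>

struct Buffer {
    int payload;
};

std::atomic<void*> g_buffer{nullptr};

// Writer thread: initialize the buffer, then publish it. The release store
// keeps the payload write ordered before the pointer becomes visible.
void publish(Buffer* buf) {
    buf->payload = 42;
    g_buffer.store(buf, std::memory_order_release);
}

// Reader thread: the acquire load synchronizes with the release store, so a
// non-null pointer implies the payload written before publication is visible.
int read_payload() {
    void* raw = g_buffer.load(std::memory_order_acquire);
    return (raw != nullptr) ? static_cast<Buffer*>(raw)->payload : -1;
}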
Diffstat (limited to 'vespalib')
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/bufferstate.cpp    | 31
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/bufferstate.h      |  8
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/datastorebase.cpp  |  8
-rw-r--r--  vespalib/src/vespa/vespalib/datastore/datastorebase.h    | 23
4 files changed, 35 insertions(+), 35 deletions(-)
diff --git a/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp b/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp
index df5e350c8dd..e0104c8bb71 100644
--- a/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/bufferstate.cpp
@@ -101,9 +101,9 @@ void
BufferState::onActive(uint32_t bufferId, uint32_t typeId,
BufferTypeBase *typeHandler,
size_t elementsNeeded,
- void *&buffer)
+ std::atomic<void*>& buffer)
{
- assert(buffer == nullptr);
+ assert(buffer.load(std::memory_order_relaxed) == nullptr);
assert(_buffer.get() == nullptr);
assert(_state == FREE);
assert(_typeHandler == nullptr);
@@ -125,15 +125,15 @@ BufferState::onActive(uint32_t bufferId, uint32_t typeId,
auto allocator = typeHandler->get_memory_allocator();
_buffer = (allocator != nullptr) ? Alloc::alloc_with_allocator(allocator) : Alloc::alloc(0, MemoryAllocator::HUGEPAGE_SIZE);
_buffer.create(alloc.bytes).swap(_buffer);
- buffer = _buffer.get();
- assert(buffer != nullptr || alloc.elements == 0u);
+ assert(_buffer.get() != nullptr || alloc.elements == 0u);
+ buffer.store(_buffer.get(), std::memory_order_release);
_allocElems = alloc.elements;
_state = ACTIVE;
_typeHandler = typeHandler;
assert(typeId <= std::numeric_limits<uint16_t>::max());
_typeId = typeId;
_arraySize = _typeHandler->getArraySize();
- typeHandler->onActive(bufferId, &_usedElems, &_deadElems, buffer);
+ typeHandler->onActive(bufferId, &_usedElems, &_deadElems, buffer.load(std::memory_order_relaxed));
}
@@ -161,9 +161,9 @@ BufferState::onHold(uint32_t buffer_id)
void
-BufferState::onFree(void *&buffer)
+BufferState::onFree(std::atomic<void*>& buffer)
{
- assert(buffer == _buffer.get());
+ assert(buffer.load(std::memory_order_relaxed) == _buffer.get());
assert(_state == HOLD);
assert(_typeHandler != nullptr);
assert(_deadElems <= _usedElems);
@@ -171,7 +171,7 @@ BufferState::onFree(void *&buffer)
_typeHandler->destroyElements(buffer, _usedElems);
Alloc::alloc().swap(_buffer);
_typeHandler->onFree(_usedElems);
- buffer = nullptr;
+ buffer.store(nullptr, std::memory_order_release);
_usedElems = 0;
_allocElems = 0;
_deadElems = 0u;
@@ -191,13 +191,13 @@ BufferState::onFree(void *&buffer)
void
-BufferState::dropBuffer(uint32_t buffer_id, void *&buffer)
+BufferState::dropBuffer(uint32_t buffer_id, std::atomic<void*>& buffer)
{
if (_state == FREE) {
- assert(buffer == nullptr);
+ assert(buffer.load(std::memory_order_relaxed) == nullptr);
return;
}
- assert(buffer != nullptr || _allocElems == 0);
+ assert(buffer.load(std::memory_order_relaxed) != nullptr || _allocElems == 0);
if (_state == ACTIVE) {
onHold(buffer_id);
}
@@ -205,7 +205,7 @@ BufferState::dropBuffer(uint32_t buffer_id, void *&buffer)
onFree(buffer);
}
assert(_state == FREE);
- assert(buffer == nullptr);
+ assert(buffer.load(std::memory_order_relaxed) == nullptr);
}
@@ -282,7 +282,7 @@ BufferState::disableElemHoldList()
void
BufferState::fallbackResize(uint32_t bufferId,
size_t elementsNeeded,
- void *&buffer,
+ std::atomic<void*>& buffer,
Alloc &holdBuffer)
{
assert(_state == ACTIVE);
@@ -292,13 +292,12 @@ BufferState::fallbackResize(uint32_t bufferId,
assert(alloc.elements >= _usedElems + elementsNeeded);
assert(alloc.elements > _allocElems);
Alloc newBuffer = _buffer.create(alloc.bytes);
- _typeHandler->fallbackCopy(newBuffer.get(), buffer, _usedElems);
+ _typeHandler->fallbackCopy(newBuffer.get(), buffer.load(std::memory_order_relaxed), _usedElems);
holdBuffer.swap(_buffer);
std::atomic_thread_fence(std::memory_order_release);
_buffer = std::move(newBuffer);
- buffer = _buffer.get();
+ buffer.store(_buffer.get(), std::memory_order_release);
_allocElems = alloc.elements;
- std::atomic_thread_fence(std::memory_order_release);
}
void
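In fallbackResize above, the trailing std::atomic_thread_fence(std::memory_order_release) could be dropped because the pointer is now published with a release store, which by itself orders all preceding writes (the fallback copy, the hold-buffer swap, the move into _buffer) before the publication. A sketch of the two publication idioms; slot and payload are illustrative names, not from the Vespa sources:

#include <atomic>

std::atomic<void*> slot{nullptr};
int payload = 0;

// Idiom 1: release fence followed by a relaxed store. The fence orders the
// payload write before the subsequent atomic store.
void publish_with_fence(void* p) {
    payload = 1;
    std::atomic_thread_fence(std::memory_order_release);
    slot.store(p, std::memory_order_relaxed);
}

// Idiom 2: a release store carries the same ordering on its own, which is
// why the trailing fence above became redundant.
void publish_with_release_store(void* p) {
    payload = 1;
    slot.store(p, std::memory_order_release);
}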
diff --git a/vespalib/src/vespa/vespalib/datastore/bufferstate.h b/vespalib/src/vespa/vespalib/datastore/bufferstate.h
index ee06c928f54..7862f58cfe1 100644
--- a/vespalib/src/vespa/vespalib/datastore/bufferstate.h
+++ b/vespalib/src/vespa/vespalib/datastore/bufferstate.h
@@ -94,7 +94,7 @@ public:
* @param buffer Start of allocated buffer return value.
*/
void onActive(uint32_t bufferId, uint32_t typeId, BufferTypeBase *typeHandler,
- size_t elementsNeeded, void *&buffer);
+ size_t elementsNeeded, std::atomic<void*>& buffer);
/**
* Transition from ACTIVE to HOLD state.
@@ -104,7 +104,7 @@ public:
/**
* Transition from HOLD to FREE state.
*/
- void onFree(void *&buffer);
+ void onFree(std::atomic<void*>& buffer);
/**
* Set list of buffer states with nonempty free lists.
@@ -157,7 +157,7 @@ public:
void cleanHold(void *buffer, size_t offset, ElemCount numElems) {
_typeHandler->cleanHold(buffer, offset, numElems, BufferTypeBase::CleanContext(_extraUsedBytes, _extraHoldBytes));
}
- void dropBuffer(uint32_t buffer_id, void *&buffer);
+ void dropBuffer(uint32_t buffer_id, std::atomic<void*>& buffer);
uint32_t getTypeId() const { return _typeId; }
uint32_t getArraySize() const { return _arraySize; }
size_t getDeadElems() const { return _deadElems; }
@@ -167,7 +167,7 @@ public:
bool getCompacting() const { return _compacting; }
void setCompacting() { _compacting = true; }
uint32_t get_used_arrays() const noexcept { return _usedElems / _arraySize; }
- void fallbackResize(uint32_t bufferId, size_t elementsNeeded, void *&buffer, Alloc &holdBuffer);
+ void fallbackResize(uint32_t bufferId, size_t elementsNeeded, std::atomic<void*>& buffer, Alloc &holdBuffer);
bool isActive(uint32_t typeId) const {
return ((_state == ACTIVE) && (_typeId == typeId));
diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
index d2b23a1a5cf..b31af7e18b8 100644
--- a/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
+++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.cpp
@@ -242,7 +242,7 @@ DataStoreBase::doneHoldBuffer(uint32_t bufferId)
{
assert(_hold_buffer_count > 0);
--_hold_buffer_count;
- _states[bufferId].onFree(_buffers[bufferId].getBuffer());
+ _states[bufferId].onFree(_buffers[bufferId].get_atomic_buffer());
}
void
@@ -265,7 +265,7 @@ DataStoreBase::dropBuffers()
{
uint32_t numBuffers = _buffers.size();
for (uint32_t bufferId = 0; bufferId < numBuffers; ++bufferId) {
- _states[bufferId].dropBuffer(bufferId, _buffers[bufferId].getBuffer());
+ _states[bufferId].dropBuffer(bufferId, _buffers[bufferId].get_atomic_buffer());
}
_genHolder.clearHoldLists();
}
@@ -421,7 +421,7 @@ DataStoreBase::onActive(uint32_t bufferId, uint32_t typeId, size_t elemsNeeded)
state.onActive(bufferId, typeId,
_typeHandlers[typeId],
elemsNeeded,
- _buffers[bufferId].getBuffer());
+ _buffers[bufferId].get_atomic_buffer());
enableFreeList(bufferId);
}
@@ -463,7 +463,7 @@ DataStoreBase::fallbackResize(uint32_t bufferId, size_t elemsNeeded)
size_t oldAllocElems = state.capacity();
size_t elementSize = state.getTypeHandler()->elementSize();
state.fallbackResize(bufferId, elemsNeeded,
- _buffers[bufferId].getBuffer(),
+ _buffers[bufferId].get_atomic_buffer(),
toHoldBuffer);
GenerationHeldBase::UP
hold(new FallbackHold(oldAllocElems * elementSize,
diff --git a/vespalib/src/vespa/vespalib/datastore/datastorebase.h b/vespalib/src/vespa/vespalib/datastore/datastorebase.h
index e98d9531806..8361c628af6 100644
--- a/vespalib/src/vespa/vespalib/datastore/datastorebase.h
+++ b/vespalib/src/vespa/vespalib/datastore/datastorebase.h
@@ -45,15 +45,16 @@ protected:
private:
class BufferAndTypeId {
public:
- using MemPtr = void *;
BufferAndTypeId() : BufferAndTypeId(nullptr, 0) { }
- BufferAndTypeId(MemPtr buffer, uint32_t typeId) : _buffer(buffer), _typeId(typeId) { }
- MemPtr getBuffer() const { return _buffer; }
- MemPtr & getBuffer() { return _buffer; }
+ BufferAndTypeId(void* buffer, uint32_t typeId) : _buffer(buffer), _typeId(typeId) { }
+ std::atomic<void*>& get_atomic_buffer() noexcept { return _buffer; }
+ void* get_buffer_relaxed() noexcept { return _buffer.load(std::memory_order_relaxed); }
+ const void* get_buffer_relaxed() const noexcept { return _buffer.load(std::memory_order_relaxed); }
+ const void* get_buffer_acquire() const noexcept { return _buffer.load(std::memory_order_acquire); }
uint32_t getTypeId() const { return _typeId; }
void setTypeId(uint32_t typeId) { _typeId = typeId; }
private:
- MemPtr _buffer;
+ std::atomic<void*> _buffer;
uint32_t _typeId;
};
std::vector<BufferAndTypeId> _buffers; // For fast mapping with known types
@@ -62,7 +63,7 @@ protected:
// The primary buffer is used for allocations of new element(s) if no available slots are found in free lists.
std::vector<uint32_t> _primary_buffer_ids;
- void * getBuffer(uint32_t bufferId) { return _buffers[bufferId].getBuffer(); }
+ void* getBuffer(uint32_t bufferId) { return _buffers[bufferId].get_buffer_relaxed(); }
/**
* Hold list at freeze, when knowing how long elements must be held
@@ -185,7 +186,7 @@ protected:
* Get the primary buffer for the given type id.
*/
void* primary_buffer(uint32_t typeId) {
- return _buffers[_primary_buffer_ids[typeId]].getBuffer();
+ return _buffers[_primary_buffer_ids[typeId]].get_buffer_relaxed();
}
/**
@@ -277,22 +278,22 @@ public:
template <typename EntryType, typename RefType>
EntryType *getEntry(RefType ref) {
- return static_cast<EntryType *>(_buffers[ref.bufferId()].getBuffer()) + ref.offset();
+ return static_cast<EntryType *>(_buffers[ref.bufferId()].get_buffer_relaxed()) + ref.offset();
}
template <typename EntryType, typename RefType>
const EntryType *getEntry(RefType ref) const {
- return static_cast<const EntryType *>(_buffers[ref.bufferId()].getBuffer()) + ref.offset();
+ return static_cast<const EntryType *>(_buffers[ref.bufferId()].get_buffer_acquire()) + ref.offset();
}
template <typename EntryType, typename RefType>
EntryType *getEntryArray(RefType ref, size_t arraySize) {
- return static_cast<EntryType *>(_buffers[ref.bufferId()].getBuffer()) + (ref.offset() * arraySize);
+ return static_cast<EntryType *>(_buffers[ref.bufferId()].get_buffer_relaxed()) + (ref.offset() * arraySize);
}
template <typename EntryType, typename RefType>
const EntryType *getEntryArray(RefType ref, size_t arraySize) const {
- return static_cast<const EntryType *>(_buffers[ref.bufferId()].getBuffer()) + (ref.offset() * arraySize);
+ return static_cast<const EntryType *>(_buffers[ref.bufferId()].get_buffer_acquire()) + (ref.offset() * arraySize);
}
void dropBuffers();
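A plausible reading of the getter split in BufferAndTypeId above: get_buffer_relaxed is intended for the writer thread (or for reads already synchronized by other means), which needs no ordering to observe its own store, while get_buffer_acquire serves concurrent reader threads and pairs with the writer's release store. A reduced sketch of that division; PublishedPtr is an illustrative name, not the Vespa class:

#include <atomic>

class PublishedPtr {
public:
    // Writer thread: it performed the store itself, so a relaxed load is
    // enough to see its own pointer.
    void* get_relaxed() noexcept { return _ptr.load(std::memory_order_relaxed); }

    // Reader threads: the acquire load pairs with the release store in set(),
    // making the buffer contents written before publication visible.
    const void* get_acquire() const noexcept { return _ptr.load(std::memory_order_acquire); }

    // Writer thread: publish a fully initialized buffer.
    void set(void* p) noexcept { _ptr.store(p, std::memory_order_release); }

private:
    std::atomic<void*> _ptr{nullptr};
};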