aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib/src/vespa/vespalib/datastore/bufferstate.h
blob: c3e6110cc525f4987bf5c1fe8a896b29425f1666 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include "buffer_free_list.h"
#include "buffer_stats.h"
#include "buffer_type.h"
#include "entryref.h"
#include <vespa/vespalib/util/generationhandler.h>
#include <vespa/vespalib/util/alloc.h>
#include <vespa/vespalib/util/array.h>

namespace vespalib::datastore {

/**
 * Represents a memory allocated buffer (used in a data store) with its state.
 *
 * This class has no direct knowledge of what kind of data is stored in the buffer.
 * It uses a type handler (BufferTypeBase) to manage allocation and de-allocation of a specific data type.
 *
 * A newly allocated buffer starts in state FREE where no memory is allocated.
 * It then transitions to state ACTIVE via onActive(), where memory is allocated based on calculation from BufferTypeBase.
 * It then transitions to state HOLD via onHold() when the buffer is no longer needed.
 * It is kept in this state until all reader threads are no longer accessing the buffer.
 * Finally, it transitions back to FREE via onFree() and memory is de-allocated.
 *
 * This class also supports use of free lists, where previously allocated elements in the buffer can be re-used.
 * First the element is put on hold, then on the free list (counted as dead) to be re-used.
 */
class BufferState
{
public:
    using Alloc = vespalib::alloc::Alloc;

    // Lifecycle state of the underlying allocation; transitions are
    // FREE -> ACTIVE -> HOLD -> FREE (see class comment above).
    enum class State : uint8_t {
        FREE,
        ACTIVE,
        HOLD
    };

private:
    InternalBufferStats _stats;       // usage/hold/dead counters for this buffer
    BufferFreeList      _free_list;   // per-buffer free list of re-usable entries (optional)
    std::atomic<BufferTypeBase*> _typeHandler; // type handler for the data stored; atomic: read by reader threads
    Alloc              _buffer;       // owning handle for the allocated memory
    uint32_t           _arraySize;    // number of elements per array, as configured by the type handler
    uint16_t           _typeId;       // registered data type id this buffer is bound to
    std::atomic<State> _state;        // current lifecycle state; atomic: read by reader threads
    bool               _disableElemHoldList : 1; // when set, held elements are counted dead without cleanup
    bool               _compacting : 1;          // set when this buffer has been selected for compaction

public:
    /**
     * TODO: Check if per-buffer free lists are useful, or if
     * compaction should always be used to free up whole buffers.
     */

    BufferState();
    BufferState(const BufferState &) = delete;
    BufferState & operator=(const BufferState &) = delete;
    ~BufferState();

    /**
     * Transition from FREE to ACTIVE state.
     *
     * @param bufferId       Id of buffer to be active.
     * @param typeId         Registered data type id for buffer.
     * @param typeHandler    Type handler for registered data type.
     * @param elementsNeeded Number of elements needed to be free in the memory allocated.
     * @param buffer         Start of allocated buffer return value.
     */
    void onActive(uint32_t bufferId, uint32_t typeId, BufferTypeBase *typeHandler,
                  size_t elementsNeeded, std::atomic<void*>& buffer);

    /**
     * Transition from ACTIVE to HOLD state.
     */
    void onHold(uint32_t buffer_id);

    /**
     * Transition from HOLD to FREE state.
     */
    void onFree(std::atomic<void*>& buffer);

    /**
     * Disable hold of elements, just mark elements as dead without cleanup.
     * Typically used when tearing down data structure in a controlled manner.
     */
    void disable_elem_hold_list();

    /**
     * Update stats to reflect that the given elements are put on hold.
     * Returns true if element hold list is disabled for this buffer.
     */
    bool hold_elems(size_t num_elems, size_t extra_bytes);

    /**
     * Free the given elements and update stats accordingly.
     *
     * The given entry ref is put on the free list (if enabled).
     * Hold cleaning of elements is executed on the buffer type.
     */
    void free_elems(EntryRef ref, size_t num_elems, size_t ref_offset);

    // Mutable/const access to the usage statistics for this buffer.
    BufferStats& stats() { return _stats; }
    const BufferStats& stats() const { return _stats; }

    // Attach this buffer's free list to the given per-type free list / detach it.
    void enable_free_list(FreeList& type_free_list) { _free_list.enable(type_free_list); }
    void disable_free_list() { _free_list.disable(); }

    size_t size() const { return _stats.size(); }
    size_t capacity() const { return _stats.capacity(); }
    size_t remaining() const { return _stats.remaining(); }
    // Release the memory for this buffer (declared here, defined elsewhere).
    void dropBuffer(uint32_t buffer_id, std::atomic<void*>& buffer);
    uint32_t getTypeId() const { return _typeId; }
    uint32_t getArraySize() const { return _arraySize; }
    bool getCompacting() const { return _compacting; }
    void setCompacting() { _compacting = true; }
    // Number of whole arrays currently in use.
    // NOTE(review): divides by _arraySize — presumably only called on an
    // ACTIVE buffer where _arraySize != 0; confirm against callers.
    uint32_t get_used_arrays() const noexcept { return size() / _arraySize; }
    // Grow the buffer in place when the initial allocation was too small;
    // the previous allocation is handed back via holdBuffer.
    void fallbackResize(uint32_t bufferId, size_t elementsNeeded, std::atomic<void*>& buffer, Alloc &holdBuffer);

    // True if this buffer is ACTIVE and bound to the given type id.
    bool isActive(uint32_t typeId) const {
        return (isActive() && (_typeId == typeId));
    }
    bool isActive() const { return (getState() == State::ACTIVE); }
    bool isOnHold() const { return (getState() == State::HOLD); }
    bool isFree() const { return (getState() == State::FREE); }
    // Relaxed load: callers that need ordering with buffer contents must
    // synchronize via other means.
    State getState() const { return _state.load(std::memory_order_relaxed); }
    const BufferTypeBase *getTypeHandler() const { return _typeHandler.load(std::memory_order_relaxed); }
    BufferTypeBase *getTypeHandler() { return _typeHandler.load(std::memory_order_relaxed); }

    // Re-activate this buffer as the primary buffer for its type (declared here).
    void resume_primary_buffer(uint32_t buffer_id);
};

/**
 * Compact per-buffer entry pairing the raw buffer pointer with its metadata
 * (type id, array size) and a pointer to the owning BufferState.
 *
 * The buffer and state pointers are atomic: writer publishes with release
 * semantics (set_state) and readers observe with acquire (get_*_acquire),
 * while relaxed accessors are for contexts that already hold sufficient
 * synchronization.
 */
class BufferAndMeta {
public:
    // Default: empty slot — null buffer, null state, zeroed metadata.
    BufferAndMeta() : BufferAndMeta(nullptr, nullptr, 0, 0) { }
    std::atomic<void*>& get_atomic_buffer() noexcept { return _buffer; }
    void* get_buffer_relaxed() noexcept { return _buffer.load(std::memory_order_relaxed); }
    const void* get_buffer_acquire() const noexcept { return _buffer.load(std::memory_order_acquire); }
    uint32_t getTypeId() const { return _typeId; }
    uint32_t getArraySize() const { return _arraySize; }
    BufferState * get_state_relaxed() { return _state.load(std::memory_order_relaxed); }
    const BufferState * get_state_acquire() const { return _state.load(std::memory_order_acquire); }
    void setTypeId(uint32_t typeId) { _typeId = typeId; }
    void setArraySize(uint32_t arraySize) { _arraySize = arraySize; }
    // Release store pairs with get_state_acquire() in reader threads.
    void set_state(BufferState * state) { _state.store(state, std::memory_order_release); }
private:
    // Private delegation target; only the default constructor uses it.
    BufferAndMeta(void* buffer, BufferState * state, uint32_t typeId, uint32_t arraySize)
        : _buffer(buffer),
          _state(state),
          _typeId(typeId),
          _arraySize(arraySize)
    { }
    std::atomic<void*>        _buffer;    // raw start of the buffer's memory
    std::atomic<BufferState*> _state;     // owning BufferState, published with release
    uint32_t                  _typeId;    // registered data type id
    uint32_t                  _arraySize; // elements per array for this buffer
};

}