aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib/src/vespa/vespalib/datastore/datastorebase.h
blob: 97ead5024089185ed5c5a002b609af0359dfed2a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include "bufferstate.h"
#include <vespa/vespalib/util/address_space.h>
#include <vespa/vespalib/util/generationholder.h>
#include <vespa/vespalib/util/memoryusage.h>
#include <vector>
#include <deque>
#include <atomic>

namespace vespalib::datastore {

/**
 * Abstract class used to store data of potentially different types in underlying memory buffers.
 *
 * Reference to stored data is via a 32-bit handle (EntryRef).
 */
class DataStoreBase
{
public:
    /**
     * Entry in the first-stage element hold list, i.e. the list used before
     * freeze, when it is not yet known how long the elements must be held.
     */
    class ElemHold1ListElem
    {
    public:
        EntryRef _ref;  // Reference stored for the held entry
        size_t   _len;  // Aligned length

        ElemHold1ListElem(EntryRef ref, size_t len) : _ref(ref), _len(len) {}
    };

protected:
    // Generation types, borrowed from the generation handler.
    // generation_t is used for tagging held elements and for trimming hold lists.
    using generation_t = vespalib::GenerationHandler::generation_t;
    using sgeneration_t = vespalib::GenerationHandler::sgeneration_t;

private:
    /**
     * Pairs the raw memory pointer of a buffer with a type id.
     */
    class BufferAndTypeId {
    public:
        using MemPtr = void *;

        // A default constructed entry has no buffer (nullptr) and type id 0.
        BufferAndTypeId() : _buffer(nullptr), _typeId(0) { }
        BufferAndTypeId(MemPtr buffer, uint32_t typeId)
            : _buffer(buffer),
              _typeId(typeId)
        { }

        // Read-only access to the buffer pointer.
        MemPtr getBuffer() const { return _buffer; }
        // Mutable access; lets the caller replace the stored pointer in place.
        MemPtr & getBuffer() { return _buffer; }

        uint32_t getTypeId() const { return _typeId; }
        void setTypeId(uint32_t typeId) { _typeId = typeId; }

    private:
        MemPtr     _buffer;
        uint32_t   _typeId;
    };
    // Indexed by buffer id; each entry holds the memory pointer and the type id of that buffer.
    std::vector<BufferAndTypeId> _buffers; // For fast mapping with known types
protected:
    // Provides a mapping from typeId -> primary buffer for that type.
    // The stored values are buffer ids, i.e. indexes into _buffers and _states.
    // The primary buffer is used for allocations of new element(s) if no available slots are found in free lists.
    std::vector<uint32_t> _primary_buffer_ids;

    void * getBuffer(uint32_t bufferId) { return _buffers[bufferId].getBuffer(); }

    /**
     * Entry in the second-stage element hold list, used at freeze when it is
     * known how long the elements must be held. Extends the first-stage entry
     * with a generation.
     */
    class ElemHold2ListElem : public ElemHold1ListElem
    {
    public:
        generation_t _generation;  // Generation stored together with the hold

        ElemHold2ListElem(const ElemHold1ListElem &hold1, generation_t generation)
            : ElemHold1ListElem(hold1), _generation(generation) {}
    };

    // Container types for the two stages of the element hold list:
    // hold1 (before freeze) and hold2 (at freeze, tagged with generation).
    using ElemHold1List = vespalib::Array<ElemHold1ListElem>;
    using ElemHold2List = std::deque<ElemHold2ListElem>;

    /**
     * Class used to hold the old buffer as part of fallbackResize().
     *
     * Derives from GenerationHeldBase. Constructor and destructor are only
     * declared here; their definitions are not part of this header.
     * NOTE(review): presumably the instance is handed to the generation holder
     * so the old buffer stays alive until no reader can use it -- confirm
     * against the implementation.
     */
    class FallbackHold : public vespalib::GenerationHeldBase
    {
    public:
        BufferState::Alloc _buffer;      // The old buffer allocation being held
        size_t             _usedElems;   // Presumably the number of used elements in the old buffer -- TODO confirm
        BufferTypeBase    *_typeHandler; // Type handler pointer; ownership is not visible from this header
        uint32_t           _typeId;      // Presumably the type id of the held buffer -- TODO confirm

        FallbackHold(size_t bytesSize, BufferState::Alloc &&buffer, size_t usedElems,
                     BufferTypeBase *typeHandler, uint32_t typeId);

        ~FallbackHold() override;
    };

    // Forward declaration only; the definition is not part of this header.
    class BufferHold;

public:
    /**
     * Memory statistics counters: element counts, byte counts and buffer counts.
     * All counters start at zero; two sets of statistics are combined with operator+=.
     */
    class MemStats
    {
    public:
        // Element counters
        size_t _allocElems = 0;
        size_t _usedElems = 0;
        size_t _deadElems = 0;
        size_t _holdElems = 0;
        // Byte counters
        size_t _allocBytes = 0;
        size_t _usedBytes = 0;
        size_t _deadBytes = 0;
        size_t _holdBytes = 0;
        // Buffer counters
        uint32_t _freeBuffers = 0;
        uint32_t _activeBuffers = 0;
        uint32_t _holdBuffers = 0;

        MemStats() = default;

        /**
         * Add every counter of rhs to the corresponding counter of this object.
         *
         * @return reference to this object, to allow chaining.
         */
        MemStats& operator+=(const MemStats &rhs) {
            // Elements
            _allocElems += rhs._allocElems;
            _usedElems  += rhs._usedElems;
            _deadElems  += rhs._deadElems;
            _holdElems  += rhs._holdElems;
            // Bytes
            _allocBytes += rhs._allocBytes;
            _usedBytes  += rhs._usedBytes;
            _deadBytes  += rhs._deadBytes;
            _holdBytes  += rhs._holdBytes;
            // Buffers
            _freeBuffers   += rhs._freeBuffers;
            _activeBuffers += rhs._activeBuffers;
            _holdBuffers   += rhs._holdBuffers;
            return *this;
        }
    };

private:
    // Per-buffer state, indexed by buffer id (see getBufferState()).
    std::vector<BufferState> _states;
protected:
    std::vector<BufferTypeBase *> _typeHandlers; // TypeId -> handler

    // Free lists, indexed by type id (see getFreeList()).
    std::vector<BufferState::FreeListList> _freeListLists;
    bool _freeListsEnabled; // Reported by has_free_lists_enabled()
    bool _initializing;     // Set via setInitializing(); no readers are assumed while initializing

    ElemHold1List _elemHold1List; // Element holds before freeze (no generation yet)
    ElemHold2List _elemHold2List; // Element holds at freeze (tagged with generation)

    const uint32_t _numBuffers;        // Number of buffers; nextBufferId() wraps around at this value
    uint32_t       _hold_buffer_count; // Non-zero when has_held_buffers() reports true
    const size_t   _maxArrays;         // NOTE(review): presumably the max number of arrays per buffer -- confirm
    // Mutable so that the const member inc_compaction_count() can update it.
    mutable std::atomic<uint64_t> _compaction_count;

    vespalib::GenerationHolder _genHolder; // Exposed through getGenerationHolder()

    // Constructor and destructor are protected: instances are created and
    // destroyed through derived classes only.
    // NOTE(review): numBuffers / maxArrays presumably initialize _numBuffers / _maxArrays -- confirm.
    DataStoreBase(uint32_t numBuffers, size_t maxArrays);
    // Not copyable.
    DataStoreBase(const DataStoreBase &) = delete;
    DataStoreBase &operator=(const DataStoreBase &) = delete;

    virtual ~DataStoreBase();

    /**
     * Get the next buffer id after the given buffer id.
     */
    uint32_t nextBufferId(uint32_t bufferId) {
        uint32_t ret = bufferId + 1;
        if (ret == _numBuffers)
            ret = 0;
        return ret;
    }

    /**
     * Get the memory pointer of the primary buffer for the given type id.
     *
     * @param typeId  Type id used to look up the primary buffer id.
     */
    void* primary_buffer(uint32_t typeId) {
        const uint32_t bufferId = _primary_buffer_ids[typeId];
        return _buffers[bufferId].getBuffer();
    }

    /**
     * Trim elem hold list, freeing elements that no longer need to be held.
     * Pure virtual: must be implemented by the derived class.
     *
     * @param usedGen       lowest generation that is still used.
     */
    virtual void trimElemHoldList(generation_t usedGen) = 0;

    /**
     * Clear the elem hold list.
     * Pure virtual: must be implemented by the derived class.
     * NOTE(review): presumably releases all held elements regardless of generation -- confirm.
     */
    virtual void clearElemHoldList() = 0;

    /**
     * Variant of startCompactWorstBuffer() that takes a filter function.
     * Only declared here; the template definition is not part of this header.
     * NOTE(review): presumably the filter selects which active buffers are candidates,
     * and the return value is the id of the buffer chosen for compaction -- confirm.
     */
    template <typename BufferStateActiveFilter>
    uint32_t startCompactWorstBuffer(uint32_t initWorstBufferId, BufferStateActiveFilter &&filterFunc);
    // NOTE(review): presumably flags the given buffer as being compacted -- confirm in the implementation.
    void markCompacting(uint32_t bufferId);
public:
    /**
     * Register a type handler.
     * NOTE(review): the return value is presumably the type id assigned to the handler
     * (cf. _typeHandlers: TypeId -> handler) -- confirm.
     */
    uint32_t addType(BufferTypeBase *typeHandler);
    // NOTE(review): presumably sets up the primary buffer for each registered type -- confirm.
    void init_primary_buffers();

    /**
     * Ensure that the primary buffer for the given type has a given number of elements free at end.
     * Switch to new buffer if current buffer is too full.
     *
     * @param typeId      Registered data type for buffer.
     * @param elemsNeeded Number of elements needed to be free.
     */
    void ensureBufferCapacity(uint32_t typeId, size_t elemsNeeded) {
        if (__builtin_expect(elemsNeeded >
                             _states[_primary_buffer_ids[typeId]].remaining(),
                             false)) {
            switch_or_grow_primary_buffer(typeId, elemsNeeded);
        }
    }

    /**
     * Put buffer on hold list, as part of compaction.
     *
     * @param bufferId      Id of buffer to be held.
     */
    void holdBuffer(uint32_t bufferId);

    /**
     * Switch to a new primary buffer, typically in preparation for compaction
     * or when the current primary buffer no longer has free space.
     *
     * @param typeId      Registered data type for buffer.
     * @param elemsNeeded Number of elements needed to be free.
     */
    void switch_primary_buffer(uint32_t typeId, size_t elemsNeeded);

    // NOTE(review): presumably decides whether the active buffer for the type should be
    // grown instead of switching to a new buffer; meaning of the return value to be confirmed.
    bool consider_grow_active_buffer(uint32_t type_id, size_t elems_needed);
    // Called by ensureBufferCapacity() when the primary buffer has too few elements remaining.
    void switch_or_grow_primary_buffer(uint32_t typeId, size_t elemsNeeded);

    // Memory usage of this data store (definition not part of this header).
    vespalib::MemoryUsage getMemoryUsage() const;

    // Address space usage of this data store (definition not part of this header).
    vespalib::AddressSpace getAddressSpaceUsage() const;

    /**
     * Get the id of the primary buffer for the given type id.
     *
     * @param typeId  Type id to look up.
     */
    uint32_t get_primary_buffer_id(uint32_t typeId) const {
        return _primary_buffer_ids[typeId];
    }
    // Access the state of the buffer with the given buffer id (read-only variant).
    const BufferState &getBufferState(uint32_t bufferId) const {
        return _states[bufferId];
    }
    // Access the state of the buffer with the given buffer id (mutable variant).
    BufferState &getBufferState(uint32_t bufferId) {
        return _states[bufferId];
    }
    // Number of buffers in this data store.
    uint32_t getNumBuffers() const {
        return _numBuffers;
    }
    // True when the hold1 list (element holds before freeze) contains entries.
    bool hasElemHold1() const {
        const bool hold1Empty = _elemHold1List.empty();
        return !hold1Empty;
    }

    /**
     * Transfer element holds from hold1 list to hold2 list.
     *
     * @param generation    generation passed along with the transferred holds.
     */
    void transferElemHoldList(generation_t generation);

    /**
     * Transfer holds from hold1 to hold2 lists, assigning generation.
     *
     * @param generation    generation to assign.
     */
    void transferHoldLists(generation_t generation);

    /**
     * Hold of buffer has ended.
     *
     * @param bufferId      Id of the buffer whose hold has ended.
     */
    void doneHoldBuffer(uint32_t bufferId);

    /**
     * Trim hold lists, freeing buffers that no longer need to be held.
     *
     * @param usedGen       lowest generation that is still used.
     */
    void trimHoldLists(generation_t usedGen);

    // NOTE(review): presumably clears all hold lists regardless of generation -- confirm.
    void clearHoldLists();

    /**
     * Get a pointer to the entry that the given reference points to.
     *
     * The buffer selected by ref.bufferId() is viewed as an array of EntryType,
     * and ref.offset() is used as the index into that array.
     */
    template <typename EntryType, typename RefType>
    EntryType *getEntry(RefType ref) {
        void *rawBuffer = _buffers[ref.bufferId()].getBuffer();
        EntryType *entries = static_cast<EntryType *>(rawBuffer);
        return entries + ref.offset();
    }

    /**
     * Read-only variant of getEntry().
     */
    template <typename EntryType, typename RefType>
    const EntryType *getEntry(RefType ref) const {
        const void *rawBuffer = _buffers[ref.bufferId()].getBuffer();
        const EntryType *entries = static_cast<const EntryType *>(rawBuffer);
        return entries + ref.offset();
    }

    /**
     * Get a pointer to the first element of the array that the given reference points to.
     *
     * The buffer selected by ref.bufferId() is viewed as an array of EntryType;
     * the element index is ref.offset() multiplied by arraySize.
     */
    template <typename EntryType, typename RefType>
    EntryType *getEntryArray(RefType ref, size_t arraySize) {
        void *rawBuffer = _buffers[ref.bufferId()].getBuffer();
        const auto elemIndex = ref.offset() * arraySize;
        return static_cast<EntryType *>(rawBuffer) + elemIndex;
    }

    /**
     * Read-only variant of getEntryArray().
     */
    template <typename EntryType, typename RefType>
    const EntryType *getEntryArray(RefType ref, size_t arraySize) const {
        const void *rawBuffer = _buffers[ref.bufferId()].getBuffer();
        const auto elemIndex = ref.offset() * arraySize;
        return static_cast<const EntryType *>(rawBuffer) + elemIndex;
    }

    // NOTE(review): presumably releases all buffers owned by this data store -- confirm in the implementation.
    void dropBuffers();


    /**
     * Increase the dead element count of the given buffer.
     *
     * @param bufferId   Id of the buffer to update.
     * @param deadElems  Number of dead elements to add.
     */
    void incDead(uint32_t bufferId, size_t deadElems) {
        _states[bufferId].incDeadElems(deadElems);
    }

    /**
     * Enable free list management.
     * This only works for fixed size elements.
     */
    void enableFreeLists();

    /**
     * Disable free list management.
     */
    void disableFreeLists();

    /**
     * Enable free list management for the buffer with the given buffer id.
     * This only works for fixed size elements.
     */
    void enableFreeList(uint32_t bufferId);

    /**
     * Disable free list management for the buffer with the given buffer id.
     */
    void disableFreeList(uint32_t bufferId);
    // NOTE(review): presumably turns off use of the element hold list -- confirm in the implementation.
    void disableElemHoldList();

    // Reports the current value of the free-lists-enabled flag.
    bool has_free_lists_enabled() const {
        return _freeListsEnabled;
    }

    /**
     * Get the free list belonging to the given type id.
     *
     * @param typeId  Type id used as index into the free lists.
     * @return        Mutable reference to the free list for that type.
     */
    BufferState::FreeListList &getFreeList(uint32_t typeId) {
        BufferState::FreeListList &freeList = _freeListLists[typeId];
        return freeList;
    }

    /**
     * Returns aggregated memory statistics for all buffers in this data store.
     *
     * @return a MemStats value (element, byte and buffer counters).
     */
    MemStats getMemStats() const;

    /**
     * Set the initializing flag.
     * While the data structure is being initialized, it is assumed that no readers are present.
     *
     * @param initializing  New value of the flag.
     */
    void setInitializing(bool initializing) {
        _initializing = initializing;
    }

private:
    /**
     * Switch buffer state to active for the given buffer.
     *
     * @param bufferId    Id of buffer to be active.
     * @param typeId      Registered data type for buffer.
     * @param elemsNeeded Number of elements needed to be free.
     */
    void onActive(uint32_t bufferId, uint32_t typeId, size_t elemsNeeded);

    // NOTE(review): presumably increments _hold_buffer_count -- confirm in the implementation.
    void inc_hold_buffer_count();
public:
    uint32_t getTypeId(uint32_t bufferId) const {
        return _buffers[bufferId].getTypeId();
    }

    // NOTE(review): presumably starts compaction for the given type and returns the ids of
    // the buffers to hold afterwards (cf. the toHold parameter of finishCompact()) -- confirm.
    std::vector<uint32_t> startCompact(uint32_t typeId);

    // NOTE(review): presumably puts the given buffers on hold once compaction is done -- confirm.
    void finishCompact(const std::vector<uint32_t> &toHold);
    // Resize of the given buffer; the old buffer is held by a FallbackHold instance.
    void fallbackResize(uint32_t bufferId, size_t elementsNeeded);

    // Access the generation holder owned by this data store.
    vespalib::GenerationHolder &getGenerationHolder() { return _genHolder; }

    // NOTE(review): presumably starts compaction of the worst buffer for the given type and
    // returns the id of the buffer being compacted -- confirm in the implementation.
    uint32_t startCompactWorstBuffer(uint32_t typeId);
    // NOTE(review): the flags presumably select whether buffers are chosen based on memory usage
    // and/or address space usage; the returned ids are presumably the buffers being compacted -- confirm.
    std::vector<uint32_t> startCompactWorstBuffers(bool compactMemory, bool compactAddressSpace);
    // Read the compaction counter (relaxed atomic load).
    uint64_t get_compaction_count() const {
        return _compaction_count.load(std::memory_order_relaxed);
    }
    // Atomically add one to the compaction counter. Callable on a const object
    // since the counter is declared mutable.
    void inc_compaction_count() const {
        _compaction_count.fetch_add(1);
    }
    // True when the hold buffer count is non-zero.
    bool has_held_buffers() const noexcept {
        return _hold_buffer_count > 0u;
    }
};

}