aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib/src/vespa/vespalib/datastore/buffer_type.h
blob: 3edafbc8e69e426bcbe73af8fceafebf2b4f21ec (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include "atomic_entry_ref.h"
#include <string>
#include <vector>

namespace vespalib::alloc { class MemoryAllocator; }

namespace vespalib::datastore {

using EntryCount = uint32_t;

/**
 * Abstract class used to manage allocation and de-allocation of a specific data type in underlying memory buffers in a data store.
 * Each buffer is owned by an instance of BufferState.
 *
 * This class handles allocation of both single elements (_arraySize = 1) and array of elements (_arraySize > 1).
 * The strategy for how to grow buffers is specified as well.
 */
class BufferTypeBase
{
public:
    using EntryCount = vespalib::datastore::EntryCount;
    class CleanContext {
    private:
        std::atomic<size_t> &_extraUsedBytes;
        std::atomic<size_t> &_extraHoldBytes;
    public:
        CleanContext(std::atomic<size_t>& extraUsedBytes, std::atomic<size_t>& extraHoldBytes)
            : _extraUsedBytes(extraUsedBytes),
              _extraHoldBytes(extraHoldBytes)
        {}
        void extraBytesCleaned(size_t value);
    };

    BufferTypeBase(const BufferTypeBase &rhs) = delete;
    BufferTypeBase & operator=(const BufferTypeBase &rhs) = delete;
    BufferTypeBase(BufferTypeBase &&rhs) noexcept;
    BufferTypeBase & operator=(BufferTypeBase &&rhs) noexcept;
    BufferTypeBase(uint32_t entry_size_in, uint32_t buffer_underflow_size_in, uint32_t arraySize, uint32_t min_entries, uint32_t max_entries) noexcept;
    BufferTypeBase(uint32_t entry_size_in, uint32_t buffer_underflow_size_in, uint32_t arraySize, uint32_t min_entries, uint32_t max_entries,
                   uint32_t num_entries_for_new_buffer, float allocGrowFactor) noexcept;
    virtual ~BufferTypeBase();
    virtual void destroy_entries(void *buffer, EntryCount num_entries) = 0;
    virtual void fallback_copy(void *newBuffer, const void *oldBuffer, EntryCount num_entries) = 0;

    /**
     * Return number of reserved entries at start of buffer, to avoid
     * invalid reference.
     */
    virtual EntryCount get_reserved_entries(uint32_t bufferId) const;

    /**
     * Initialize reserved elements at start of buffer.
     */
    virtual void initialize_reserved_entries(void *buffer, EntryCount reserved_entries) = 0;
    size_t entry_size() const noexcept { return _entry_size; }
    uint32_t buffer_underflow_size() const noexcept { return _buffer_underflow_size; }
    virtual void clean_hold(void *buffer, size_t offset, EntryCount num_entries, CleanContext cleanCtx) = 0;
    size_t getArraySize() const noexcept { return _arraySize; }
    virtual void on_active(uint32_t bufferId, std::atomic<EntryCount>* used_entries, std::atomic<EntryCount>* dead_entries, void* buffer);
    void on_hold(uint32_t buffer_id, const std::atomic<EntryCount>* used_entries, const std::atomic<EntryCount>* dead_entries);
    virtual void on_free(EntryCount used_entries);
    void resume_primary_buffer(uint32_t buffer_id, std::atomic<EntryCount>* used_entries, std::atomic<EntryCount>* dead_entries);
    virtual const alloc::MemoryAllocator* get_memory_allocator() const;
    virtual bool is_dynamic_array_buffer_type() const noexcept;

    /**
     * Calculate number of entries to allocate for new buffer given how many free entries are needed.
     */
    virtual size_t calc_entries_to_alloc(uint32_t bufferId, EntryCount free_entries_needed, bool resizing) const;

    void clamp_max_entries(uint32_t max_entries);

    uint32_t get_active_buffers_count() const { return _active_buffers.size(); }
    const std::vector<uint32_t>& get_active_buffers() const noexcept { return _active_buffers; }
    size_t get_max_entries() const { return _max_entries; }
    uint32_t get_scaled_num_entries_for_new_buffer() const;
    uint32_t get_num_entries_for_new_buffer() const noexcept { return _num_entries_for_new_buffer; }
protected:

    struct BufferCounts {
        EntryCount used_entries;
        EntryCount dead_entries;
        BufferCounts() : used_entries(0), dead_entries(0) {}
        BufferCounts(EntryCount used_entries_in, EntryCount dead_entries_in)
            : used_entries(used_entries_in), dead_entries(dead_entries_in)
        {}
    };

    /**
     * Tracks aggregated counts for all buffers that are in state ACTIVE.
     */
    class AggregatedBufferCounts {
    private:
        struct ActiveBufferCounts {
            const std::atomic<EntryCount>* used_ptr;
            const std::atomic<EntryCount>* dead_ptr;
            ActiveBufferCounts() noexcept : used_ptr(nullptr), dead_ptr(nullptr) {}
            ActiveBufferCounts(const std::atomic<EntryCount>* used_ptr_in, const std::atomic<EntryCount>* dead_ptr_in) noexcept
                : used_ptr(used_ptr_in), dead_ptr(dead_ptr_in)
            {}
        };
        std::vector<ActiveBufferCounts> _counts;

    public:
        AggregatedBufferCounts();
        void add_buffer(const std::atomic<EntryCount>* used_entries, const std::atomic<EntryCount>* dead_entries);
        void remove_buffer(const std::atomic<EntryCount>* used_entries, const std::atomic<EntryCount>* dead_entries);
        BufferCounts last_buffer() const;
        BufferCounts all_buffers() const;
        bool empty() const { return _counts.empty(); }
    };

    uint32_t _entry_size;   // Number of bytes in an allocation unit
    uint32_t _arraySize;    // Number of elements in an allocation unit

    /*
     * Buffer underflow size is the size of an area before the start
     * of the logical buffer that is safe to access (part of the same
     * memory alloation as the buffer itself). This allows for data
     * belonging to an entry to be placed at the end of what is normally
     * the last part of the previos entry (e.g. dynamic array size
     * for the dynamic array buffer type).
     */
    uint32_t _buffer_underflow_size;
    uint32_t _min_entries;  // Minimum number of entries to allocate in a buffer
    uint32_t _max_entries;  // Maximum number of entries to allocate in a buffer
    // Number of entries needed before allocating a new buffer instead of just resizing the first one
    uint32_t _num_entries_for_new_buffer;
    float    _allocGrowFactor;
    uint32_t _holdBuffers;
    size_t   _hold_used_entries;  // Number of used entries in all held buffers for this type.
    AggregatedBufferCounts _aggr_counts;
    std::vector<uint32_t>  _active_buffers;
};

/**
 * Concrete class used to manage allocation and de-allocation of elements of type ElemType in data store buffers.
 */
template <typename ElemT, typename EmptyT = ElemT>
class BufferType : public BufferTypeBase
{
public:
    using ElemType = ElemT;
    using EmptyType = EmptyT;
protected:
    static const ElemType& empty_entry() noexcept;

public:
    BufferType() noexcept : BufferType(1,1,1) {}
    BufferType(const BufferType &rhs) = delete;
    BufferType & operator=(const BufferType &rhs) = delete;
    BufferType(BufferType && rhs) noexcept = default;
    BufferType & operator=(BufferType && rhs) noexcept = default;
    BufferType(uint32_t arraySize, uint32_t min_entries, uint32_t max_entries) noexcept;
    BufferType(uint32_t arraySize, uint32_t min_entries, uint32_t max_entries,
               uint32_t num_entries_for_new_buffer, float allocGrowFactor) noexcept;
    ~BufferType() override;
    void destroy_entries(void *buffer, EntryCount num_entries) override;
    void fallback_copy(void *newBuffer, const void *oldBuffer, EntryCount num_entries) override;
    void initialize_reserved_entries(void *buffer, EntryCount reserved_entries) override;
    void clean_hold(void *buffer, size_t offset, EntryCount num_entries, CleanContext cleanCxt) override;
};

extern template class BufferType<char>;
extern template class BufferType<uint8_t>;
extern template class BufferType<uint32_t>;
extern template class BufferType<uint64_t>;
extern template class BufferType<int32_t>;
extern template class BufferType<std::string>;
extern template class BufferType<AtomicEntryRef>;

}