aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/bucketdb/abstract_bucket_map.h
blob: 5c94fd680e7d2d5cf1273834efe4aee061b4530d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include "read_guard.h"
#include <vespa/document/bucket/bucketid.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <vespa/vespalib/stllike/hash_set.h>
#include <vespa/vespalib/util/memoryusage.h>
#include <vespa/vespalib/util/time.h>
#include <cassert>
#include <functional>
#include <iosfwd>
#include <map>

namespace storage::bucketdb {

/*
 * Interface for content node bucket database implementations.
 *
 * Allows for multiple divergent implementations to exist of the
 * bucket database in a transition period.
 */
template <typename ValueT>
class AbstractBucketMap {
public:
    using key_type    = uint64_t; // Always a raw u64 bucket key.
    using mapped_type = ValueT;
    using size_type   = size_t;
    using BucketId    = document::BucketId;
    struct WrappedEntry;
private:
    // Responsible for releasing lock in map when out of scope.
    //
    // Holds a (map, key) pair. A non-null map pointer means "a lock on `key` is
    // held and must be released"; a null map pointer means "nothing to release".
    // Move-only: declaring the move operations suppresses the implicit copies,
    // so at most one keeper is ever responsible for a given lock.
    class LockKeeper {
    public:
        // Empty keeper: not associated with any map, holds no lock.
        LockKeeper() noexcept
            : _map(nullptr), _key() {}
        // Takes over responsibility for releasing `key` in `map`. The lock
        // itself is not acquired here; the constructor only records the pair.
        LockKeeper(AbstractBucketMap& map, key_type key) noexcept
            : _map(&map), _key(key) {}
        // Transfers release responsibility from `rhs`, which is left empty.
        LockKeeper(LockKeeper && rhs) noexcept
            : _map(rhs._map),
              _key(rhs._key)
        {
            rhs._map = nullptr;
        }
        // Releases any lock currently held by this keeper *before* taking over
        // the one held by `rhs`. Self-assignment is a no-op.
        LockKeeper & operator=(LockKeeper && rhs) noexcept {
            if (&rhs == this) return *this;
            cleanup();
            _map = rhs._map;
            _key = rhs._key;
            rhs._map = nullptr;
            return *this;
        }
        ~LockKeeper() { cleanup(); }
        // Precondition: locked(). Dereferences the map pointer unconditionally.
        AbstractBucketMap & map() { return *_map; }
        // Releases the lock via the map's private unlock() and marks this
        // keeper as empty. Precondition: locked() (no null check is done here).
        void unlock() {
            _map->unlock(_key);
            _map = nullptr;
        }
        [[nodiscard]] bool locked() const noexcept { return _map != nullptr; }
        // The key remains readable after unlock(); only the map pointer is reset.
        const key_type & key() const noexcept { return _key; }
    private:
        // Releases the lock if, and only if, one is still held.
        void cleanup() {
            if (_map) unlock();
        }
        AbstractBucketMap * _map;
        key_type            _key;
    };
public:

    /**
     * A by-value copy of a database entry bundled with the lock guarding it.
     *
     * Modifications made through operator->, operator* or get() only affect
     * the local copy; call write() to store the copy back into the map, or
     * remove() to erase the entry. write(), remove() and unlock() all release
     * the lock. If none of them is called, the lock is released when the
     * entry is destroyed (or overwritten by move assignment).
     */
    struct WrappedEntry {
        // Empty entry: holds no lock, exists() and preExisted() are false.
        WrappedEntry() noexcept
            : _lockKeeper(),
              _value(),
              _clientId(nullptr),
              _exists(false),
              _preExisted(false)
        {}
        // Entry for a key that has a value (exists() is true). `val` is copied.
        // `clientId` is stored as a raw pointer, so the pointed-to string must
        // outlive this entry.
        WrappedEntry(AbstractBucketMap& map, const key_type& key, const mapped_type& val,
                     const char* clientId, bool preExisted_) noexcept
            : _lockKeeper(map, key),
              _value(val),
              _clientId(clientId),
              _exists(true),
              _preExisted(preExisted_) {}
        // Entry for a key without a value (exists() is false). Still carries
        // release responsibility for the lock on `key`; the value is
        // default-constructed.
        WrappedEntry(AbstractBucketMap& map, const key_type& key, const char* clientId) noexcept
            : _lockKeeper(map, key),
              _value(),
              _clientId(clientId),
              _exists(false),
              _preExisted(false) {}
        WrappedEntry(WrappedEntry&&) noexcept = default;
        WrappedEntry& operator=(WrappedEntry&&) noexcept = default;
        ~WrappedEntry();

        // Accessors for the local value copy. They do not check exists(); for
        // a non-existing entry they expose a default-constructed value.
        mapped_type* operator->() { return &_value; }
        const mapped_type* operator->() const { return &_value; }
        mapped_type& operator*() { return _value; }
        const mapped_type& operator*() const { return _value; }

        const mapped_type *get() const { return &_value; }
        mapped_type *get() { return &_value; }

        // Stores the local value into the map under this entry's key, then
        // releases the lock. Asserts that the lock is held and that the value
        // passes verifyLegal().
        void write();
        // Erases this entry's key from the map, then releases the lock.
        // Asserts that the lock is held and that exists() is true.
        void remove();
        // Releases the lock without touching the map contents. Asserts that
        // the lock is held.
        void unlock();
        [[nodiscard]] bool exists() const { return _exists; }
        [[nodiscard]] bool preExisted() const { return _preExisted; }
        [[nodiscard]] bool locked() const { return _lockKeeper.locked(); }
        const key_type& getKey() const { return _lockKeeper.key(); };

        // Converts the raw u64 key back into a BucketId.
        BucketId getBucketId() const {
            return BucketId(BucketId::keyToBucketId(getKey()));
        }
    protected:
        LockKeeper  _lockKeeper;
        mapped_type _value;
        const char* _clientId;
        bool        _exists;
        bool        _preExisted;
    };

    /**
     * Identifies a bucket lock by key, annotated with the name of its owner.
     *
     * Hashing, modulo and equality consider only the key; the owner string
     * does not take part in identity. The owner pointer is not owned.
     */
    struct LockId {
        key_type _key;
        const char* _owner;

        LockId() noexcept : _key(0), _owner("none - empty token") {}
        LockId(key_type key, const char* owner) noexcept
            : _key(key), _owner(owner)
        {
            assert(_owner);
        }

        size_t hash() const noexcept { return _key; }
        size_t operator%(size_t val) const noexcept { return _key % val; }
        bool operator==(const LockId& id) const noexcept { return (_key == id._key); }
        // Implicit conversion to the raw key.
        operator key_type() const { return _key; }
    };

    using EntryMap = std::map<BucketId, WrappedEntry>; // TODO ordered std::vector instead? map interface needed?

    // Verdict returned by the for_each* callbacks for each visited entry.
    // NOTE(review): the effect of each value is decided by the concrete
    // implementations, which are not visible in this header. DECISION_COUNT
    // looks like a sentinel holding the number of real decisions - confirm.
    enum Decision { ABORT, UPDATE, REMOVE, CONTINUE, DECISION_COUNT };

    AbstractBucketMap() = default;
    virtual ~AbstractBucketMap() = default;

    // Stores `value` under `key`. WrappedEntry::write() calls this with
    // has_lock == true while holding the lock on `key`.
    // NOTE(review): `pre_existed` is an out-parameter, presumably set to
    // whether `key` was already present - confirm against implementations.
    virtual void insert(const key_type& key, const mapped_type& value,
                        const char* client_id, bool has_lock,
                        bool& pre_existed) = 0;
    // Erases `key`. WrappedEntry::remove() calls this with has_lock == true.
    // NOTE(review): the bool result presumably reports whether an entry was
    // actually removed - confirm against implementations.
    virtual bool erase(const key_type& key, const char* clientId, bool has_lock) = 0;

    // Looks up `key` and returns it wrapped together with its lock.
    // NOTE(review): presumably returns a non-existing entry when the key is
    // absent and createIfNonExisting is false - confirm against implementations.
    virtual WrappedEntry get(const key_type& key, const char* clientId, bool createIfNonExisting) = 0;
    // Convenience overload: same as get(key, clientId, false).
    WrappedEntry get(const key_type& key, const char* clientId) {
        return get(key, clientId, false);
    }
    /**
     * Returns all buckets in the bucket database that can contain the given
     * bucket, and all buckets that that bucket contains.
     */
    virtual EntryMap getAll(const BucketId& bucketId, const char* clientId) = 0;
    /**
     * Returns all buckets in the bucket database that can contain the given
     * bucket. Usually, there should be only one such bucket, but in the case
     * of inconsistent splitting, there may be more than one.
     */
    virtual EntryMap getContained(const BucketId& bucketId, const char* clientId) = 0;
    /**
     * Returns true iff bucket has no superbuckets or sub-buckets in the
     * database. Usage assumption is that any operation that can cause the
     * bucket to become inconsistent will require taking its lock, so by
     * requiring the lock to be provided here we avoid race conditions.
     */
    virtual bool isConsistent(const WrappedEntry& entry) const = 0;

    static constexpr uint32_t DEFAULT_CHUNK_SIZE = 1000;

    /**
     * Iterate over the entire database contents, holding the global database
     * mutex for `chunkSize` processed entries at a time, yielding the current
     * thread between each chunk to allow other threads to get a chance at
     * acquiring a bucket lock.
     *
     * TODO deprecate in favor of snapshotting once fully on B-tree DB
     *
     * Type erasure of functor needed due to virtual indirection.
     */
    // The public for_each* and acquire_read_guard functions are non-virtual
    // and only forward to the private do_* virtuals declared at the bottom of
    // the class. The default arguments live here, on the non-virtual wrapper.
    // NOTE(review): the `10us` literal needs the std::chrono literals in
    // scope; presumably one of the included headers provides them - confirm.
    void for_each_chunked(std::function<Decision(uint64_t, const ValueT&)> func, const char* clientId,
                          vespalib::duration yieldTime = 10us, uint32_t chunkSize = DEFAULT_CHUNK_SIZE)
    {
        do_for_each_chunked(std::move(func), clientId, yieldTime, chunkSize);
    }

    // Iteration where the callback receives a mutable reference to each value.
    // NOTE(review): "unordered" suggests no iteration-order guarantee - confirm.
    void for_each_mutable_unordered(std::function<Decision(uint64_t, ValueT&)> func, const char* clientId)
    {
        do_for_each_mutable_unordered(std::move(func), clientId);
    }

    // Iteration where the callback receives a const reference to each value.
    void for_each(std::function<Decision(uint64_t, const ValueT&)> func, const char* clientId) {
        do_for_each(std::move(func), clientId);
    }

    // Returns a ReadGuard produced by the implementation; see read_guard.h
    // for what the guard offers.
    std::unique_ptr<bucketdb::ReadGuard<ValueT>> acquire_read_guard() const {
        return do_acquire_read_guard();
    }

    [[nodiscard]] virtual size_type size() const noexcept = 0;
    [[nodiscard]] virtual size_type getMemoryUsage() const noexcept = 0;
    [[nodiscard]] virtual vespalib::MemoryUsage detailed_memory_usage() const noexcept = 0;
    [[nodiscard]] virtual bool empty() const noexcept = 0;

    // Writes information about current lock clients to `out`; the format is
    // implementation-defined.
    virtual void showLockClients(vespalib::asciistream& out) const = 0;

    // Prints the map to `out`. operator<< for AbstractBucketMap calls this
    // with verbose == false and an empty indent.
    virtual void print(std::ostream& out, bool verbose, const std::string& indent) const = 0;
private:
    virtual void unlock(const key_type& key) = 0; // Only for bucket lock guards
    // Implementation hooks behind the public non-virtual wrappers above.
    virtual void do_for_each_chunked(std::function<Decision(uint64_t, const ValueT&)> func, const char* clientId,
                                     vespalib::duration yieldTime, uint32_t chunkSize) = 0;
    virtual void do_for_each_mutable_unordered(std::function<Decision(uint64_t, ValueT&)> func, const char* clientId) = 0;
    virtual void do_for_each(std::function<Decision(uint64_t, const ValueT&)> func, const char* clientId) = 0;
    virtual std::unique_ptr<bucketdb::ReadGuard<ValueT>> do_acquire_read_guard() const = 0;
};

/**
 * Stream insertion for bucket maps: emits the map's non-verbose,
 * unindented print() rendering and returns the stream for chaining.
 */
template <typename ValueT>
std::ostream& operator<<(std::ostream& os, const AbstractBucketMap<ValueT>& map) {
    const bool verbose = false;
    const std::string no_indent;
    map.print(os, verbose, no_indent);
    return os;
}

// Defaulted out of line. Destroying the entry destroys its LockKeeper member,
// whose destructor releases the bucket lock if it is still held, so an entry
// that was never written, removed or explicitly unlocked still gives up its
// lock when it goes out of scope.
template <typename ValueT>
AbstractBucketMap<ValueT>::WrappedEntry::~WrappedEntry() = default;

/**
 * Stores this entry's local value copy into the owning map under the
 * entry's key, then releases the bucket lock.
 *
 * Preconditions (checked with assert): the lock is still held and the
 * value reports itself as legal via verifyLegal().
 */
template <typename ValueT>
void AbstractBucketMap<ValueT>::WrappedEntry::write() {
    assert(_lockKeeper.locked());
    assert(_value.verifyLegal());
    auto& owner = _lockKeeper.map();
    // Out-parameter required by insert(); the result is not needed here.
    bool ignored_pre_existed;
    // has_lock is true since this entry already holds the lock on its key.
    owner.insert(_lockKeeper.key(), _value, _clientId, true, ignored_pre_existed);
    _lockKeeper.unlock();
}

/**
 * Erases this entry's key from the owning map, then releases the bucket lock.
 *
 * Preconditions (checked with assert): the lock is still held and the
 * entry exists.
 */
template <typename ValueT>
void AbstractBucketMap<ValueT>::WrappedEntry::remove() {
    assert(_lockKeeper.locked());
    assert(_exists);
    auto& owner = _lockKeeper.map();
    // has_lock is true since this entry already holds the lock on its key.
    // The bool result of erase() is deliberately not inspected.
    owner.erase(_lockKeeper.key(), _clientId, true);
    _lockKeeper.unlock();
}

/**
 * Releases the bucket lock without modifying the map contents.
 *
 * Precondition (checked with assert): the lock is still held.
 */
template <typename ValueT>
void AbstractBucketMap<ValueT>::WrappedEntry::unlock() {
    // locked() reports whether the lock keeper still holds the lock.
    assert(locked());
    _lockKeeper.unlock();
}

}