aboutsummaryrefslogtreecommitdiffstats
path: root/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
blob: 859c86875200430312f21d9b0ce2fa13833eb6e0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * \class storage::spi::dummy::DummyPersistence
 * \ingroup dummy
 *
 * \brief Simple implementation of the persistence SPI.
 */

#pragma once

#include <vespa/persistence/spi/abstractpersistenceprovider.h>
#include <vespa/persistence/spi/docentry.h>
#include <vespa/document/base/globalid.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <atomic>
#include <map>
#include <mutex>
#include <condition_variable>

// Forward declarations of document-layer types that this header only names
// through smart pointers or references.
namespace document {
class DocumentTypeRepo;
class FieldSet;
namespace select {
class Node;
}
}

namespace storage::spi::dummy {

/**
 * Mode in which a bucket is acquired through
 * DummyPersistence::acquireBucketWithLock() and later released.
 */
enum class LockMode : int {
    Exclusive = 0, // default mode for acquireBucketWithLock()/releaseBucketNoLock()
    Shared = 1,
};

struct BucketEntry
{
    DocEntry::SP entry;
    GlobalId gid;

    BucketEntry(DocEntry::SP e, const GlobalId& g) noexcept
        : entry(std::move(e)),
          gid(g)
    { }
};

/**
 * In-memory contents of one bucket in the dummy persistence provider:
 * the stored entries, a GlobalId lookup map, cached bucket info and
 * active/in-use state.
 */
struct BucketContent {
    // Lookup from document GlobalId to its stored entry.
    using GidMapType = vespalib::hash_map<document::GlobalId, DocEntry::SP, document::GlobalId::hash>;

    using SP = std::shared_ptr<BucketContent>;

    std::vector<BucketEntry> _entries;
    GidMapType _gidMap;
    // Cached bucket info. Mutable because the const getBucketInfo() may
    // recompute and store it when it is outdated.
    mutable BucketInfo _info;
    // NOTE(review): presumably marks the bucket as currently acquired
    // through a BucketContentGuard; confirm in the .cpp.
    mutable std::atomic<bool> _inUse;
    // True when _info no longer reflects _entries (see setOutdatedInfo()).
    mutable bool _outdatedInfo;
    bool _active;

    BucketContent() noexcept;
    ~BucketContent();

    uint32_t computeEntryChecksum(const BucketEntry&) const;
    BucketChecksum updateRollingChecksum(uint32_t entryChecksum);

    /**
     * Get bucket info, potentially recomputing it if it's outdated. In the
     * latter case, the cached bucket info will be updated.
     */
    const BucketInfo& getBucketInfo() const;
    /** Direct mutable access to the cached bucket info (no recomputation). */
    BucketInfo& getMutableBucketInfo() { return _info; }
    bool hasTimestamp(Timestamp) const;
    void insert(DocEntry::SP);
    DocEntry::SP getEntry(const DocumentId&) const;
    DocEntry::SP getEntry(Timestamp) const;
    void eraseEntry(Timestamp t);

    /**
     * Sets the active flag and rebuilds the cached bucket info so that all
     * of its fields are kept, except for the active state, which is set to
     * ACTIVE or NOT_ACTIVE according to @p active.
     */
    void setActive(bool active = true) {
        _active = active;
        _info = BucketInfo(_info.getChecksum(),
                           _info.getDocumentCount(),
                           _info.getDocumentSize(),
                           _info.getEntryCount(),
                           _info.getUsedSize(),
                           _info.getReady(),
                           active ? BucketInfo::ACTIVE : BucketInfo::NOT_ACTIVE);
    }
    bool isActive() const { return _active; }
    void setOutdatedInfo(bool outdated) { _outdatedInfo = outdated; }
    bool hasOutdatedInfo() const { return _outdatedInfo; }
};

/**
 * Bookkeeping for one open iterator. Instances are owned through
 * Iterator::UP; presumably created by DummyPersistence::createIterator()
 * and consumed by iterate() — confirm in the .cpp.
 */
struct Iterator {
    // Bucket being iterated over.
    Bucket _bucket;
    // NOTE(review): presumably the timestamps of entries not yet returned.
    std::vector<Timestamp> _leftToIterate;
    // Field set selecting which document fields to return.
    std::shared_ptr<document::FieldSet> _fieldSet;

    using UP = std::unique_ptr<Iterator>;
};

class DummyPersistence;

/**
 * Scoped handle to a BucketContent acquired from a DummyPersistence.
 *
 * Stores references to the owning DummyPersistence and the bucket content,
 * together with the LockMode used for the acquisition. The destructor is
 * defined out of line; presumably it releases the bucket through the
 * DummyPersistence (which befriends this class) — confirm in the .cpp.
 * The guard is non-copyable, so each acquisition has exactly one owner.
 */
class BucketContentGuard
{
public:
    using UP = std::unique_ptr<BucketContentGuard>;

    BucketContentGuard(DummyPersistence& persistence,
                       BucketContent& content,
                       LockMode lock_mode)
        : _persistence(persistence),
          _content(content),
          _lock_mode(lock_mode)
    {
    }

    // Explicitly deleted (instead of private and undefined) so accidental
    // copies are rejected at compile time, including from members/friends.
    // Declaring these still suppresses the implicit move operations.
    BucketContentGuard(const BucketContentGuard&) = delete;
    BucketContentGuard& operator=(const BucketContentGuard&) = delete;

    ~BucketContentGuard();

    /** The guarded bucket content. */
    BucketContent& getContent() noexcept {
        return _content;
    }

    BucketContent* operator->() noexcept {
        return &_content;
    }

    BucketContent& operator*() noexcept {
        return _content;
    }
private:
    DummyPersistence& _persistence;
    BucketContent& _content;
    LockMode _lock_mode;
};

/**
 * Simple in-memory implementation of the persistence SPI.
 * Bucket contents are kept in a hash map keyed by Bucket.
 */
class DummyPersistence : public AbstractPersistenceProvider
{
public:
    // explicit: a DocumentTypeRepo pointer should not implicitly convert
    // into a whole persistence provider.
    explicit DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo>& repo);
    ~DummyPersistence() override;

    Result initialize() override;
    BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;

    /**
     * Sets the list to be returned by the next getModifiedBuckets() call.
     */
    void setModifiedBuckets(BucketIdListResult::List result);

    // Important: any subsequent mutations to the bucket set in fake_info will reset
    // the bucket info due to implicit recalculation of bucket info.
    void set_fake_bucket_set(const std::vector<std::pair<Bucket, BucketInfo>>& fake_info);

    /**
     * Returns the list set by setModifiedBuckets(), then clears
     * the list.
     */
    BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;

    Result setClusterState(BucketSpace bucketSpace, const ClusterState& newState) override;
    void setActiveStateAsync(const Bucket&, BucketInfo::ActiveState, OperationComplete::UP) override;
    BucketInfoResult getBucketInfo(const Bucket&) const override;
    GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override;
    void putAsync(const Bucket&, Timestamp, DocumentSP, OperationComplete::UP) override;
    void removeAsync(const Bucket& b, std::vector<spi::IdAndTimestamp> ids, OperationComplete::UP) override;
    void updateAsync(const Bucket&, Timestamp, DocumentUpdateSP, OperationComplete::UP) override;

    CreateIteratorResult
    createIterator(const Bucket &bucket, FieldSetSP fs, const Selection &, IncludedVersions, Context &context) override;

    IterateResult iterate(IteratorId, uint64_t maxByteSize) const override;
    Result destroyIterator(IteratorId) override;

    void createBucketAsync(const Bucket&, OperationComplete::UP) noexcept override;
    void deleteBucketAsync(const Bucket&, OperationComplete::UP) noexcept override;

    Result split(const Bucket& source, const Bucket& target1, const Bucket& target2) override;

    Result join(const Bucket& source1, const Bucket& source2, const Bucket& target) override;

    std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override;
    std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override;
    /**
     * Returns the registered bucket executor, or an empty pointer if none is
     * registered or it has already been destroyed (held as a weak_ptr).
     */
    std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); }

    /**
     * The following methods are used only for unit testing.
     * DummyPersistence is used many places to test the framework around it.
     */

    /**
     * Dumps the contents of a bucket to a string and returns it.
     */
    std::string dumpBucket(const Bucket&) const;

    /**
     * Returns true if the given bucket has been tagged as active.
     */
    bool isActive(const Bucket&) const;

    /**
     * Returns the current cluster state.
     * NOTE(review): dereferences _clusterState unconditionally; assumes a
     * state has been set (e.g. via setClusterState()) — confirm it can never
     * be null here.
     */
    const ClusterState& getClusterState() const {
        return *_clusterState;
    }

private:
    friend class BucketContentGuard;

    void verifyInitialized() const noexcept __attribute__((noinline));
    // Const since funcs only alter mutable field in BucketContent
    BucketContentGuard::UP acquireBucketWithLock(const Bucket& b, LockMode lock_mode = LockMode::Exclusive) const;
    void releaseBucketNoLock(const BucketContent& bc, LockMode lock_mode = LockMode::Exclusive) const noexcept;
    void internal_create_bucket(const Bucket &b);

    using Content = vespalib::hash_map<Bucket, BucketContent::SP, document::BucketId::hash>;

    mutable bool _initialized;
    std::shared_ptr<const document::DocumentTypeRepo> _repo;

    Content _content;
    IteratorId _nextIterator;
    mutable std::map<IteratorId, Iterator::UP> _iterators;
    mutable std::mutex      _monitor;
    std::condition_variable _cond;

    std::unique_ptr<ClusterState> _clusterState;
    std::weak_ptr<BucketExecutor> _bucket_executor;

    mutable BucketIdListResult::List _modifiedBuckets;

    std::unique_ptr<document::select::Node> parseDocumentSelection(const string& documentSelection, bool allowLeaf);
    Content::const_iterator find(const Bucket & bucket) const __attribute__((noinline));
};

}