aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/bucketdb/bucketdatabase.h
blob: d3fdce8f0d8cb972ff70ff59103e29abfc4a451c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * Interface for bucket database implementations in the distributor.
 */
#pragma once

#include "db_merger.h"
#include "read_guard.h"
#include <vespa/vespalib/util/printable.h>
#include <vespa/storage/bucketdb/bucketinfo.h>
#include <vespa/document/bucket/bucketid.h>
#include <vespa/vespalib/util/memoryusage.h>

namespace storage {

class BucketDatabase : public vespalib::Printable
{
public:
    template <typename BucketInfoType>
    class EntryBase {
        document::BucketId _bucketId;
        BucketInfoType _info;

    public:
        EntryBase() noexcept : _bucketId(0), _info() {} // Invalid entry
        EntryBase(const document::BucketId& bId, BucketInfoType&& bucketInfo) noexcept
            : _bucketId(bId),
              _info(std::move(bucketInfo))
        {}
        explicit EntryBase(const document::BucketId& bId) noexcept : _bucketId(bId), _info() {}
        EntryBase(EntryBase &&) noexcept = default;
        EntryBase & operator=(EntryBase &&) noexcept = default;
        EntryBase(const EntryBase &) = default;
        EntryBase & operator=(const EntryBase &) = default;
        bool operator==(const EntryBase& other) const noexcept {
            return (_bucketId == other._bucketId && _info == other._info);
        }
        bool valid() const noexcept { return (_bucketId.getRawId() != 0); }
        std::string toString() const;

        const document::BucketId& getBucketId() const noexcept { return _bucketId; }
        const BucketInfoType& getBucketInfo() const noexcept { return _info; }
        BucketInfoType& getBucketInfo() noexcept { return _info; }
        BucketInfoType* operator->() noexcept { return &_info; }
        const BucketInfoType* operator->() const noexcept { return &_info; }

        static EntryBase createInvalid() noexcept {
            return EntryBase();
        }
    };

    using Entry = EntryBase<BucketInfo>;
    // TODO avoid needing to touch memory just to get to the const entry ref
    // TODO  -> lazy ID to ConstArrayRef resolve
    using ConstEntryRef = EntryBase<ConstBucketInfoRef>;

    struct EntryProcessor {
        virtual ~EntryProcessor() = default;
        /** Return false to stop iterating. */
        virtual bool process(const ConstEntryRef& e) = 0;
    };

    /*
     * Interface class used by process_update() for updating an entry
     * with a single call to the bucket database.
     */
    struct EntryUpdateProcessor {
        virtual ~EntryUpdateProcessor() = default;
        virtual Entry create_entry(const document::BucketId& bucket) const = 0;
        /*
         * Modifies entry.
         * returns true if modified entry should be kept.
         * returns false if entry should be removed.
         */
        virtual bool process_entry(Entry &entry) const = 0;
    };

    ~BucketDatabase() override = default;

    virtual Entry get(const document::BucketId& bucket) const = 0;
    virtual void remove(const document::BucketId& bucket) = 0;

    /**
     * Puts all entries that may contain the given bucket id
     * into the given entry vector, including itself if found.
     */
    virtual void getParents(const document::BucketId& childBucket,
                            std::vector<Entry>& entries) const = 0;

    /**
     * Puts the sum of entries from getParents() and getChildren() into
     * the given vector.
     */
    virtual void getAll(const document::BucketId& bucket,
                        std::vector<Entry>& entries) const = 0;

    /**
     * Updates the entry for the given bucket. Adds the bucket to the bucket
     * database if it wasn't found.
     */
    virtual void update(const Entry& newEntry) = 0;

    virtual void process_update(const document::BucketId& bucket, EntryUpdateProcessor &processor, bool create_if_nonexisting) = 0;

    virtual void for_each_lower_bound(EntryProcessor&, const document::BucketId& at_or_after) const = 0;
    void for_each_lower_bound(EntryProcessor& proc) const {
        for_each_lower_bound(proc, document::BucketId());
    }

    virtual void for_each_upper_bound(EntryProcessor&, const document::BucketId& after) const = 0;
    void for_each_upper_bound(EntryProcessor& proc) const {
        for_each_upper_bound(proc, document::BucketId());
    }

    using TrailingInserter = bucketdb::TrailingInserter<Entry>;
    using Merger           = bucketdb::Merger<Entry>;
    using MergingProcessor = bucketdb::MergingProcessor<Entry>;

    /**
     * Iterate over the bucket database in bucket key order, allowing an arbitrary
     * number of buckets to be inserted, updated and skipped in a way that is
     * optimized for the backing DB implementation.
     *
     * Merging happens in two stages:
     *  1) The MergeProcessor argument's merge() function is invoked for each existing
     *     bucket in the database. At this point new buckets ordered before the iterated
     *     bucket may be inserted and the iterated bucket may be skipped or updated.
     *  2) The MergeProcessor argument's insert_remaining_at_end() function is invoked
     *     once when all buckets have been iterated over. This enables the caller to
     *     insert new buckets that sort after the last iterated bucket.
     *
     * Changes made to the database are not guaranteed to be visible until
     * merge() returns.
     */
    virtual void merge(MergingProcessor&) = 0;

    /**
     * Get the first bucket that does _not_ compare less than or equal to
     * value in standard reverse bucket bit order (i.e. the next bucket in
     * DB iteration order after value).
     *
     * If no such bucket exists, an invalid (empty) entry should be returned.
     * If upperBound is used as part of database iteration, such a return value
     * in effect signals that the end of the database has been reached.
     */
    virtual Entry upperBound(const document::BucketId& value) const = 0;

    Entry getNext(const document::BucketId& last) const;
    
    virtual uint64_t size() const = 0;
    virtual void clear() = 0;

    // FIXME: make const as soon as Judy distributor bucket database
    // has been removed, as it has no such function and will always
    // mutate its internal database!
    virtual document::BucketId getAppropriateBucket(
            uint16_t minBits,
            const document::BucketId& bid) = 0;
    /**
     * Based on the minimum split bits and the existing buckets,
     * creates the correct new bucket in the bucket database,
     * and returns the resulting entry.
     */
    BucketDatabase::Entry createAppropriateBucket(
            uint16_t minBits,
            const document::BucketId& bid);

    virtual uint32_t childCount(const document::BucketId&) const = 0;

    using ReadGuard = bucketdb::ReadGuard<Entry, ConstEntryRef>;

    virtual std::unique_ptr<ReadGuard> acquire_read_guard() const {
        return std::unique_ptr<bucketdb::ReadGuard<Entry, ConstEntryRef>>();
    }

    [[nodiscard]] virtual vespalib::MemoryUsage memory_usage() const noexcept = 0;
};

template <typename BucketInfoType>
std::ostream& operator<<(std::ostream& o, const BucketDatabase::EntryBase<BucketInfoType>& e);

} // storage