aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/bucketdbupdater.h
blob: ee07c46754f06ef3d5521253bed26e3b570426b6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include "bucketlistmerger.h"
#include "messageguard.h"
#include "distributorcomponent.h"
#include "distributormessagesender.h"
#include "pendingclusterstate.h"
#include "distributor_bucket_space_component.h"
#include "outdated_nodes_map.h"
#include <vespa/document/bucket/bucketid.h>
#include <vespa/storageapi/messageapi/returncode.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/common/storagelink.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/storageframework/generic/memory/memorymanagerinterface.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
#include <set>
#include <deque>
#include <list>

namespace storage::distributor {

class Distributor;

/**
 * Distributor component that is both a framework::StatusReporter and an
 * api::MessageHandler. It overrides the handlers for SetSystemState commands,
 * RequestBucketInfo replies, MergeBucket replies and NotifyBucketChange
 * commands.
 *
 * State kept by an instance (see the data members at the bottom of the
 * class): delayed and already sent bucket info requests, an optional pending
 * cluster state, a list of pending cluster state summaries, a set of
 * enqueued bucket rechecks, an outdated nodes map and a transition timer.
 *
 * NOTE(review): going by its name the class keeps the distributor's bucket
 * database in sync with the content nodes; the actual logic lives in the
 * implementation file and is not visible from this header.
 */
class BucketDBUpdater : public framework::StatusReporter,
                        public api::MessageHandler
{
public:
    using OutdatedNodes = dbtransition::OutdatedNodes;
    using OutdatedNodesMap = dbtransition::OutdatedNodesMap;

    BucketDBUpdater(Distributor& owner,
                    DistributorBucketSpaceRepo &bucketSpaceRepo,
                    DistributorBucketSpace& bucketSpace,
                    DistributorMessageSender& sender,
                    DistributorComponentRegister& compReg);
    ~BucketDBUpdater();

    void flush();
    BucketOwnership checkOwnershipInPendingState(const document::BucketId&) const;
    void recheckBucketInfo(uint32_t nodeIdx, const document::BucketId& bid);

    // api::MessageHandler overrides.
    bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
    bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
    bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
    bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;

    void resendDelayedMessages();
    void storageDistributionChanged(const lib::Distribution&);

    // Status reporting.
    vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const framework::HttpUrlPath&) const;
    vespalib::string getReportContentType(const framework::HttpUrlPath&) const override;
    bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override;
    void print(std::ostream& out, bool verbose, const std::string& indent) const;

    /** Returns the bucket space component, viewed as a DistributorComponent. */
    DistributorComponent& getDistributorComponent() { return _bucketSpaceComponent; }

    /**
     * Returns whether the current PendingClusterState indicates that there has
     * been a transfer of bucket ownership amongst the distributors in the
     * cluster. This method only makes sense to call when _pendingClusterState
     * is active, such as from within an enableClusterState() call.
     *
     * Returns false when there is no pending cluster state at all.
     */
    bool bucketOwnershipHasChanged() const {
        return _pendingClusterState
               && _pendingClusterState->hasBucketOwnershipTransfer();
    }

private:
    DistributorBucketSpaceComponent _bucketSpaceComponent;

    /**
     * Pairs a MergeBucketReply with the updater it belongs to. The destructor
     * is defined out of line.
     *
     * NOTE(review): judging by the comment on resetReply(), the destructor
     * sends the held reply down unless it has been reset first - confirm
     * against the implementation file.
     */
    class MergeReplyGuard {
    public:
        MergeReplyGuard(BucketDBUpdater& updater, const std::shared_ptr<api::MergeBucketReply>& reply)
            : _updater(updater), _reply(reply) {}

        ~MergeReplyGuard();

        // The guard has a user-declared destructor that acts on the held
        // reply, so an accidental copy would make that happen once per copy.
        // Guards are shared through std::shared_ptr (see BucketRequest), hence
        // copying the guard object itself is never needed.
        MergeReplyGuard(const MergeReplyGuard&) = delete;
        MergeReplyGuard& operator=(const MergeReplyGuard&) = delete;

        // Used when we're flushing and simply want to drop the reply rather
        // than send it down
        void resetReply() { _reply.reset(); }
    private:
        BucketDBUpdater& _updater;
        std::shared_ptr<api::MergeBucketReply> _reply;
    };

    /**
     * Bookkeeping for one bucket info request: the node it targets, the
     * bucket it concerns, a timestamp and an optional merge reply guard.
     */
    struct BucketRequest {
        /** Creates a request with node 0, bucket 0, timestamp 0 and no guard. */
        BucketRequest()
            : targetNode(0), bucket(0), timestamp(0) {}

        BucketRequest(uint16_t t, uint64_t currentTime, const document::BucketId& b,
                      const std::shared_ptr<MergeReplyGuard>& guard)
            : targetNode(t),
              bucket(b),
              timestamp(currentTime),
              _mergeReplyGuard(guard) {}

        uint16_t targetNode;
        document::BucketId bucket;
        uint64_t timestamp;

        // May be empty; shared between all requests created with the same guard.
        std::shared_ptr<MergeReplyGuard> _mergeReplyGuard;
    };

    /**
     * A (node, bucket) pair, ordered by node first and bucket second so that
     * it can be used as a key in the std::set of enqueued rechecks.
     */
    struct EnqueuedBucketRecheck {
        uint16_t node;
        document::BucketId bucket;

        EnqueuedBucketRecheck() : node(0), bucket() {}

        EnqueuedBucketRecheck(uint16_t _node, const document::BucketId& _bucket)
          : node(_node),
            bucket(_bucket)
        {}

        /** Lexicographic ordering on (node, bucket). */
        bool operator<(const EnqueuedBucketRecheck& o) const {
            if (node != o.node) {
                return node < o.node;
            }
            return bucket < o.bucket;
        }
        bool operator==(const EnqueuedBucketRecheck& o) const {
            return node == o.node && bucket == o.bucket;
        }
    };

    bool hasPendingClusterState() const;
    bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
    bool bucketOwnedAccordingToPendingState(const document::BucketId& bucketId) const;
    bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
    void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
                                       const BucketRequest& req);
    bool isPendingClusterStateCompleted() const;
    void processCompletedPendingClusterState();
    void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
                                     const BucketRequest& req);
    void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
                                       uint16_t targetNode, BucketListMerger::BucketList& newList);
    void sendRequestBucketInfo(uint16_t node, const document::BucketId& bucket,
                               const std::shared_ptr<MergeReplyGuard>& mergeReply);
    void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
                              BucketListMerger::BucketList& existing) const;
    void ensureTransitionTimerStarted();
    void completeTransitionTimer();
    /**
     * Adds all buckets contained in the bucket database
     * that are either contained
     * in bucketId, or that bucketId is contained in, that have copies
     * on the given node.
     */
    void findRelatedBucketsInDatabase(uint16_t node, const document::BucketId& bucketId,
                                      BucketListMerger::BucketList& existing);

    /**
       Updates the bucket database from the information generated by the given
       bucket list merger.
    */
    void updateDatabase(uint16_t node, BucketListMerger& merger);

    void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState);

    void removeSuperfluousBuckets(const lib::Distribution& newDistribution, const lib::ClusterState& newState);

    void replyToPreviousPendingClusterStateIfAny();

    void enableCurrentClusterStateInDistributor();
    void addCurrentStateToClusterStateHistory();
    void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::BucketId&);
    void sendAllQueuedBucketRechecks();

    // Test classes that need access to the private parts of the updater.
    friend class BucketDBUpdater_Test;
    friend class MergeOperation_Test;

    /**
     * Helper bound to a node index and a caller-owned bucket list; process()
     * is defined out of line and receives one database entry at a time.
     */
    class BucketListGenerator
    {
    public:
        BucketListGenerator(uint16_t node, BucketListMerger::BucketList& entries)
            : _node(node), _entries(entries) {}

        bool process(BucketDatabase::Entry&);

    private:
        uint16_t _node;
        // Refers to the caller's list; it must outlive this generator.
        BucketListMerger::BucketList& _entries;
    };

    /**
       Removes all copies of buckets that are on nodes that are down.
    */
    class NodeRemover : public BucketDatabase::MutableEntryProcessor
    {
    public:
        NodeRemover(const lib::ClusterState& oldState,
                    const lib::ClusterState& s,
                    const document::BucketIdFactory& factory,
                    uint16_t localIndex,
                    const lib::Distribution& distribution,
                    const char* upStates)
            : _oldState(oldState),
              _state(s),
              _factory(factory),
              _localIndex(localIndex),
              _distribution(distribution),
              _upStates(upStates) {}

        ~NodeRemover();
        bool process(BucketDatabase::Entry& e) override;
        void logRemove(const document::BucketId& bucketId, const char* msg) const;
        bool distributorOwnsBucket(const document::BucketId&) const;

        /** Read-only access to the list of removed buckets. */
        const std::vector<document::BucketId>& getBucketsToRemove() const {
            return _removedBuckets;
        }
    private:
        void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const;
        void removeEmptyBucket(const document::BucketId& bucketId);

        // Both cluster states are stored by value, i.e. copied at construction.
        const lib::ClusterState _oldState;
        const lib::ClusterState _state;
        std::vector<document::BucketId> _removedBuckets;

        // Factory, distribution and upStates are borrowed, not owned; they
        // must stay valid for as long as the remover is in use.
        const document::BucketIdFactory& _factory;
        uint16_t _localIndex;
        const lib::Distribution& _distribution;
        const char* _upStates;
    };

    // Requests paired with a point in time (presumably when to resend them,
    // see resendDelayedMessages() - confirm in the implementation file).
    std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests;
    // Sent requests keyed by a 64 bit id (presumably the message id).
    std::map<uint64_t, BucketRequest> _sentMessages;
    // Null when no cluster state is pending.
    std::unique_ptr<PendingClusterState> _pendingClusterState;
    std::list<PendingClusterState::Summary> _history;
    DistributorMessageSender& _sender;
    std::set<EnqueuedBucketRecheck> _enqueuedRechecks;
    OutdatedNodesMap         _outdatedNodesMap;
    framework::MilliSecTimer _transitionTimer;
};

}