aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
blob: ff21e3d1594c9ffda660852a12a8befc0a65a60d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include "idealstateoperation.h"
#include "mergelimiter.h"
#include "mergemetadata.h"
#include "removebucketoperation.h"
#include <vespa/storage/bucketdb/bucketdatabase.h>
#include <vespa/storageapi/message/bucket.h>

namespace storage::lib { class Distribution; }

namespace storage::distributor {

class MergeBucketMetricSet;

class MergeOperation : public IdealStateOperation
{
protected:
    bool sourceOnlyCopyChangedDuringMerge(const BucketDatabase::Entry&) const;

    vespalib::steady_time _sentMessageTime;
    std::vector<api::MergeBucketCommand::Node> _mnodes;
    std::unique_ptr<RemoveBucketOperation> _removeOperation;
    BucketInfo _infoBefore;
    MergeLimiter _limiter;

public:

    MergeOperation(const BucketAndNodes& nodes, uint16_t maxNodes = 16)
        : IdealStateOperation(nodes),
          _sentMessageTime(),
          _limiter(maxNodes)
    {}

    ~MergeOperation() override;

    void onStart(DistributorStripeMessageSender& sender) override;
    void onReceive(DistributorStripeMessageSender& sender, const api::StorageReply::SP&) override;
    const char* getName() const noexcept override { return "merge"; };
    std::string getStatus() const override;
    Type getType() const noexcept override { return MERGE_BUCKET; }

    /** Generates ordered list of nodes that should be included in the merge */
    static void generateSortedNodeList(
            const lib::Distribution&, const lib::ClusterState&,
            const document::BucketId&, MergeLimiter&,
            std::vector<MergeMetaData>&);

    bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const override;
    bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override;
private:
    static void addIdealNodes(
            const std::vector<uint16_t>& idealNodes,
            const std::vector<MergeMetaData>& nodes,
            std::vector<MergeMetaData>& result);

    static void addCopiesNotAlreadyAdded(
            uint16_t redundancy,
            const std::vector<MergeMetaData>& nodes,
            std::vector<MergeMetaData>& result);

    void deleteSourceOnlyNodes(const BucketDatabase::Entry& currentState,
                               DistributorStripeMessageSender& sender);
    bool is_global_bucket_merge() const noexcept;
    bool all_involved_nodes_support_unordered_merge_chaining() const noexcept;
    MergeBucketMetricSet* get_merge_metrics();
};

}