// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * @class storage::MergeHandler
 *
 * @brief Handles a merge of a single bucket.
 *
 * A merge is a complex, multi-stage operation that spans multiple nodes. It
 * requires tracking the state of ongoing merges as well as a fair amount of
 * logic.
 *
 * This class tracks that state and implements the logic, so that the rest of
 * the provider layer does not need to concern itself with merges.
 */
#pragma once

#include "merge_bucket_info_syncer.h"
#include <vespa/persistence/spi/bucket.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storage/common/cluster_context.h>
#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/util/monitored_refcount.h>
#include <vespa/storageframework/generic/clock/time.h>
#include <atomic>

namespace vespalib { class ISequencedTaskExecutor; }
namespace document { class Document; }
namespace storage {

namespace framework { struct Clock; }

namespace spi {
    struct PersistenceProvider;
    class Context;
    class DocEntry;
}
class PersistenceUtil;
class ApplyBucketDiffState;
class MergeStatus;
class MessageTracker;

class MergeHandler : public MergeBucketInfoSyncer {
private:
    using MessageTrackerUP = std::unique_ptr<MessageTracker>;
    using Timestamp = framework::MicroSecTime;
public:
    enum StateFlag {
        IN_USE                     = 0x01,
        DELETED                    = 0x02,
        DELETED_IN_PLACE           = 0x04
    };
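
    // Illustrative only: the values above are bit flags, so a diff entry's flag
    // field is tested and combined with bitwise operations. The local variables
    // below are hypothetical examples, not part of this interface.
    //
    //   uint16_t flags    = IN_USE | DELETED;                // 0x03
    //   bool     in_use   = (flags & IN_USE) != 0;           // true
    //   bool     in_place = (flags & DELETED_IN_PLACE) != 0; // false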

    MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
                 const ClusterContext& cluster_context, const framework::Clock & clock,
                 vespalib::ISequencedTaskExecutor& executor,
                 uint32_t maxChunkSize = 4190208,
                 uint32_t commonMergeChainOptimalizationMinimumSize = 64);

    ~MergeHandler() override;

    bool buildBucketInfoList(
            const spi::Bucket& bucket,
            Timestamp maxTimestamp,
            uint8_t myNodeIndex,
            std::vector<api::GetBucketDiffCommand::Entry>& output,
            spi::Context& context) const;
    void fetchLocalData(const spi::Bucket& bucket,
                        std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
                        uint8_t nodeIndex,
                        spi::Context& context) const;
    void applyDiffLocally(const spi::Bucket& bucket,
                          std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
                          uint8_t nodeIndex,
                          spi::Context& context,
                          std::shared_ptr<ApplyBucketDiffState> async_results) const;
    void sync_bucket_info(const spi::Bucket& bucket) const override;
    void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override;

    MessageTrackerUP handleMergeBucket(api::MergeBucketCommand&, MessageTrackerUP) const;
    MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const;
    void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const;
    MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const;
    void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
    void drain_async_writes();

    // Thread safe, as it's set during live reconfig from the main filestor manager.
    void set_throttle_merge_feed_ops(bool throttle) noexcept {
        _throttle_merge_feed_ops.store(throttle, std::memory_order_relaxed);
    }

    [[nodiscard]] bool throttle_merge_feed_ops() const noexcept {
        return _throttle_merge_feed_ops.load(std::memory_order_relaxed);
    }
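
    // Usage sketch (hypothetical call sites, not part of this interface): live
    // reconfiguration flips the flag from the filestor manager thread, while
    // merge feed code samples it with the matching relaxed load.
    //
    //   handler.set_throttle_merge_feed_ops(new_config_value);   // reconfig thread
    //   ...
    //   if (handler.throttle_merge_feed_ops()) {                 // merge feed path
    //       // acquire a throttle token before issuing the SPI write
    //   }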

private:
    using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>;
    const framework::Clock   &_clock;
    const ClusterContext     &_cluster_context;
    PersistenceUtil          &_env;
    spi::PersistenceProvider &_spi;
    std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
    const uint32_t            _maxChunkSize;
    const uint32_t            _commonMergeChainOptimalizationMinimumSize;
    vespalib::ISequencedTaskExecutor& _executor;
    std::atomic<bool>         _throttle_merge_feed_ops;

    MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
    /** Returns a reply if the merge is complete. */
    api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
                                             MergeStatus& status,
                                             MessageSender& sender,
                                             spi::Context& context,
                                             std::shared_ptr<ApplyBucketDiffState>& async_results) const;

    /**
     * Invoke either put, remove or unrevertable remove on the SPI
     * depending on the flags in the diff entry.
     */
    void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results,
                        const spi::Bucket&,
                        const api::ApplyBucketDiffCommand::Entry&,
                        const document::DocumentTypeRepo& repo) const;
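
    // Rough shape of the dispatch described above, as a hedged sketch; the local
    // name diff_entry_flags is illustrative, and the actual flag handling lives
    // in the implementation file.
    //
    //   const bool is_remove = (diff_entry_flags & DELETED) != 0;
    //   if (is_remove) {
    //       // tombstone: issue remove (or unrevertable remove) at the entry timestamp
    //   } else {
    //       // live document: deserialize the blob and put it at the entry timestamp
    //   }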

    /**
     * Fill the entries vector with metadata for the bucket up to maxTimestamp,
     * sorted in ascending order by entry timestamp.
     * Throws std::runtime_error upon iteration failure.
     */
    void populateMetaData(const spi::Bucket&,
                          Timestamp maxTimestamp,
                          DocEntryList & entries,
                          spi::Context& context) const;
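
    // Caller sketch (illustrative, assumed local variables): collect the metadata
    // and handle an iteration failure, which is reported as an exception rather
    // than a return value.
    //
    //   DocEntryList entries;
    //   try {
    //       populateMetaData(bucket, maxTimestamp, entries, context);
    //   } catch (const std::runtime_error& e) {
    //       // abort this merge step and propagate the error to the merge status
    //   }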

    std::unique_ptr<document::Document>
    deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e, const document::DocumentTypeRepo& repo) const;
};
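
// Construction sketch (illustrative; the argument names are assumptions): the
// handler is wired up by the persistence/filestor layer with its environment,
// SPI provider, cluster context, clock and sequenced executor. The trailing
// arguments show the defaults declared above.
//
//   MergeHandler handler(env, provider, cluster_context, clock, executor,
//                        4190208 /* maxChunkSize */,
//                        64      /* commonMergeChainOptimalizationMinimumSize */);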

} // storage