aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
blob: b806ce80a3a924ffdf729a945d6b5a006458c9d1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include <vespa/persistence/spi/bucket.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/storage/persistence/filestorage/filestormetrics.h>
#include <vespa/vespalib/util/retain_guard.h>
#include <future>
#include <memory>
#include <optional>
#include <vector>

namespace document { class DocumentId; }
namespace storage::api { class StorageReply; }
namespace storage::spi { class Result; }

namespace storage {

struct MessageSender;
class MessageTracker;
class MergeBucketInfoSyncer;

/*
 * State of all bucket diff entry spi operation (putAsync or removeAsync)
 * for one or more ApplyBucketDiffCommand / ApplyBucketDiffReply.
 */
class ApplyBucketDiffState {
    class Deleter;
    const MergeBucketInfoSyncer&            _merge_bucket_info_syncer;
    MergeHandlerMetrics&                    _merge_handler_metrics;
    framework::MilliSecTimer                _start_time;
    spi::Bucket                             _bucket;
    vespalib::string                        _fail_message;
    std::atomic_flag                        _failed_flag;
    bool                                    _stale_bucket_info;
    std::optional<std::promise<vespalib::string>> _promise;
    std::unique_ptr<MessageTracker>         _tracker;
    std::shared_ptr<api::StorageReply>      _delayed_reply;
    MessageSender*                          _sender;
    FileStorThreadMetrics::Op*              _op_metrics;
    std::optional<framework::MilliSecTimer> _op_start_time;
    vespalib::RetainGuard                   _retain_guard;
    std::optional<framework::MilliSecTimer> _merge_start_time;

    ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, MergeHandlerMetrics& merge_handler_metrics, const framework::Clock& clock, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
public:
    static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, MergeHandlerMetrics& merge_handler_metrics, const framework::Clock& clock, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
    ~ApplyBucketDiffState();
    void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op);
    void wait();
    void check();
    void mark_stale_bucket_info();
    void sync_bucket_info();
    std::future<vespalib::string> get_future();
    void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply);
    void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, FileStorThreadMetrics::Op* op_metrics, const framework::MilliSecTimer& op_start_time, std::shared_ptr<api::StorageReply>&& delayed_reply);
    void set_tracker(std::unique_ptr<MessageTracker>&& tracker);
    void set_merge_start_time(const framework::MilliSecTimer& merge_start_time);
    const spi::Bucket& get_bucket() const noexcept { return _bucket; }
};

}