aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/operationowner.h
blob: 828d776f1a6c163ffa78fc0e7ed504eda94547a4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include "sentmessagemap.h"
#include "distributormessagesender.h"
#include "operationstarter.h"

namespace storage::framework { struct Clock; }

namespace storage::distributor {

class CancelScope;
class Operation;

/**
 * Keeps track of running operations.
 *
 * Implements the OperationStarter interface and holds a SentMessageMap that
 * associates the ids of sent messages with the operation they belong to, so
 * that incoming replies can be routed back to the right operation.
 */
class OperationOwner : public OperationStarter {
public:

    /**
     * Message sender bound to one OperationOwner and one operation.
     *
     * All accessors simply forward to the wrapped DistributorStripeMessageSender.
     * sendCommand() and sendReply() are implemented out of line; presumably they
     * register outgoing commands with the owner on behalf of the bound
     * operation -- see the implementation file to confirm.
     */
    class Sender : public DistributorStripeMessageSender {
    public:
        /**
         * @param owner  the operation owner this sender acts on behalf of
         * @param sender the underlying sender that calls are forwarded to
         * @param cb     the operation this sender is bound to (a shared reference is kept)
         */
        Sender(OperationOwner& owner,
               DistributorStripeMessageSender& sender,
               const std::shared_ptr<Operation>& cb)
            : _owner(owner),
              _sender(sender),
              _cb(cb)
        {}

        void sendCommand(const std::shared_ptr<api::StorageCommand> &) override;
        void sendReply(const std::shared_ptr<api::StorageReply> & msg) override;

        /** Returns the owner given at construction time. */
        OperationOwner& getOwner() noexcept {
            return _owner;
        }

        // The overrides below delegate unchanged to the wrapped sender.

        int getDistributorIndex() const override {
            return _sender.getDistributorIndex();
        }

        const ClusterContext & cluster_context() const override {
            return _sender.cluster_context();
        }

        PendingMessageTracker& getPendingMessageTracker() override {
            return _sender.getPendingMessageTracker();
        }

        const PendingMessageTracker& getPendingMessageTracker() const override {
            return _sender.getPendingMessageTracker();
        }

        const OperationSequencer& operation_sequencer() const noexcept override {
            return _sender.operation_sequencer();
        }

        OperationSequencer& operation_sequencer() noexcept override {
            return _sender.operation_sequencer();
        }

    private:
        OperationOwner& _owner;                   // not owned
        DistributorStripeMessageSender& _sender;  // not owned; target of all forwarded calls
        std::shared_ptr<Operation> _cb;           // operation this sender is bound to
    };

    /**
     * @param sender sender stored by reference and exposed through sender();
     *               it must outlive this object
     * @param clock  clock stored by reference; it must outlive this object
     */
    OperationOwner(DistributorStripeMessageSender& sender,
                   const framework::Clock& clock)
        : _sender(sender),
          _clock(clock)
    {}
    ~OperationOwner() override;

    /**
     * Handles replies from storage, mapping from a message id to an operation.
     *
     * NOTE(review): an earlier revision of this comment said the operation was
     * returned in `result.first`; the function returns a plain bool. Presumably
     * the return value is true iff the reply could be mapped to a tracked
     * operation -- verify against the implementation.
     */
    bool handleReply(const std::shared_ptr<api::StorageReply>& reply);

    /** Returns a mutable reference to the internal message id -> operation mapping. */
    SentMessageMap& getSentMessageMap() noexcept {
        return _sentMessageMap;
    }

    bool start(const std::shared_ptr<Operation>& operation, Priority priority) override;

    /**
     * If the given message exists, remove it from the internal operation mapping.
     *
     * Returns the operation the message belonged to, if any.
     */
    [[nodiscard]] std::shared_ptr<Operation> erase(api::StorageMessage::Id msgId);

    /**
     * Returns a strong ref to the pending operation with the given msg_id if it exists.
     * Otherwise returns an empty shared_ptr.
     */
    [[nodiscard]] std::shared_ptr<Operation> find_by_id(api::StorageMessage::Id msg_id) const noexcept;

    /**
     * Attempts to cancel the pending operation associated with the given msg_id
     * within the given cancel scope.
     *
     * NOTE(review): presumably returns true iff a matching operation was found
     * and cancellation was applied -- confirm against the implementation.
     */
    [[nodiscard]] bool try_cancel_by_id(api::StorageMessage::Id msg_id, const CancelScope& cancel_scope);

    /** Returns the sender given at construction time. */
    [[nodiscard]] DistributorStripeMessageSender& sender() noexcept { return _sender; }

    void onClose();
    /** Number of entries currently held in the sent message map. */
    uint32_t size() const { return _sentMessageMap.size(); }
    std::string toString() const;

private:
    SentMessageMap _sentMessageMap;           // sent message id -> owning operation
    DistributorStripeMessageSender& _sender;  // not owned
    const framework::Clock& _clock;           // not owned
};

}