aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/operations/operation.h
blob: 2ed4448b9e38ed2c204001d848473e1b911d4ccf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include "cancel_scope.h"
#include <vespa/vdslib/state/nodetype.h>
#include <vespa/storage/distributor/distributormessagesender.h>
#include <vespa/vespalib/util/time.h>

namespace storage {

namespace api {
    class StorageMessage;
    class StorageReply;
}

class StorageComponent;

namespace distributor {

class CancelScope;
class DistributorStripeOperationContext;
class PendingMessageTracker;
class OperationSequencer;

class Operation
{
public:
    using SP = std::shared_ptr<Operation>;

    Operation();

    virtual ~Operation();

    /**
       Tell the callback that storage is shutting down. Reply to any pending
       stuff.
    */
    virtual void onClose(DistributorStripeMessageSender&) = 0;

    /**
       When a reply has been received, the storagelink will call receive()
       on the owner of the message that was replied to.
    */
    virtual void receive(DistributorStripeMessageSender& sender,
                         const std::shared_ptr<api::StorageReply> & msg)
    {
        onReceive(sender, msg);
    }

    [[nodiscard]] virtual const char* getName() const noexcept = 0;

    [[nodiscard]] virtual std::string getStatus() const;

    [[nodiscard]] virtual std::string toString() const {
        return getName();
    }

    /**
       Starts the callback, sending any messages etc. Sets _startTime to current time
    */
    virtual void start(DistributorStripeMessageSender& sender, vespalib::system_time startTime);
    void start(DistributorStripeMessageSender& sender);

    /**
     * Explicitly cancel the operation. Cancelled operations may or may not (depending on
     * the operation implementation) be immediately aborted, but they should either way
     * never insert any bucket information _for cancelled nodes_ into the bucket DB after
     * cancel() has been called.
     */
    void cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope);

    [[nodiscard]] const CancelScope& cancel_scope() const noexcept { return _cancel_scope; }

    /**
     * Whether cancel() has been invoked at least once on this instance. This does not
     * distinguish between cancellations caused by ownership transfers and those caused
     * by nodes becoming unavailable; Operation implementations that care about this need
     * to inspect cancel_scope() themselves.
     */
    [[nodiscard]] bool is_cancelled() const noexcept { return _cancel_scope.is_cancelled(); }

    /**
     * Returns true if we are blocked to start this operation given
     * the pending messages.
     */
    [[nodiscard]] virtual bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const {
        return false;
    }

    /*
     * Called by blocking operation starter if operation was blocked
     */
    virtual void on_blocked();

    /*
     * Called by throttling operation starter if operation was throttled
     */
    virtual void on_throttled();

    /**
        Transfers message settings such as priority, timeout, etc. from one message to another.
    */
    static void copyMessageSettings(const api::StorageCommand& source,
                                    api::StorageCommand& target);

private:
    /**
       Implementation of start for the callback
     */
    virtual void onStart(DistributorStripeMessageSender& sender) = 0;

    virtual void onReceive(DistributorStripeMessageSender& sender,
                           const std::shared_ptr<api::StorageReply> & msg) = 0;

protected:
    virtual void on_cancel(DistributorStripeMessageSender& sender, const CancelScope& cancel_scope) {
        (void)sender;
        (void)cancel_scope;
    }

    static constexpr vespalib::duration MAX_TIMEOUT = 3600s;

    vespalib::system_time _startTime;
    CancelScope           _cancel_scope;
};

}

}