aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/externaloperationhandler.h
blob: 50a2019a2ae8741a1c5278b1a571264f44ef4de4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include <vespa/document/bucket/bucketid.h>
#include <vespa/document/bucket/bucketidfactory.h>
#include <vespa/storage/distributor/distributor_stripe_component.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
#include <atomic>
#include <chrono>
#include <mutex>

// Forward declarations of types that live in other namespaces.
// lib::ClusterState is only taken by const reference in the declarations below,
// so an incomplete type suffices for it. documentapi::TestAndSetCondition is
// not referenced in this header's visible declarations.
namespace documentapi { class TestAndSetCondition; }
namespace storage::lib { class ClusterState; }

namespace storage::distributor {

// Forward declarations for distributor types named by ExternalOperationHandler.
// Most are only used through references or std::unique_ptr members
// (DistributorMetricSet, DirectDispatchSender, MaintenanceOperationGenerator,
// OperationSequencer, PersistenceOperationMetricSet, SequencingHandle,
// UuidGenerator).
// NOTE(review): OperationOwner is additionally held *by value* as a member of
// ExternalOperationHandler, which needs the complete type; presumably its
// definition arrives through one of the includes above - confirm.
// TopLevelDistributor is not referenced in this header's visible declarations.
class DistributorMetricSet;
class DirectDispatchSender;
class MaintenanceOperationGenerator;
class OperationSequencer;
class OperationOwner;
class PersistenceOperationMetricSet;
class SequencingHandle;
class TopLevelDistributor;
class UuidGenerator;

class ExternalOperationHandler : public api::MessageHandler
{
public:
    using Clock = std::chrono::system_clock;
    using TimePoint = std::chrono::time_point<Clock>;

    bool onGet(const std::shared_ptr<api::GetCommand>&) override;
    bool onPut(const std::shared_ptr<api::PutCommand>&) override;
    bool onUpdate(const std::shared_ptr<api::UpdateCommand>&) override;
    bool onRemove(const std::shared_ptr<api::RemoveCommand>&) override;
    bool onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>&) override;
    bool onStatBucket(const std::shared_ptr<api::StatBucketCommand>&) override;
    bool onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>&) override;
    bool onGetBucketList(const std::shared_ptr<api::GetBucketListCommand>&) override;

    ExternalOperationHandler(DistributorNodeContext& node_ctx,
                             DistributorStripeOperationContext& op_ctx,
                             DistributorMetricSet& metrics,
                             ChainedMessageSender& msg_sender,
                             OperationSequencer& operation_sequencer,
                             NonTrackingMessageSender& non_tracking_sender,
                             DocumentSelectionParser& parser,
                             const MaintenanceOperationGenerator& gen,
                             OperationOwner& operation_owner);

    ~ExternalOperationHandler() override;

    bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg,
                       Operation::SP& operation);

    void rejectFeedBeforeTimeReached(TimePoint timePoint) noexcept {
        _rejectFeedBeforeTimeReached = timePoint;
    }

    // Returns true iff message was handled and should not be processed further by the caller.
    bool try_handle_message_outside_main_thread(const std::shared_ptr<api::StorageMessage>& msg);

    void close_pending();

    void set_concurrent_gets_enabled(bool enabled) noexcept {
        _concurrent_gets_enabled.store(enabled, std::memory_order_relaxed);
    }

    bool concurrent_gets_enabled() const noexcept {
        return _concurrent_gets_enabled.load(std::memory_order_relaxed);
    }

    void set_use_weak_internal_read_consistency_for_gets(bool use_weak) noexcept {
        _use_weak_internal_read_consistency_for_gets.store(use_weak, std::memory_order_relaxed);
    }

    bool use_weak_internal_read_consistency_for_gets() const noexcept {
        return _use_weak_internal_read_consistency_for_gets.load(std::memory_order_relaxed);
    }

    // Exposed for testing
    OperationSequencer& operation_sequencer() noexcept {
        return _operation_sequencer;
    }

private:
    DistributorNodeContext& _node_ctx;
    DistributorStripeOperationContext& _op_ctx;
    DistributorMetricSet& _metrics;
    ChainedMessageSender& _msg_sender;
    OperationSequencer& _operation_sequencer;
    DocumentSelectionParser& _parser;
    std::unique_ptr<DirectDispatchSender> _direct_dispatch_sender;
    const MaintenanceOperationGenerator& _operationGenerator;
    Operation::SP _op;
    TimePoint _rejectFeedBeforeTimeReached;
    OperationOwner& _distributor_operation_owner;
    mutable std::mutex _non_main_thread_ops_mutex;
    OperationOwner _non_main_thread_ops_owner;
    std::unique_ptr<UuidGenerator> _uuid_generator;
    std::atomic<bool> _concurrent_gets_enabled;
    std::atomic<bool> _use_weak_internal_read_consistency_for_gets;

    template <typename Func>
    void bounce_or_invoke_read_only_op(api::StorageCommand& cmd,
                                       const document::Bucket& bucket,
                                       PersistenceOperationMetricSet& metrics,
                                       Func f);

    void bounce_with_wrong_distribution(api::StorageCommand& cmd, const lib::ClusterState& cluster_state);
    // Bounce with the current _default_ space cluster state.
    void bounce_with_wrong_distribution(api::StorageCommand& cmd);
    void bounce_with_busy_during_state_transition(api::StorageCommand& cmd,
                                                  const lib::ClusterState& current_state,
                                                  const lib::ClusterState& pending_state);
    void bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result);
    void bounce_with_feed_blocked(api::StorageCommand& cmd);
    std::shared_ptr<Operation> try_generate_get_operation(const std::shared_ptr<api::GetCommand>&);

    bool checkSafeTimeReached(api::StorageCommand& cmd);
    api::ReturnCode makeSafeTimeRejectionResult(TimePoint unsafeTime);
    bool checkTimestampMutationPreconditions(
            api::StorageCommand& cmd,
            const document::BucketId &bucketId,
            PersistenceOperationMetricSet& persistenceMetrics);
    std::shared_ptr<api::StorageMessage> makeConcurrentMutationRejectionReply(
            api::StorageCommand& cmd,
            const document::DocumentId& docId,
            PersistenceOperationMetricSet& persistenceMetrics) const;
    bool allowMutation(const SequencingHandle& handle) const;

    api::InternalReadConsistency desired_get_read_consistency() const noexcept;

    DistributorMetricSet& getMetrics() { return _metrics; }
};

}