aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/operation_sequencer.h
blob: dd38df9267ee8528ca7cca2dda73071d2308e6aa (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include <vespa/document/base/globalid.h>
#include <vespa/document/bucket/bucket.h>
#include <vespa/vespalib/stllike/hash_set.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <utility>
#include <variant>

namespace document {
class DocumentId;
}

namespace storage::distributor {

class OperationSequencer;

/**
 * Represents a move-only handle which effectively holds a guard for
 * allowing sequenced operations towards a particular document ID or
 * bucket ID.
 *
 * Destroying a handle will implicitly release the guard, allowing
 * new sequenced operations towards the ID.
 */
class SequencingHandle {
public:
    struct BlockedByPendingOperation {};
    struct BlockedByLockedBucket {
        vespalib::string lock_token;

        BlockedByLockedBucket() = default;
        explicit BlockedByLockedBucket(vespalib::stringref token) : lock_token(token) {}
    };
private:
    OperationSequencer* _sequencer;
    using HandleVariant = std::variant<
            document::Bucket,
            document::GlobalId,
            BlockedByPendingOperation,
            BlockedByLockedBucket
    >;
    HandleVariant _handle;
public:
    SequencingHandle() noexcept
        : _sequencer(nullptr),
          _handle()
    {}

    explicit SequencingHandle(BlockedByPendingOperation blocked_by)
        : _sequencer(nullptr),
          _handle(blocked_by)
    {}

    explicit SequencingHandle(BlockedByLockedBucket blocked_by)
            : _sequencer(nullptr),
              _handle(std::move(blocked_by))
    {}

    SequencingHandle(OperationSequencer& sequencer, const document::GlobalId& gid) noexcept
        : _sequencer(&sequencer),
          _handle(gid)
    {
    }

    SequencingHandle(OperationSequencer& sequencer, const document::Bucket& bucket)
        : _sequencer(&sequencer),
          _handle(bucket)
    {
    }

    ~SequencingHandle() {
        release();
    }

    SequencingHandle(const SequencingHandle&) = delete;
    SequencingHandle& operator=(const SequencingHandle&) = delete;

    SequencingHandle(SequencingHandle&& rhs) noexcept
        : _sequencer(rhs._sequencer),
          _handle(std::move(rhs._handle))
    {
        rhs._sequencer = nullptr;
    }

    SequencingHandle& operator=(SequencingHandle&& rhs) noexcept {
        if (&rhs != this) {
            std::swap(_sequencer, rhs._sequencer);
            std::swap(_handle, rhs._handle);
        }
        return *this;
    }

    [[nodiscard]] bool valid() const noexcept { return (_sequencer != nullptr); }
    [[nodiscard]] bool is_blocked() const noexcept {
        return (std::holds_alternative<BlockedByPendingOperation>(_handle) ||
                std::holds_alternative<BlockedByLockedBucket>(_handle));
    }
    [[nodiscard]] bool is_blocked_by_pending_operation() const noexcept {
        return std::holds_alternative<BlockedByPendingOperation>(_handle);
    }
    [[nodiscard]] bool is_blocked_by_bucket() const noexcept {
        return std::holds_alternative<BlockedByLockedBucket>(_handle);
    }
    [[nodiscard]] bool is_bucket_blocked_with_token(vespalib::stringref token) const noexcept {
        return (std::holds_alternative<BlockedByLockedBucket>(_handle) &&
                (std::get<BlockedByLockedBucket>(_handle).lock_token == token));
    }
    [[nodiscard]] bool has_bucket() const noexcept {
        return std::holds_alternative<document::Bucket>(_handle);
    }
    const document::Bucket& bucket() const noexcept {
        return std::get<document::Bucket>(_handle); // FIXME can actually throw
    }
    [[nodiscard]] bool has_gid() const noexcept {
        return std::holds_alternative<document::GlobalId>(_handle);
    }
    const document::GlobalId& gid() const noexcept {
        return std::get<document::GlobalId>(_handle); // FIXME can actually throw
    }
    void release();
};

/**
 * An operation sequencer allows for efficiently checking if an operation is
 * already pending for a given document ID (with very high probability; false
 * positives are possible, but false negatives are not).
 *
 * When a SequencingHandle is acquired for a given ID, no further valid handles
 * can be acquired for that ID until the original handle has been destroyed.
 */
class OperationSequencer {
    using GidSet      = vespalib::hash_set<document::GlobalId, document::GlobalId::hash>;
    using BucketLocks = vespalib::hash_map<document::Bucket, vespalib::string, document::Bucket::hash>;

    GidSet      _active_gids;
    BucketLocks _active_buckets;

    friend class SequencingHandle;
public:
    OperationSequencer();
    ~OperationSequencer();

    // Returns a handle with valid() == true iff no concurrent operations are
    // already active for `id` _and_ the there are no active bucket locks for
    // any bucket that may contain `id`.
    SequencingHandle try_acquire(document::BucketSpace bucket_space, const document::DocumentId& id);

    SequencingHandle try_acquire(const document::Bucket& bucket, const vespalib::string& token);

    bool is_blocked(const document::Bucket&) const noexcept;
private:
    void release(const SequencingHandle& handle);
};

} // storage::distributor