aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/operation_sequencer.cpp
blob: b64fcbac40831697e408e2a788a25aa9a7bc1704 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "operation_sequencer.h"
#include <vespa/document/base/documentid.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <cassert>

namespace storage::distributor {

/**
 * Hands this handle back to the sequencer it refers to.
 *
 * Does nothing when valid() reports false. Otherwise the sequencer's
 * release() is invoked with this handle, after which the sequencer
 * pointer is cleared.
 */
void SequencingHandle::release() {
    if (!valid()) {
        return; // Nothing is held; no sequencer call is made.
    }
    _sequencer->release(*this);
    // Drop the back-reference once the sequencer has been notified.
    // NOTE(review): presumably this is what makes valid() return false
    // afterwards -- confirm against the header.
    _sequencer = nullptr;
}

// Defaulted constructor and destructor, defined out-of-line in this
// translation unit rather than in the header.
// NOTE(review): presumably kept here so the hash container templates pulled
// in via hash_map.hpp above are instantiated in this file only -- confirm
// against the member declarations in operation_sequencer.h.
OperationSequencer::OperationSequencer()  = default;
OperationSequencer::~OperationSequencer() = default;

/**
 * Attempts to register the document identified by `id` as having an active
 * operation.
 *
 * The outcome is one of:
 *  - a handle flagged BlockedByLockedBucket, carrying the token stored with
 *    the first encountered bucket lock that is in `bucket_space` and whose
 *    bucket id contains the bucket id derived from the document's GID;
 *  - a handle flagged BlockedByPendingOperation, if the document's GID is
 *    already present in the set of active GIDs;
 *  - a handle bound to this sequencer and the GID, in which case the GID
 *    has been added to the set of active GIDs.
 */
SequencingHandle OperationSequencer::try_acquire(document::BucketSpace bucket_space, const document::DocumentId& id) {
    const document::GlobalId gid(id.getGlobalId());

    // Only derive the bucket id when there is at least one bucket lock to
    // compare it against.
    if (!_active_buckets.empty()) {
        auto doc_bucket_id = gid.convertToBucketId();
        // TODO replace this linear scan; resolving sub buckets is tricky and the
        // number of locked buckets is expected to range from 0 to <very small number>.
        for (const auto& entry : _active_buckets) {
            const auto& locked_bucket = entry.first;
            const auto& lock_token    = entry.second;
            const bool  same_space    = (locked_bucket.getBucketSpace() == bucket_space);
            if (same_space && locked_bucket.getBucketId().contains(doc_bucket_id)) {
                return SequencingHandle(SequencingHandle::BlockedByLockedBucket(lock_token));
            }
        }
    }

    // No bucket lock covers the document; the GID itself decides the outcome.
    if (!_active_gids.insert(gid).second) {
        // The GID was already in the active set.
        return SequencingHandle(SequencingHandle::BlockedByPendingOperation());
    }
    return SequencingHandle(*this, gid);
}

/**
 * Attempts to register `bucket` as locked, associating it with `token`.
 *
 * If no entry exists for this exact bucket, one is inserted and the returned
 * handle is bound to this sequencer and the bucket. If an entry already
 * exists, nothing is changed and the returned handle is flagged
 * BlockedByLockedBucket, carrying the token of the existing entry (not the
 * `token` argument).
 */
SequencingHandle OperationSequencer::try_acquire(const document::Bucket& bucket,
                                                 const vespalib::string& token) {
    const auto insert_result = _active_buckets.insert(std::make_pair(bucket, token));
    if (!insert_result.second) {
        // An entry was already present; report the token it was stored with.
        const auto& existing_token = insert_result.first->second;
        return SequencingHandle(SequencingHandle::BlockedByLockedBucket(existing_token));
    }
    return SequencingHandle(*this, bucket);
}

/**
 * Reports whether `bucket` currently has an entry among the active bucket
 * locks.
 *
 * This is a key lookup on `bucket` itself; unlike the document variant of
 * try_acquire() it performs no bucket-id containment check.
 */
bool OperationSequencer::is_blocked(const document::Bucket& bucket) const noexcept {
    auto lock_pos = _active_buckets.find(bucket);
    return lock_pos != _active_buckets.end();
}

/**
 * Removes whatever `handle` refers to from this sequencer's bookkeeping.
 *
 * A handle reporting has_gid() has its GID erased from the active GID set;
 * any other handle has its bucket erased from the active bucket locks.
 * The handle must be valid, and a handle without a GID must have a bucket;
 * both are checked with assert() only.
 */
void OperationSequencer::release(const SequencingHandle& handle) {
    assert(handle.valid());
    if (!handle.has_gid()) {
        // Not a GID handle, so it is expected to refer to a bucket lock.
        assert(handle.has_bucket());
        _active_buckets.erase(handle.bucket());
        return;
    }
    _active_gids.erase(handle.gid());
}

} // storage::distributor