blob: e9374b9d54be565c6bdec359639bddcd13396e60 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "operation_sequencer.h"
#include <vespa/document/base/documentid.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <cassert>
namespace storage::distributor {
// Gives the held sequencing slot back to the owning sequencer and then drops
// the sequencer pointer. Does nothing when the handle is not valid().
void SequencingHandle::release() {
    if (!valid()) {
        return;
    }
    _sequencer->release(*this);
    // Detach so this handle no longer refers to the sequencer.
    _sequencer = nullptr;
}
// Constructor and destructor are defaulted, but defined out-of-line in this
// translation unit rather than in the header.
// NOTE(review): presumably this keeps the hash container template code pulled
// in via hash_map.hpp confined to this file - confirm against the header.
OperationSequencer::OperationSequencer() = default;
OperationSequencer::~OperationSequencer() = default;
// Tries to take the sequencing slot for a single document.
//
// The document is first checked against every currently locked bucket: if a
// locked bucket in the same bucket space contains the document's bucket, a
// handle blocked by that bucket lock (carrying the lock's token) is returned.
// Otherwise the document's GID is inserted into the active GID set; if it was
// already present a handle blocked by a pending operation is returned, else a
// handle bound to this sequencer and the GID.
SequencingHandle OperationSequencer::try_acquire(document::BucketSpace bucket_space, const document::DocumentId& id) {
    const document::GlobalId gid(id.getGlobalId());
    // Only pay for the GID -> bucket conversion when there is something to check against.
    if (!_active_buckets.empty()) {
        const auto doc_bucket_id = gid.convertToBucketId();
        // TODO avoid O(n), but sub bucket resolving is tricky and we expect the number
        // of locked buckets to be in the range of 0 to <very small number>.
        for (const auto& entry : _active_buckets) {
            const auto& locked_bucket = entry.first;
            const auto& lock_token    = entry.second;
            if (!(locked_bucket.getBucketSpace() == bucket_space)) {
                continue;
            }
            if (!locked_bucket.getBucketId().contains(doc_bucket_id)) {
                continue;
            }
            return SequencingHandle(SequencingHandle::BlockedByLockedBucket(lock_token));
        }
    }
    const bool newly_inserted = _active_gids.insert(gid).second;
    if (!newly_inserted) {
        // An operation for this GID is already in flight.
        return SequencingHandle(SequencingHandle::BlockedByPendingOperation());
    }
    return SequencingHandle(*this, gid);
}
// Tries to lock an entire bucket, associating the lock with the given token.
//
// Returns a handle bound to this sequencer and the bucket when the bucket was
// not already present in the locked-bucket map. If an entry for the exact same
// bucket already exists, the existing entry is left untouched and a handle
// blocked by that lock (carrying the existing token) is returned instead.
SequencingHandle OperationSequencer::try_acquire(const document::Bucket& bucket,
                                                 const vespalib::string& token) {
    const auto insert_result = _active_buckets.insert(std::make_pair(bucket, token));
    if (!insert_result.second) {
        // insert_result.first points at the pre-existing entry; report its token.
        return SequencingHandle(SequencingHandle::BlockedByLockedBucket(insert_result.first->second));
    }
    return SequencingHandle(*this, bucket);
}
// Returns true iff the locked-bucket map has an entry keyed by exactly this
// bucket. This is a plain key lookup; unlike the document overload of
// try_acquire() it performs no bucket containment check.
bool OperationSequencer::is_blocked(const document::Bucket& bucket) const noexcept {
    const auto lock_iter = _active_buckets.find(bucket);
    return lock_iter != _active_buckets.end();
}
// Removes whatever the given handle holds from this sequencer's bookkeeping:
// its GID from the active GID set if the handle has one, otherwise its bucket
// from the locked-bucket map. The handle must be valid, and must hold either
// a GID or a bucket (both are asserted).
void OperationSequencer::release(const SequencingHandle& handle) {
    assert(handle.valid());
    if (!handle.has_gid()) {
        // Not a document-level handle, so it has to be a bucket lock.
        assert(handle.has_bucket());
        _active_buckets.erase(handle.bucket());
        return;
    }
    _active_gids.erase(handle.gid());
}
} // storage::distributor
|