1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "joinbucketssession.h"
#include "bucketdeltapair.h"
#include "i_bucket_create_notifier.h"
#include <cassert>
namespace proton::bucketdb {
JoinBucketsSession::JoinBucketsSession(BucketDBOwner &bucketDB,
IBucketCreateNotifier &bucketCreateNotifier,
const BucketId &source1,
const BucketId &source2,
const BucketId &target)
: BucketSessionBase(bucketDB, bucketCreateNotifier),
_source1Delta(),
_source2Delta(),
_wantTargetActive(false),
_adjustSource1ActiveLids(false),
_adjustSource2ActiveLids(false),
_adjustTargetActiveLids(false),
_source1(source1),
_source2(source2),
_target(target)
{
}
void
JoinBucketsSession::setup()
{
if (_target.valid()) {
_bucketDB->createBucket(_target);
}
BucketState *source1State = nullptr;
BucketState *source2State = nullptr;
bool source1Active = extractInfo(_source1, source1State);
bool source2Active = extractInfo(_source2, source2State);
_wantTargetActive = source1Active || source2Active;
_adjustSource1ActiveLids = calcFixupNeed(source1State, _wantTargetActive, false);
_adjustSource2ActiveLids = calcFixupNeed(source2State, _wantTargetActive, false);
BucketState *targetState = nullptr;
(void) extractInfo(_target, targetState);
_adjustTargetActiveLids = calcFixupNeed(targetState, _wantTargetActive, true);
}
bool
JoinBucketsSession::mustFixupTargetActiveLids(bool movedSource1Docs, bool movedSource2Docs) const
{
return _adjustTargetActiveLids ||
(_adjustSource1ActiveLids && movedSource1Docs) ||
(_adjustSource2ActiveLids && movedSource2Docs);
}
void
JoinBucketsSession::applyDeltas(const BucketDeltaPair &deltas)
{
_source1Delta += deltas._delta1;
_source2Delta += deltas._delta2;
}
bool
JoinBucketsSession::applyDelta(const BucketState &delta, BucketId &srcBucket, BucketState *dst)
{
if (!srcBucket.valid()) {
assert(delta.empty());
return false;
}
BucketState *src = _bucketDB->getBucketStatePtr(srcBucket);
if (delta.empty()) {
return src && src->empty();
}
delta.applyDelta(src, dst);
return src->empty();
}
void
JoinBucketsSession::finish()
{
if (!_target.valid()) {
assert(_source1Delta.empty());
assert(_source2Delta.empty());
return;
}
BucketState *targetState = _bucketDB->getBucketStatePtr(_target);
bool source1Empty = applyDelta(_source1Delta, _source1, targetState);
bool source2Empty = applyDelta(_source2Delta, _source2, targetState);
if (source1Empty) {
_bucketDB->deleteEmptyBucket(_source1);
}
if (source2Empty) {
_bucketDB->deleteEmptyBucket(_source2);
}
if (!_source1Delta.empty() || !_source2Delta.empty()) {
_bucketCreateNotifier.notifyCreateBucket(_bucketDB, _target);
}
}
}
|