aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/bucketdb/joinbucketssession.cpp
blob: fd6daea56a6de7dfcf145bd848b15b038a4127ce (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "joinbucketssession.h"
#include "bucketdeltapair.h"
#include "i_bucket_create_notifier.h"
#include <cassert>

namespace proton::bucketdb {

// Constructs a join session for the given source and target buckets.
//
// bucketDB and bucketCreateNotifier are forwarded to BucketSessionBase.
// The three bucket ids are copied into the session. Both delta accumulators
// are value-initialized and every flag starts out false; the flags are
// computed later by setup().
JoinBucketsSession::JoinBucketsSession(BucketDBOwner &bucketDB,
                                       IBucketCreateNotifier &bucketCreateNotifier,
                                       const BucketId &source1,
                                       const BucketId &source2,
                                       const BucketId &target)
    : BucketSessionBase(bucketDB, bucketCreateNotifier),
      _source1Delta(),
      _source2Delta(),
      _wantTargetActive(false),
      _adjustSource1ActiveLids(false),
      _adjustSource2ActiveLids(false),
      _adjustTargetActiveLids(false),
      _source1(source1),
      _source2(source2),
      _target(target)
{
}


void
JoinBucketsSession::setup()
{
    if (_target.valid()) {
        _bucketDB->createBucket(_target);
    }
    BucketState *source1State = nullptr;
    BucketState *source2State = nullptr;
    bool source1Active = extractInfo(_source1, source1State);
    bool source2Active = extractInfo(_source2, source2State);
    _wantTargetActive = source1Active || source2Active;

    _adjustSource1ActiveLids = calcFixupNeed(source1State, _wantTargetActive, false);
    _adjustSource2ActiveLids = calcFixupNeed(source2State, _wantTargetActive, false);
    BucketState *targetState = nullptr;
    (void) extractInfo(_target, targetState);
    _adjustTargetActiveLids = calcFixupNeed(targetState, _wantTargetActive, true);
}


// Tells whether the target's active lids need a fixup.
//
// Returns true when the target flag computed by setup() is set, or when a
// source flagged for adjustment actually had documents moved (as reported
// by the corresponding movedSourceNDocs argument).
bool
JoinBucketsSession::mustFixupTargetActiveLids(bool movedSource1Docs, bool movedSource2Docs) const
{
    if (_adjustTargetActiveLids) {
        return true;
    }
    if (_adjustSource1ActiveLids && movedSource1Docs) {
        return true;
    }
    return _adjustSource2ActiveLids && movedSource2Docs;
}


// Accumulates a pair of deltas into the session: deltas._delta1 is added to
// the running delta for source 1 and deltas._delta2 to the one for source 2.
// The accumulated values are consumed by finish().
void
JoinBucketsSession::applyDeltas(const BucketDeltaPair &deltas)
{
    const auto &firstDelta = deltas._delta1;
    const auto &secondDelta = deltas._delta2;
    _source1Delta += firstDelta;
    _source2Delta += secondDelta;
}


bool
JoinBucketsSession::applyDelta(const BucketState &delta, BucketId &srcBucket, BucketState *dst)
{
    if (!srcBucket.valid()) {
        assert(delta.empty());
        return false;
    }
    BucketState *src = _bucketDB->getBucketStatePtr(srcBucket);
    if (delta.empty()) {
        return src && src->empty();
    }
    delta.applyDelta(src, dst);
    return src->empty();
}


void
JoinBucketsSession::finish()
{
    if (!_target.valid()) {
        assert(_source1Delta.empty());
        assert(_source2Delta.empty());
        return;
    }
    BucketState *targetState = _bucketDB->getBucketStatePtr(_target);
    bool source1Empty = applyDelta(_source1Delta, _source1, targetState);
    bool source2Empty = applyDelta(_source2Delta, _source2, targetState);
    if (source1Empty) {
        _bucketDB->deleteEmptyBucket(_source1);
    }
    if (source2Empty) {
        _bucketDB->deleteEmptyBucket(_source2);
    }
    if (!_source1Delta.empty() || !_source2Delta.empty()) {
        _bucketCreateNotifier.notifyCreateBucket(_bucketDB, _target);
    }
}

}