aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib/src/vespa/searchlib/transactionlog/session.cpp
blob: ec77a5f150eaa5fe1a3bf5b44e74222ed7c59d52 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "session.h"
#include "domain.h"
#include "domainpart.h"
#include <vespa/fastlib/io/bufferedfile.h>
#include <cassert>
#include <cinttypes>

#include <vespa/log/log.h>
LOG_SETUP(".transactionlog.session");


namespace search::transactionlog {

/**
 * Wraps the given session in a VisitTask that can be handed to an executor.
 *
 * Ownership of the shared pointer is moved into the task. Note that
 * constructing the VisitTask already calls startVisit() on the session.
 */
vespalib::Executor::Task::UP
Session::createTask(Session::SP session)
{
    vespalib::Executor::Task::UP task = std::make_unique<VisitTask>(std::move(session));
    return task;
}

/**
 * Takes shared ownership of the session and immediately marks the visit as
 * started, i.e. before the task is actually run.
 */
Session::VisitTask::VisitTask(Session::SP session)
    : _session(std::move(session))
{
    Session & owned = *_session;
    owned.startVisit();
}

Session::VisitTask::~VisitTask() = default;

void
Session::VisitTask::run()
{
    _session->visitOnly();
}

/**
 * Performs one visit step against a single domain part.
 *
 * A fresh packet is handed to dp.visit() together with the session range; if
 * the packet comes back non-empty it is forwarded with send().
 *
 * @param file file object passed through to the domain part.
 * @param dp   domain part to read from.
 * @return the value returned by dp.visit(); the caller uses it to decide
 *         whether to keep visiting this part.
 *
 * NOTE(review): the result of send() is not inspected here; the caller
 * checks ok() between steps instead.
 */
bool
Session::visit(FastOS_FileInterface & file, DomainPart & dp) {
    Packet packet(static_cast<size_t>(-1));
    const bool hasMore = dp.visit(file, _range, packet);

    if (packet.getHandle().empty()) {
        // Nothing was produced in this step, so there is nothing to send.
        return hasMore;
    }
    send(packet);
    return hasMore;
}

void
Session::visit()
{
    LOG(debug, "[%d] : Visiting %" PRIu64 " - %" PRIu64, _id, _range.from(), _range.to());
    for (DomainPart::SP dpSafe = _domain->findPart(_range.from()); dpSafe.get() && (_range.from() < _range.to()) && (dpSafe.get()->range().from() <= _range.to()); dpSafe = _domain->findPart(_range.from())) {
        // Must use findPart and iterate until no candidate parts found.
        DomainPart * dp(dpSafe.get());
        LOG(debug, "[%d] : Visiting the interval %" PRIu64 " - %" PRIu64 " in domain part [%" PRIu64 ", %" PRIu64 "]", _id, _range.from(), _range.to(), dp->range().from(), dp->range().to());
        Fast_BufferedFile file;
        file.EnableDirectIO();
        for(bool more(true); ok() && more && (_range.from() < _range.to()); ) {
            more = visit(file, *dp);
        }
        // Nothing more in this DomainPart, force switch to next one.
        if (_range.from() < dp->range().to()) {
            _range.from(std::min(dp->range().to(), _range.to()));
        }
    }

    LOG(debug, "[%d] : Done visiting, starting subscribe %" PRIu64 " - %" PRIu64, _id, _range.from(), _range.to());
}

/**
 * Flags the session as having a visit in progress.
 *
 * Asserts that no visit was already flagged as running.
 */
void
Session::startVisit()
{
    assert( ! _visitRunning);
    _visitRunning = true;
}
/**
 * Runs a complete visit: visits the range, sends the done message,
 * finalizes the session and finally clears the running flag that
 * startVisit() set.
 */
void
Session::visitOnly()
{
    // The order matters: finalize() reports on the outcome of the visit, and
    // the running flag is cleared only after everything else has completed.
    visit();
    sendDone();
    finalize();

    _visitRunning = false;
}

/**
 * @return true if the session has been finalized, or if its destination
 *         reports that it is not connected.
 */
bool Session::finished() const {
    if (_finished) {
        return true;
    }
    return ! _destination->connected();
}

/**
 * Marks the session as finished.
 *
 * Logs an error first if the session is not ok(), and always logs the range
 * at which the session stopped.
 */
void
Session::finalize()
{
    const bool failed = ! ok();
    if (failed) {
        LOG(error, "[%d] : Error in %s(%" PRIu64 " - %" PRIu64 "), stopping since I have no idea on what to do.", _id, "visitor", _range.from(), _range.to());
    }

    LOG(debug, "[%d] : Stopped %" PRIu64 " - %" PRIu64, _id, _range.from(), _range.to());

    _finished = true;
}

/**
 * Creates a session that is neither visiting, in sync, nor finished.
 *
 * @param sId         id of the session; used in log messages and passed to
 *                    the destination on every send.
 * @param r           serial number range the session covers.
 * @param d           domain the session reads from.
 * @param destination receiver of packets and the done message; the session
 *                    takes ownership.
 */
Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d,
                 std::unique_ptr<Destination> destination)
    : _destination(std::move(destination)),
      _domain(d),
      _range(r),
      _id(sId),
      _visitRunning(false),
      _inSync(false),
      _finished(false),
      _startTime()
{
}

Session::~Session() = default;

/**
 * Forwards a packet to the destination, tagged with this session's id and
 * the name of its domain.
 *
 * @return whatever the destination's send() returns.
 */
bool
Session::send(const Packet & packet)
{
    const bool result = _destination->send(_id, _domain->name(), packet);
    return result;
}

/**
 * Sends the done message for this session to the destination and then marks
 * the session as in sync, regardless of the outcome of the send.
 *
 * @return whatever the destination's sendDone() returns.
 */
bool
Session::sendDone()
{
    const bool result = _destination->sendDone(_id, _domain->name());
    _inSync = true;
    return result;
}

}