1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "session.h"
#include "domain.h"
#include "domainpart.h"
#include <vespa/fastlib/io/bufferedfile.h>
#include <cassert>
#include <cinttypes>
#include <vespa/log/log.h>
LOG_SETUP(".transactionlog.session");
namespace search::transactionlog {
vespalib::Executor::Task::UP
Session::createTask(Session::SP session)
{
return std::make_unique<VisitTask>(std::move(session));
}
Session::VisitTask::VisitTask(Session::SP session)
: _session(std::move(session))
{
_session->startVisit();
}
Session::VisitTask::~VisitTask() = default;
void
Session::VisitTask::run()
{
_session->visitOnly();
}
bool
Session::visit(FastOS_FileInterface & file, DomainPart & dp) {
Packet packet(size_t(-1));
bool more = dp.visit(file, _range, packet);
if ( ! packet.getHandle().empty()) {
send(packet);
}
return more;
}
void
Session::visit()
{
LOG(debug, "[%d] : Visiting %" PRIu64 " - %" PRIu64, _id, _range.from(), _range.to());
for (DomainPart::SP dpSafe = _domain->findPart(_range.from()); dpSafe.get() && (_range.from() < _range.to()) && (dpSafe.get()->range().from() <= _range.to()); dpSafe = _domain->findPart(_range.from())) {
// Must use findPart and iterate until no candidate parts found.
DomainPart * dp(dpSafe.get());
LOG(debug, "[%d] : Visiting the interval %" PRIu64 " - %" PRIu64 " in domain part [%" PRIu64 ", %" PRIu64 "]", _id, _range.from(), _range.to(), dp->range().from(), dp->range().to());
Fast_BufferedFile file;
file.EnableDirectIO();
for(bool more(true); ok() && more && (_range.from() < _range.to()); ) {
more = visit(file, *dp);
}
// Nothing more in this DomainPart, force switch to next one.
if (_range.from() < dp->range().to()) {
_range.from(std::min(dp->range().to(), _range.to()));
}
}
LOG(debug, "[%d] : Done visiting, starting subscribe %" PRIu64 " - %" PRIu64, _id, _range.from(), _range.to());
}
void
Session::startVisit() {
assert(!_visitRunning);
_visitRunning = true;
}
void
Session::visitOnly()
{
visit();
sendDone();
finalize();
_visitRunning = false;
}
bool Session::finished() const {
return _finished || ! _destination->connected();
}
void
Session::finalize()
{
if (!ok()) {
LOG(error, "[%d] : Error in %s(%" PRIu64 " - %" PRIu64 "), stopping since I have no idea on what to do.", _id, "visitor", _range.from(), _range.to());
}
LOG(debug, "[%d] : Stopped %" PRIu64 " - %" PRIu64, _id, _range.from(), _range.to());
_finished = true;
}
Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d,
std::unique_ptr<Destination> destination) :
_destination(std::move(destination)),
_domain(d),
_range(r),
_id(sId),
_visitRunning(false),
_inSync(false),
_finished(false),
_startTime()
{
}
Session::~Session() = default;
bool
Session::send(const Packet & packet)
{
return _destination->send(_id, _domain->name(), packet);
}
bool
Session::sendDone()
{
bool retval = _destination->sendDone(_id, _domain->name());
_inSync = true;
return retval;
}
}
|