aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-22 19:51:01 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2020-09-22 20:19:14 +0000
commit9905ffa957edab94cbe9e0129ccbcb4c4052801f (patch)
tree8ce9740f103d9dc0fb4d1d067afc548df5280bac /searchlib
parentfa2ffff33152bfaf3706f68aeb74d542a155a00e (diff)
Revert "Revert "- Group commits to the TLS and sync to disk before acking.""
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/config/translogserver.def2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp66
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h5
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp7
5 files changed, 81 insertions, 11 deletions
diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def
index 540895b2404..defce8c3421 100644
--- a/searchlib/src/vespa/searchlib/config/translogserver.def
+++ b/searchlib/src/vespa/searchlib/config/translogserver.def
@@ -15,7 +15,7 @@ basedir string default="tmp" restart
## Use fsync after each commit.
## If not the below interval is used.
-usefsync bool default=false restart
+usefsync bool default=false
##Number of threads available for visiting/subscription.
maxthreads int default=4 restart
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 9e0f1a8a1aa..bd7feec0598 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -113,7 +113,12 @@ Domain::addPart(SerialNum partId, bool isLastPart) {
}
}
-Domain::~Domain() { }
+Domain::~Domain() {
+ MonitorGuard guard(_currentChunkMonitor);
+ guard.broadcast();
+ commitChunk(grabCurrentChunk(guard), guard);
+ _singleCommitter->shutdown().sync();
+}
DomainInfo
Domain::getDomainInfo() const
@@ -318,22 +323,73 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
return dp;
}
+void
+Domain::append(const Packet & packet, Writer::DoneCallback onDone) {
+ vespalib::MonitorGuard guard(_currentChunkMonitor);
+ if (_lastSerial >= packet.range().from()) {
+ throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").",
+ packet.range().from(), _lastSerial));
+ } else {
+ _lastSerial = packet.range().to();
+ }
+ _currentChunk->add(packet, std::move(onDone));
+ commitIfFull(guard);
+}
+
Domain::CommitResult
Domain::startCommit(DoneCallback onDone) {
- (void) onDone;
+ vespalib::MonitorGuard guard(_currentChunkMonitor);
+ if ( !_currentChunk->empty() ) {
+ auto completed = grabCurrentChunk(guard);
+ completed->setCommitDoneCallback(std::move(onDone));
+ CommitResult result(completed->createCommitResult());
+ commitChunk(std::move(completed), guard);
+ return result;
+ }
return CommitResult();
}
void
-Domain::append(const Packet & packet, Writer::DoneCallback onDone)
-{
- (void) onDone;
+Domain::commitIfFull(const vespalib::MonitorGuard &guard) {
+ if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) {
+ auto completed = std::move(_currentChunk);
+ _currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks());
+ commitChunk(std::move(completed), guard);
+ }
+}
+
+std::unique_ptr<CommitChunk>
+Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) {
+ assert(guard.monitors(_currentChunkMonitor));
+ auto chunk = std::move(_currentChunk);
+ _currentChunk = createCommitChunk(_config);
+ return chunk;
+}
+
+void
+Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) {
+ assert(chunkOrderGuard.monitors(_currentChunkMonitor));
+ _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable {
+ doCommit(std::move(chunk));
+ }));
+}
+
+void
+Domain::doCommit(std::unique_ptr<CommitChunk> chunk) {
+ const Packet & packet = chunk->getPacket();
+ if (packet.empty()) return;
+
vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size());
Packet::Entry entry;
entry.deserialize(is);
DomainPart::SP dp = optionallyRotateFile(entry.serial());
dp->commit(entry.serial(), packet);
+ if (_config.getFSyncOnCommit()) {
+ dp->sync();
+ }
cleanSessions();
+ LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.",
+ chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes());
}
bool
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 7e77e6ef0ef..041ec27cf23 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -56,6 +56,11 @@ public:
uint64_t size() const;
Domain & setConfig(const DomainConfig & cfg);
private:
+ void commitIfFull(const vespalib::MonitorGuard & guard);
+
+ std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard);
+ void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard);
+ void doCommit(std::unique_ptr<CommitChunk> chunk);
SerialNum begin(const vespalib::LockGuard & guard) const;
SerialNum end(const vespalib::LockGuard & guard) const;
size_t byteSize(const vespalib::LockGuard & guard) const;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index 8855183226d..b7e02894e6b 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -252,7 +252,7 @@ DomainPart::buildPacketMapping(bool allowTruncate)
DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding,
uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate)
- : _encoding(encoding.getCrc(), Encoding::Compression::none), //TODO We do not yet support compression
+ : _encoding(encoding),
_compressionLevel(compressionLevel),
_lock(),
_fileLock(),
@@ -396,16 +396,19 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
if (_range.from() == 0) {
_range.from(firstSerial);
}
+ IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel);
for (size_t i(0); h.size() > 0; i++) {
//LOG(spam,
//"Pos(%d) Len(%d), Lim(%d), Remaining(%d)",
//h.getPos(), h.getLength(), h.getLimit(), h.getRemaining());
- IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel);
Packet::Entry entry;
entry.deserialize(h);
if (_range.to() < entry.serial()) {
chunk->add(entry);
- write(*_transLog, *chunk);
+ if (_encoding.getCompression() == Encoding::Compression::none) {
+ write(*_transLog, *chunk);
+ chunk = IChunk::create(_encoding, _compressionLevel);
+ }
_sz++;
_range.to(entry.serial());
} else {
@@ -413,6 +416,9 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
entry.serial(), _range.to()));
}
}
+ if ( ! chunk->getEntries().empty()) {
+ write(*_transLog, *chunk);
+ }
bool merged(false);
LockGuard guard(_lock);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index 7be3dd708a5..0c0c9186e12 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -572,8 +572,11 @@ TransLogServer::domainCommit(FRT_RPCRequest *req)
Packet packet(params[1]._data._buf, params[1]._data._len);
try {
vespalib::Gate gate;
- domain->append(packet, make_shared<GateCallback>(gate));
- auto keep = domain->startCommit(make_shared<IgnoreCallback>());
+ {
+ // Need to scope in order to drain out all the callbacks.
+ domain->append(packet, make_shared<GateCallback>(gate));
+ auto keep = domain->startCommit(make_shared<IgnoreCallback>());
+ }
gate.await();
ret.AddInt32(0);
ret.AddString("ok");