summaryrefslogtreecommitdiffstats
path: root/searchlib/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-26 21:52:19 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2020-09-28 12:05:18 +0000
commit24b812dc562a2727dda0b0f20759d153f0283ee9 (patch)
tree2d00589419c5491294fa53c69dc9f8343dab8ce0 /searchlib/src
parentcef539adea8b1ad6ba2314722c699a0a412082b7 (diff)
- Amortise write cost by grouping mulptiple operations together when writing to TLS.
- Commit memorystructures only then persisting to disk. - Ack operations back to user when both are completed. - Do not schedule a new commit task until both the tls and the memory structures have been comitted.
Diffstat (limited to 'searchlib/src')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp66
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h5
2 files changed, 66 insertions, 5 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 9e0f1a8a1aa..bd7feec0598 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -113,7 +113,12 @@ Domain::addPart(SerialNum partId, bool isLastPart) {
}
}
-Domain::~Domain() { }
+Domain::~Domain() {
+ MonitorGuard guard(_currentChunkMonitor);
+ guard.broadcast();
+ commitChunk(grabCurrentChunk(guard), guard);
+ _singleCommitter->shutdown().sync();
+}
DomainInfo
Domain::getDomainInfo() const
@@ -318,22 +323,73 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
return dp;
}
+void
+Domain::append(const Packet & packet, Writer::DoneCallback onDone) {
+ vespalib::MonitorGuard guard(_currentChunkMonitor);
+ if (_lastSerial >= packet.range().from()) {
+ throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").",
+ packet.range().from(), _lastSerial));
+ } else {
+ _lastSerial = packet.range().to();
+ }
+ _currentChunk->add(packet, std::move(onDone));
+ commitIfFull(guard);
+}
+
Domain::CommitResult
Domain::startCommit(DoneCallback onDone) {
- (void) onDone;
+ vespalib::MonitorGuard guard(_currentChunkMonitor);
+ if ( !_currentChunk->empty() ) {
+ auto completed = grabCurrentChunk(guard);
+ completed->setCommitDoneCallback(std::move(onDone));
+ CommitResult result(completed->createCommitResult());
+ commitChunk(std::move(completed), guard);
+ return result;
+ }
return CommitResult();
}
void
-Domain::append(const Packet & packet, Writer::DoneCallback onDone)
-{
- (void) onDone;
+Domain::commitIfFull(const vespalib::MonitorGuard &guard) {
+ if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) {
+ auto completed = std::move(_currentChunk);
+ _currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks());
+ commitChunk(std::move(completed), guard);
+ }
+}
+
+std::unique_ptr<CommitChunk>
+Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) {
+ assert(guard.monitors(_currentChunkMonitor));
+ auto chunk = std::move(_currentChunk);
+ _currentChunk = createCommitChunk(_config);
+ return chunk;
+}
+
+void
+Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) {
+ assert(chunkOrderGuard.monitors(_currentChunkMonitor));
+ _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable {
+ doCommit(std::move(chunk));
+ }));
+}
+
+void
+Domain::doCommit(std::unique_ptr<CommitChunk> chunk) {
+ const Packet & packet = chunk->getPacket();
+ if (packet.empty()) return;
+
vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size());
Packet::Entry entry;
entry.deserialize(is);
DomainPart::SP dp = optionallyRotateFile(entry.serial());
dp->commit(entry.serial(), packet);
+ if (_config.getFSyncOnCommit()) {
+ dp->sync();
+ }
cleanSessions();
+ LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.",
+ chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes());
}
bool
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 7e77e6ef0ef..041ec27cf23 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -56,6 +56,11 @@ public:
uint64_t size() const;
Domain & setConfig(const DomainConfig & cfg);
private:
+ void commitIfFull(const vespalib::MonitorGuard & guard);
+
+ std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard);
+ void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard);
+ void doCommit(std::unique_ptr<CommitChunk> chunk);
SerialNum begin(const vespalib::LockGuard & guard) const;
SerialNum end(const vespalib::LockGuard & guard) const;
size_t byteSize(const vespalib::LockGuard & guard) const;