aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-26 21:30:47 +0200
committerGitHub <noreply@github.com>2020-09-26 21:30:47 +0200
commitf6683a698da0a7478a00dcc57c9abf2dccc35f4a (patch)
tree60d8dae4397bdff0068e255b3e26efb8f4662ba8 /searchlib
parent511057bb713a56f99a9b73e2ab3de998f72aee67 (diff)
Revert "Revert "Revert "Revert "Revert "Balder/group operations to tls and commit in batches"""""
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/config/translogserver.def2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp66
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h5
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp7
5 files changed, 11 insertions, 81 deletions
diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def
index defce8c3421..540895b2404 100644
--- a/searchlib/src/vespa/searchlib/config/translogserver.def
+++ b/searchlib/src/vespa/searchlib/config/translogserver.def
@@ -15,7 +15,7 @@ basedir string default="tmp" restart
## Use fsync after each commit.
## If not the below interval is used.
-usefsync bool default=false
+usefsync bool default=false restart
##Number of threads available for visiting/subscription.
maxthreads int default=4 restart
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index bd7feec0598..9e0f1a8a1aa 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -113,12 +113,7 @@ Domain::addPart(SerialNum partId, bool isLastPart) {
}
}
-Domain::~Domain() {
- MonitorGuard guard(_currentChunkMonitor);
- guard.broadcast();
- commitChunk(grabCurrentChunk(guard), guard);
- _singleCommitter->shutdown().sync();
-}
+Domain::~Domain() { }
DomainInfo
Domain::getDomainInfo() const
@@ -323,73 +318,22 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
return dp;
}
-void
-Domain::append(const Packet & packet, Writer::DoneCallback onDone) {
- vespalib::MonitorGuard guard(_currentChunkMonitor);
- if (_lastSerial >= packet.range().from()) {
- throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").",
- packet.range().from(), _lastSerial));
- } else {
- _lastSerial = packet.range().to();
- }
- _currentChunk->add(packet, std::move(onDone));
- commitIfFull(guard);
-}
-
Domain::CommitResult
Domain::startCommit(DoneCallback onDone) {
- vespalib::MonitorGuard guard(_currentChunkMonitor);
- if ( !_currentChunk->empty() ) {
- auto completed = grabCurrentChunk(guard);
- completed->setCommitDoneCallback(std::move(onDone));
- CommitResult result(completed->createCommitResult());
- commitChunk(std::move(completed), guard);
- return result;
- }
+ (void) onDone;
return CommitResult();
}
void
-Domain::commitIfFull(const vespalib::MonitorGuard &guard) {
- if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) {
- auto completed = std::move(_currentChunk);
- _currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks());
- commitChunk(std::move(completed), guard);
- }
-}
-
-std::unique_ptr<CommitChunk>
-Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) {
- assert(guard.monitors(_currentChunkMonitor));
- auto chunk = std::move(_currentChunk);
- _currentChunk = createCommitChunk(_config);
- return chunk;
-}
-
-void
-Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) {
- assert(chunkOrderGuard.monitors(_currentChunkMonitor));
- _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable {
- doCommit(std::move(chunk));
- }));
-}
-
-void
-Domain::doCommit(std::unique_ptr<CommitChunk> chunk) {
- const Packet & packet = chunk->getPacket();
- if (packet.empty()) return;
-
+Domain::append(const Packet & packet, Writer::DoneCallback onDone)
+{
+ (void) onDone;
vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size());
Packet::Entry entry;
entry.deserialize(is);
DomainPart::SP dp = optionallyRotateFile(entry.serial());
dp->commit(entry.serial(), packet);
- if (_config.getFSyncOnCommit()) {
- dp->sync();
- }
cleanSessions();
- LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.",
- chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes());
}
bool
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 041ec27cf23..7e77e6ef0ef 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -56,11 +56,6 @@ public:
uint64_t size() const;
Domain & setConfig(const DomainConfig & cfg);
private:
- void commitIfFull(const vespalib::MonitorGuard & guard);
-
- std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard);
- void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard);
- void doCommit(std::unique_ptr<CommitChunk> chunk);
SerialNum begin(const vespalib::LockGuard & guard) const;
SerialNum end(const vespalib::LockGuard & guard) const;
size_t byteSize(const vespalib::LockGuard & guard) const;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index b7e02894e6b..8855183226d 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -252,7 +252,7 @@ DomainPart::buildPacketMapping(bool allowTruncate)
DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding,
uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate)
- : _encoding(encoding),
+ : _encoding(encoding.getCrc(), Encoding::Compression::none), //TODO We do not yet support compression
_compressionLevel(compressionLevel),
_lock(),
_fileLock(),
@@ -396,19 +396,16 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
if (_range.from() == 0) {
_range.from(firstSerial);
}
- IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel);
for (size_t i(0); h.size() > 0; i++) {
//LOG(spam,
//"Pos(%d) Len(%d), Lim(%d), Remaining(%d)",
//h.getPos(), h.getLength(), h.getLimit(), h.getRemaining());
+ IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel);
Packet::Entry entry;
entry.deserialize(h);
if (_range.to() < entry.serial()) {
chunk->add(entry);
- if (_encoding.getCompression() == Encoding::Compression::none) {
- write(*_transLog, *chunk);
- chunk = IChunk::create(_encoding, _compressionLevel);
- }
+ write(*_transLog, *chunk);
_sz++;
_range.to(entry.serial());
} else {
@@ -416,9 +413,6 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
entry.serial(), _range.to()));
}
}
- if ( ! chunk->getEntries().empty()) {
- write(*_transLog, *chunk);
- }
bool merged(false);
LockGuard guard(_lock);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index 0c0c9186e12..7be3dd708a5 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -572,11 +572,8 @@ TransLogServer::domainCommit(FRT_RPCRequest *req)
Packet packet(params[1]._data._buf, params[1]._data._len);
try {
vespalib::Gate gate;
- {
- // Need to scope in order to drain out all the callbacks.
- domain->append(packet, make_shared<GateCallback>(gate));
- auto keep = domain->startCommit(make_shared<IgnoreCallback>());
- }
+ domain->append(packet, make_shared<GateCallback>(gate));
+ auto keep = domain->startCommit(make_shared<IgnoreCallback>());
gate.await();
ret.AddInt32(0);
ret.AddString("ok");