aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-12-08 11:23:46 +0100
committerGitHub <noreply@github.com>2021-12-08 11:23:46 +0100
commit8efb6366e61d947fa89304dfe6e58ec1bb243c12 (patch)
tree573f0e98d544fa380701b8a8f8cd5eba8edb5431 /searchlib
parentd3b509f8cee79912a402a1d825b78fe21c03d6a2 (diff)
parent5d5c235225f27f7997cb6dc0292302a0b19773c8 (diff)
Merge pull request #20397 from vespa-engine/balder/use-multiple-tls-compression-threads
Use a helper pool for the actual compression.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp47
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp22
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp10
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.h9
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp5
6 files changed, 51 insertions, 44 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index 5740eeb610d..d3c3af3a9ca 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -7,6 +7,7 @@
#include <vespa/searchlib/index/dummyfileheadercontext.h>
#include <vespa/document/util/bytebuffer.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/fastos/file.h>
#include <thread>
@@ -316,43 +317,33 @@ fillDomainTest(Session * s1, size_t numPackets, size_t numEntries)
}
}
-using Counter = std::atomic<size_t>;
-
-class CountDone : public IDestructorCallback {
-public:
- explicit CountDone(Counter & inFlight) noexcept : _inFlight(inFlight) { ++_inFlight; }
- ~CountDone() override { --_inFlight; }
-private:
- Counter & _inFlight;
-};
-
void
-fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numPackets, size_t numEntries)
+fillDomainTest(IDestructorCallback::SP onDone, TransLogServer & tls, const vespalib::string & domain, size_t numPackets, size_t numEntries)
{
size_t value(0);
- Counter inFlight(0);
- auto domainWriter = s1.getWriter(domain);
- for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
- for(size_t j=0; j < numEntries; j++, value++) {
- Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value)));
+ auto domainWriter = tls.getWriter(domain);
+
+ for (size_t i = 0; i < numPackets; i++) {
+ auto p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE);
+ for (size_t j = 0; j < numEntries; j++, value++) {
+ Packet::Entry e(value + 1, j + 1, vespalib::ConstBufferRef((const char *) &value, sizeof(value)));
p->add(e);
- if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) {
- domainWriter->append(*p, std::make_shared<CountDone>(inFlight));
+ if (p->sizeBytes() > DEFAULT_PACKET_SIZE) {
+ domainWriter->append(*p, onDone);
p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE);
}
}
- domainWriter->append(*p, std::make_shared<CountDone>(inFlight));
- auto keep = domainWriter->startCommit(Writer::DoneCallback());
- LOG(info, "Inflight %ld", inFlight.load());
- }
- while (inFlight.load() != 0) {
- std::this_thread::sleep_for(10ms);
- LOG(info, "Waiting for inflight %ld to reach zero", inFlight.load());
+ domainWriter->append(*p, onDone);
+ auto keep = domainWriter->startCommit(onDone);
}
-
}
+void
+fillDomainTest(TransLogServer & tls, const vespalib::string & domain, size_t numPackets, size_t numEntries) {
+ vespalib::Gate gate;
+ fillDomainTest(std::make_shared<vespalib::GateCallback>(gate), tls, domain, numPackets, numEntries);
+ gate.await();
+}
void
fillDomainTest(Session * s1, size_t numPackets, size_t numEntries, size_t entrySize)
@@ -545,7 +536,7 @@ partialUpdateTest(const vespalib::string & testDir) {
ASSERT_TRUE( visitor->visit(5, 7) );
for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
ASSERT_TRUE( ca._eof );
- ASSERT_TRUE( ca.map().size() == 1);
+ ASSERT_EQUAL(1u, ca.map().size());
ASSERT_TRUE( ca.hasSerial(7) );
CallBackUpdate ca1;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 7fa87f3ec09..96b94955570 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -12,6 +12,7 @@
#include <algorithm>
#include <thread>
#include <cassert>
+#include <future>
#include <vespa/log/log.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
@@ -394,27 +395,32 @@ Domain::grabCurrentChunk(const UniqueLock & guard) {
void
Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard) {
assert(chunkOrderGuard.mutex() == &_currentChunkMonitor && chunkOrderGuard.owns_lock());
- _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable {
- doCommit(std::move(chunk));
+ if (chunk->getPacket().empty()) return;
+ std::promise<SerializedChunk> promise;
+ std::future<SerializedChunk> future = promise.get_future();
+ _executor.execute(makeLambdaTask([promise=std::move(promise), chunk = std::move(chunk),
+ encoding=_config.getEncoding(), compressionLevel=_config.getCompressionlevel()]() mutable {
+ promise.set_value(SerializedChunk(std::move(chunk), encoding, compressionLevel));
+ }));
+ _singleCommitter->execute( makeLambdaTask([this, future = std::move(future)]() mutable {
+ doCommit(future.get());
}));
}
void
-Domain::doCommit(std::unique_ptr<CommitChunk> chunk) {
- const Packet & packet = chunk->getPacket();
- if (packet.empty()) return;
+Domain::doCommit(const SerializedChunk & serialized) {
- SerializedChunk serialized(packet, _config.getEncoding(), _config.getCompressionlevel());
- DomainPart::SP dp = optionallyRotateFile(packet.range().from());
+ SerialNumRange range = serialized.range();
+ DomainPart::SP dp = optionallyRotateFile(range.from());
dp->commit(serialized);
if (_config.getFSyncOnCommit()) {
dp->sync();
}
cleanSessions();
LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.",
- chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes());
+ serialized.commitChunk().getNumCallBacks(), serialized.getNumEntries(), serialized.getData().size());
}
bool
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 2e912ad6201..eb3d0b6b10b 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -64,7 +64,7 @@ private:
std::unique_ptr<CommitChunk> grabCurrentChunk(const UniqueLock & guard);
void commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard);
- void doCommit(std::unique_ptr<CommitChunk> chunk);
+ void doCommit(const SerializedChunk & serialized);
SerialNum begin(const UniqueLock & guard) const;
SerialNum end(const UniqueLock & guard) const;
size_t byteSize(const UniqueLock & guard) const;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
index e3d98cd576d..99370d263ec 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
@@ -135,11 +135,13 @@ encode(vespalib::nbostream & os, const IChunk & chunk, Encoding encoding) {
chunk.getEntries().size(), os.size(), range.from(), range.to(), encoding.getRaw(), realEncoding.getRaw());
}
-SerializedChunk::SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel)
- : _os(),
- _range(packet.range()),
- _numEntries(packet.size())
+SerializedChunk::SerializedChunk(std::unique_ptr<CommitChunk> commitChunk, Encoding encoding, uint8_t compressionLevel)
+ : _commitChunk(std::move(commitChunk)),
+ _os(),
+ _range(_commitChunk->getPacket().range()),
+ _numEntries(_commitChunk->getPacket().size())
{
+ const Packet & packet = _commitChunk->getPacket();
nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size());
IChunk::UP chunk = IChunk::create(encoding, compressionLevel);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
index dccfd6617f5..e5daeb810f4 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
@@ -38,11 +38,18 @@ std::ostream & operator << (std::ostream & os, Encoding e);
*/
class SerializedChunk {
public:
- SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel);
+ SerializedChunk(std::unique_ptr<CommitChunk> chunk, Encoding encoding, uint8_t compressionLevel);
+ SerializedChunk(SerializedChunk &&) = default;
+ SerializedChunk & operator=(SerializedChunk &&) = default;
+ SerializedChunk(const SerializedChunk &) = delete;
+ SerializedChunk & operator=(const SerializedChunk &) = delete;
vespalib::ConstBufferRef getData() const;
SerialNumRange range() const { return _range; }
size_t getNumEntries() const { return _numEntries; }
+ const CommitChunk & commitChunk() const { return *_commitChunk; }
private:
+ // CommitChunk is required to ensure we do not reply until committed to the TLS.
+ std::unique_ptr<CommitChunk> _commitChunk;
vespalib::nbostream _os;
SerialNumRange _range;
size_t _numEntries;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index ce190d2c093..db2cf2a255d 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -578,9 +578,10 @@ TransLogServer::domainCommit(FRT_RPCRequest *req)
try {
vespalib::Gate gate;
{
+ auto onDone = make_shared<vespalib::GateCallback>(gate);
// Need to scope in order to drain out all the callbacks.
- domain->append(packet, make_shared<vespalib::GateCallback>(gate));
- auto keep = domain->startCommit(make_shared<vespalib::IgnoreCallback>());
+ domain->append(packet, onDone);
+ auto keep = domain->startCommit(onDone);
}
gate.await();
ret.AddInt32(0);