diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-12-08 11:23:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-08 11:23:46 +0100 |
commit | 8efb6366e61d947fa89304dfe6e58ec1bb243c12 (patch) | |
tree | 573f0e98d544fa380701b8a8f8cd5eba8edb5431 /searchlib | |
parent | d3b509f8cee79912a402a1d825b78fe21c03d6a2 (diff) | |
parent | 5d5c235225f27f7997cb6dc0292302a0b19773c8 (diff) |
Merge pull request #20397 from vespa-engine/balder/use-multiple-tls-compression-threads
Use a helper pool for the actual compression.
Diffstat (limited to 'searchlib')
6 files changed, 51 insertions, 44 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index 5740eeb610d..d3c3af3a9ca 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -7,6 +7,7 @@ #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/document/util/bytebuffer.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/fastos/file.h> #include <thread> @@ -316,43 +317,33 @@ fillDomainTest(Session * s1, size_t numPackets, size_t numEntries) } } -using Counter = std::atomic<size_t>; - -class CountDone : public IDestructorCallback { -public: - explicit CountDone(Counter & inFlight) noexcept : _inFlight(inFlight) { ++_inFlight; } - ~CountDone() override { --_inFlight; } -private: - Counter & _inFlight; -}; - void -fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numPackets, size_t numEntries) +fillDomainTest(IDestructorCallback::SP onDone, TransLogServer & tls, const vespalib::string & domain, size_t numPackets, size_t numEntries) { size_t value(0); - Counter inFlight(0); - auto domainWriter = s1.getWriter(domain); - for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); - for(size_t j=0; j < numEntries; j++, value++) { - Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); + auto domainWriter = tls.getWriter(domain); + + for (size_t i = 0; i < numPackets; i++) { + auto p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE); + for (size_t j = 0; j < numEntries; j++, value++) { + Packet::Entry e(value + 1, j + 1, vespalib::ConstBufferRef((const char *) &value, sizeof(value))); p->add(e); - if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { - domainWriter->append(*p, std::make_shared<CountDone>(inFlight)); + if (p->sizeBytes() > DEFAULT_PACKET_SIZE) { + domainWriter->append(*p, onDone); p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE); } } - domainWriter->append(*p, std::make_shared<CountDone>(inFlight)); - auto keep = domainWriter->startCommit(Writer::DoneCallback()); - LOG(info, "Inflight %ld", inFlight.load()); - } - while (inFlight.load() != 0) { - std::this_thread::sleep_for(10ms); - LOG(info, "Waiting for inflight %ld to reach zero", inFlight.load()); + domainWriter->append(*p, onDone); + auto keep = domainWriter->startCommit(onDone); } - } +void +fillDomainTest(TransLogServer & tls, const vespalib::string & domain, size_t numPackets, size_t numEntries) { + vespalib::Gate gate; + fillDomainTest(std::make_shared<vespalib::GateCallback>(gate), tls, domain, numPackets, numEntries); + gate.await(); +} void fillDomainTest(Session * s1, size_t numPackets, size_t numEntries, size_t entrySize) @@ -545,7 +536,7 @@ partialUpdateTest(const vespalib::string & testDir) { ASSERT_TRUE( visitor->visit(5, 7) ); for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } ASSERT_TRUE( ca._eof ); - ASSERT_TRUE( ca.map().size() == 1); + ASSERT_EQUAL(1u, ca.map().size()); ASSERT_TRUE( ca.hasSerial(7) ); CallBackUpdate ca1; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 7fa87f3ec09..96b94955570 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -12,6 +12,7 @@ #include <algorithm> #include <thread> #include <cassert> +#include <future> #include <vespa/log/log.h> #include <vespa/vespalib/util/threadstackexecutor.h> @@ -394,27 +395,32 @@ Domain::grabCurrentChunk(const UniqueLock & guard) { void Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard) { assert(chunkOrderGuard.mutex() == &_currentChunkMonitor && chunkOrderGuard.owns_lock()); - _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { - doCommit(std::move(chunk)); + if (chunk->getPacket().empty()) return; + std::promise<SerializedChunk> promise; + std::future<SerializedChunk> future = promise.get_future(); + _executor.execute(makeLambdaTask([promise=std::move(promise), chunk = std::move(chunk), + encoding=_config.getEncoding(), compressionLevel=_config.getCompressionlevel()]() mutable { + promise.set_value(SerializedChunk(std::move(chunk), encoding, compressionLevel)); + })); + _singleCommitter->execute( makeLambdaTask([this, future = std::move(future)]() mutable { + doCommit(future.get()); })); } void -Domain::doCommit(std::unique_ptr<CommitChunk> chunk) { - const Packet & packet = chunk->getPacket(); - if (packet.empty()) return; +Domain::doCommit(const SerializedChunk & serialized) { - SerializedChunk serialized(packet, _config.getEncoding(), _config.getCompressionlevel()); - DomainPart::SP dp = optionallyRotateFile(packet.range().from()); + SerialNumRange range = serialized.range(); + DomainPart::SP dp = optionallyRotateFile(range.from()); dp->commit(serialized); if (_config.getFSyncOnCommit()) { dp->sync(); } cleanSessions(); LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.", - chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes()); + serialized.commitChunk().getNumCallBacks(), serialized.getNumEntries(), serialized.getData().size()); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 2e912ad6201..eb3d0b6b10b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -64,7 +64,7 @@ private: std::unique_ptr<CommitChunk> grabCurrentChunk(const UniqueLock & guard); void commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard); - void doCommit(std::unique_ptr<CommitChunk> chunk); + void doCommit(const SerializedChunk & serialized); SerialNum begin(const UniqueLock & guard) const; SerialNum end(const UniqueLock & guard) const; size_t byteSize(const UniqueLock & guard) const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp index e3d98cd576d..99370d263ec 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp @@ -135,11 +135,13 @@ encode(vespalib::nbostream & os, const IChunk & chunk, Encoding encoding) { chunk.getEntries().size(), os.size(), range.from(), range.to(), encoding.getRaw(), realEncoding.getRaw()); } -SerializedChunk::SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel) - : _os(), - _range(packet.range()), - _numEntries(packet.size()) +SerializedChunk::SerializedChunk(std::unique_ptr<CommitChunk> commitChunk, Encoding encoding, uint8_t compressionLevel) + : _commitChunk(std::move(commitChunk)), + _os(), + _range(_commitChunk->getPacket().range()), + _numEntries(_commitChunk->getPacket().size()) { + const Packet & packet = _commitChunk->getPacket(); nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size()); IChunk::UP chunk = IChunk::create(encoding, compressionLevel); diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index dccfd6617f5..e5daeb810f4 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -38,11 +38,18 @@ std::ostream & operator << (std::ostream & os, Encoding e); */ class SerializedChunk { public: - SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel); + SerializedChunk(std::unique_ptr<CommitChunk> chunk, Encoding encoding, uint8_t compressionLevel); + SerializedChunk(SerializedChunk &&) = default; + SerializedChunk & operator=(SerializedChunk &&) = default; + SerializedChunk(const SerializedChunk &) = delete; + SerializedChunk & operator=(const SerializedChunk &) = delete; vespalib::ConstBufferRef getData() const; SerialNumRange range() const { return _range; } size_t getNumEntries() const { return _numEntries; } + const CommitChunk & commitChunk() const { return *_commitChunk; } private: + // CommitChunk is required to ensure we do not reply until committed to the TLS. + std::unique_ptr<CommitChunk> _commitChunk; vespalib::nbostream _os; SerialNumRange _range; size_t _numEntries; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index ce190d2c093..db2cf2a255d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -578,9 +578,10 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) try { vespalib::Gate gate; { + auto onDone = make_shared<vespalib::GateCallback>(gate); // Need to scope in order to drain out all the callbacks. - domain->append(packet, make_shared<vespalib::GateCallback>(gate)); - auto keep = domain->startCommit(make_shared<vespalib::IgnoreCallback>()); + domain->append(packet, onDone); + auto keep = domain->startCommit(onDone); } gate.await(); ret.AddInt32(0); |