diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-12-06 21:45:36 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-12-06 21:45:36 +0000 |
commit | 290255929457149590206c74f36b824bb7258225 (patch) | |
tree | cb189cd2b52ac519dc44ffb0465da4aae69d3fc0 /searchlib | |
parent | e3b3ac158de3da28e2b5e63f37c96c1a68919282 (diff) |
Use a helper pool for the actual compression.
First make a promise, then pass the cunk over to the helper pool.
Pass the future to the single write thread that will ensure proper sequencing.
Diffstat (limited to 'searchlib')
6 files changed, 49 insertions, 33 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index 5740eeb610d..a6474f9a320 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -7,6 +7,7 @@ #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/document/util/bytebuffer.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/fastos/file.h> #include <thread> @@ -332,24 +333,24 @@ fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numP size_t value(0); Counter inFlight(0); auto domainWriter = s1.getWriter(domain); - for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); - for(size_t j=0; j < numEntries; j++, value++) { - Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); - p->add(e); - if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { - domainWriter->append(*p, std::make_shared<CountDone>(inFlight)); - p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE); + vespalib::Gate gate; + { + auto onDone = std::make_shared<vespalib::GateCallback>(gate); + for (size_t i = 0; i < numPackets; i++) { + auto p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE); + for (size_t j = 0; j < numEntries; j++, value++) { + Packet::Entry e(value + 1, j + 1, vespalib::ConstBufferRef((const char *) &value, sizeof(value))); + p->add(e); + if (p->sizeBytes() > DEFAULT_PACKET_SIZE) { + domainWriter->append(*p, std::make_shared<CountDone>(inFlight)); + p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE); + } } + domainWriter->append(*p, std::make_shared<CountDone>(inFlight)); + auto keep = domainWriter->startCommit(onDone); } - domainWriter->append(*p, std::make_shared<CountDone>(inFlight)); - auto keep = domainWriter->startCommit(Writer::DoneCallback()); - LOG(info, "Inflight %ld", inFlight.load()); - } - while (inFlight.load() != 0) { - std::this_thread::sleep_for(10ms); - LOG(info, "Waiting for inflight %ld to reach zero", inFlight.load()); } + gate.await(); } @@ -545,7 +546,7 @@ partialUpdateTest(const vespalib::string & testDir) { ASSERT_TRUE( visitor->visit(5, 7) ); for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } ASSERT_TRUE( ca._eof ); - ASSERT_TRUE( ca.map().size() == 1); + ASSERT_EQUAL(1u, ca.map().size()); ASSERT_TRUE( ca.hasSerial(7) ); CallBackUpdate ca1; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 7fa87f3ec09..7a6356fb076 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -12,6 +12,7 @@ #include <algorithm> #include <thread> #include <cassert> +#include <future> #include <vespa/log/log.h> #include <vespa/vespalib/util/threadstackexecutor.h> @@ -394,27 +395,32 @@ Domain::grabCurrentChunk(const UniqueLock & guard) { void Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard) { assert(chunkOrderGuard.mutex() == &_currentChunkMonitor && chunkOrderGuard.owns_lock()); - _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { - doCommit(std::move(chunk)); + if (chunk->getPacket().empty()) return; + std::promise<SerializedChunk> promise; + std::future<SerializedChunk> future = promise.get_future(); + _executor.execute(makeLambdaTask([promise=std::move(promise), chunk = std::move(chunk), + encoding=_config.getEncoding(), compressionLevel=_config.getCompressionlevel()]() mutable { + promise.set_value(SerializedChunk(std::move(chunk), encoding, compressionLevel)); + })); + _singleCommitter->execute( makeLambdaTask([this, future = std::move(future)]() mutable { + doCommit(future.get()); })); } void -Domain::doCommit(std::unique_ptr<CommitChunk> chunk) { - const Packet & packet = chunk->getPacket(); - if (packet.empty()) return; +Domain::doCommit(const SerializedChunk & serialized) { - SerializedChunk serialized(packet, _config.getEncoding(), _config.getCompressionlevel()); - DomainPart::SP dp = optionallyRotateFile(packet.range().from()); + SerialNumRange range = serialized.range(); + DomainPart::SP dp = optionallyRotateFile(range.from()); dp->commit(serialized); if (_config.getFSyncOnCommit()) { dp->sync(); } cleanSessions(); - LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.", - chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes()); + LOG(debug, "Releasing unknown acks and %zu entries and %zu bytes.", + /*chunk->getNumCallBacks(),*/ serialized.getNumEntries(), serialized.getData().size()); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 2e912ad6201..eb3d0b6b10b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -64,7 +64,7 @@ private: std::unique_ptr<CommitChunk> grabCurrentChunk(const UniqueLock & guard); void commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard); - void doCommit(std::unique_ptr<CommitChunk> chunk); + void doCommit(const SerializedChunk & serialized); SerialNum begin(const UniqueLock & guard) const; SerialNum end(const UniqueLock & guard) const; size_t byteSize(const UniqueLock & guard) const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp index e3d98cd576d..99370d263ec 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp @@ -135,11 +135,13 @@ encode(vespalib::nbostream & os, const IChunk & chunk, Encoding encoding) { chunk.getEntries().size(), os.size(), range.from(), range.to(), encoding.getRaw(), realEncoding.getRaw()); } -SerializedChunk::SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel) - : _os(), - _range(packet.range()), - _numEntries(packet.size()) +SerializedChunk::SerializedChunk(std::unique_ptr<CommitChunk> commitChunk, Encoding encoding, uint8_t compressionLevel) + : _commitChunk(std::move(commitChunk)), + _os(), + _range(_commitChunk->getPacket().range()), + _numEntries(_commitChunk->getPacket().size()) { + const Packet & packet = _commitChunk->getPacket(); nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size()); IChunk::UP chunk = IChunk::create(encoding, compressionLevel); diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index dccfd6617f5..d1de1c9c893 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -38,11 +38,17 @@ std::ostream & operator << (std::ostream & os, Encoding e); */ class SerializedChunk { public: - SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel); + SerializedChunk(std::unique_ptr<CommitChunk> chunk, Encoding encoding, uint8_t compressionLevel); + SerializedChunk(SerializedChunk &&) = default; + SerializedChunk & operator=(SerializedChunk &&) = default; + SerializedChunk(const SerializedChunk &) = delete; + SerializedChunk & operator=(const SerializedChunk &) = delete; vespalib::ConstBufferRef getData() const; SerialNumRange range() const { return _range; } size_t getNumEntries() const { return _numEntries; } + const CommitChunk & commitChunk() const { return *_commitChunk; } private: + std::unique_ptr<CommitChunk> _commitChunk; vespalib::nbostream _os; SerialNumRange _range; size_t _numEntries; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index ce190d2c093..db2cf2a255d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -578,9 +578,10 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) try { vespalib::Gate gate; { + auto onDone = make_shared<vespalib::GateCallback>(gate); // Need to scope in order to drain out all the callbacks. - domain->append(packet, make_shared<vespalib::GateCallback>(gate)); - auto keep = domain->startCommit(make_shared<vespalib::IgnoreCallback>()); + domain->append(packet, onDone); + auto keep = domain->startCommit(onDone); } gate.await(); ret.AddInt32(0); |