aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-12-06 21:45:36 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-12-06 21:45:36 +0000
commit290255929457149590206c74f36b824bb7258225 (patch)
treecb189cd2b52ac519dc44ffb0465da4aae69d3fc0 /searchlib
parente3b3ac158de3da28e2b5e63f37c96c1a68919282 (diff)
Use a helper pool for the actual compression.
First make a promise, then pass the cunk over to the helper pool. Pass the future to the single write thread that will ensure proper sequencing.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp33
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp24
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp10
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.h8
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp5
6 files changed, 49 insertions, 33 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index 5740eeb610d..a6474f9a320 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -7,6 +7,7 @@
#include <vespa/searchlib/index/dummyfileheadercontext.h>
#include <vespa/document/util/bytebuffer.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/fastos/file.h>
#include <thread>
@@ -332,24 +333,24 @@ fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numP
size_t value(0);
Counter inFlight(0);
auto domainWriter = s1.getWriter(domain);
- for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
- for(size_t j=0; j < numEntries; j++, value++) {
- Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value)));
- p->add(e);
- if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) {
- domainWriter->append(*p, std::make_shared<CountDone>(inFlight));
- p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE);
+ vespalib::Gate gate;
+ {
+ auto onDone = std::make_shared<vespalib::GateCallback>(gate);
+ for (size_t i = 0; i < numPackets; i++) {
+ auto p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE);
+ for (size_t j = 0; j < numEntries; j++, value++) {
+ Packet::Entry e(value + 1, j + 1, vespalib::ConstBufferRef((const char *) &value, sizeof(value)));
+ p->add(e);
+ if (p->sizeBytes() > DEFAULT_PACKET_SIZE) {
+ domainWriter->append(*p, std::make_shared<CountDone>(inFlight));
+ p = std::make_unique<Packet>(DEFAULT_PACKET_SIZE);
+ }
}
+ domainWriter->append(*p, std::make_shared<CountDone>(inFlight));
+ auto keep = domainWriter->startCommit(onDone);
}
- domainWriter->append(*p, std::make_shared<CountDone>(inFlight));
- auto keep = domainWriter->startCommit(Writer::DoneCallback());
- LOG(info, "Inflight %ld", inFlight.load());
- }
- while (inFlight.load() != 0) {
- std::this_thread::sleep_for(10ms);
- LOG(info, "Waiting for inflight %ld to reach zero", inFlight.load());
}
+ gate.await();
}
@@ -545,7 +546,7 @@ partialUpdateTest(const vespalib::string & testDir) {
ASSERT_TRUE( visitor->visit(5, 7) );
for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
ASSERT_TRUE( ca._eof );
- ASSERT_TRUE( ca.map().size() == 1);
+ ASSERT_EQUAL(1u, ca.map().size());
ASSERT_TRUE( ca.hasSerial(7) );
CallBackUpdate ca1;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 7fa87f3ec09..7a6356fb076 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -12,6 +12,7 @@
#include <algorithm>
#include <thread>
#include <cassert>
+#include <future>
#include <vespa/log/log.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
@@ -394,27 +395,32 @@ Domain::grabCurrentChunk(const UniqueLock & guard) {
void
Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard) {
assert(chunkOrderGuard.mutex() == &_currentChunkMonitor && chunkOrderGuard.owns_lock());
- _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable {
- doCommit(std::move(chunk));
+ if (chunk->getPacket().empty()) return;
+ std::promise<SerializedChunk> promise;
+ std::future<SerializedChunk> future = promise.get_future();
+ _executor.execute(makeLambdaTask([promise=std::move(promise), chunk = std::move(chunk),
+ encoding=_config.getEncoding(), compressionLevel=_config.getCompressionlevel()]() mutable {
+ promise.set_value(SerializedChunk(std::move(chunk), encoding, compressionLevel));
+ }));
+ _singleCommitter->execute( makeLambdaTask([this, future = std::move(future)]() mutable {
+ doCommit(future.get());
}));
}
void
-Domain::doCommit(std::unique_ptr<CommitChunk> chunk) {
- const Packet & packet = chunk->getPacket();
- if (packet.empty()) return;
+Domain::doCommit(const SerializedChunk & serialized) {
- SerializedChunk serialized(packet, _config.getEncoding(), _config.getCompressionlevel());
- DomainPart::SP dp = optionallyRotateFile(packet.range().from());
+ SerialNumRange range = serialized.range();
+ DomainPart::SP dp = optionallyRotateFile(range.from());
dp->commit(serialized);
if (_config.getFSyncOnCommit()) {
dp->sync();
}
cleanSessions();
- LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.",
- chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes());
+ LOG(debug, "Releasing unknown acks and %zu entries and %zu bytes.",
+ /*chunk->getNumCallBacks(),*/ serialized.getNumEntries(), serialized.getData().size());
}
bool
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 2e912ad6201..eb3d0b6b10b 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -64,7 +64,7 @@ private:
std::unique_ptr<CommitChunk> grabCurrentChunk(const UniqueLock & guard);
void commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard);
- void doCommit(std::unique_ptr<CommitChunk> chunk);
+ void doCommit(const SerializedChunk & serialized);
SerialNum begin(const UniqueLock & guard) const;
SerialNum end(const UniqueLock & guard) const;
size_t byteSize(const UniqueLock & guard) const;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
index e3d98cd576d..99370d263ec 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
@@ -135,11 +135,13 @@ encode(vespalib::nbostream & os, const IChunk & chunk, Encoding encoding) {
chunk.getEntries().size(), os.size(), range.from(), range.to(), encoding.getRaw(), realEncoding.getRaw());
}
-SerializedChunk::SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel)
- : _os(),
- _range(packet.range()),
- _numEntries(packet.size())
+SerializedChunk::SerializedChunk(std::unique_ptr<CommitChunk> commitChunk, Encoding encoding, uint8_t compressionLevel)
+ : _commitChunk(std::move(commitChunk)),
+ _os(),
+ _range(_commitChunk->getPacket().range()),
+ _numEntries(_commitChunk->getPacket().size())
{
+ const Packet & packet = _commitChunk->getPacket();
nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size());
IChunk::UP chunk = IChunk::create(encoding, compressionLevel);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
index dccfd6617f5..d1de1c9c893 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
@@ -38,11 +38,17 @@ std::ostream & operator << (std::ostream & os, Encoding e);
*/
class SerializedChunk {
public:
- SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel);
+ SerializedChunk(std::unique_ptr<CommitChunk> chunk, Encoding encoding, uint8_t compressionLevel);
+ SerializedChunk(SerializedChunk &&) = default;
+ SerializedChunk & operator=(SerializedChunk &&) = default;
+ SerializedChunk(const SerializedChunk &) = delete;
+ SerializedChunk & operator=(const SerializedChunk &) = delete;
vespalib::ConstBufferRef getData() const;
SerialNumRange range() const { return _range; }
size_t getNumEntries() const { return _numEntries; }
+ const CommitChunk & commitChunk() const { return *_commitChunk; }
private:
+ std::unique_ptr<CommitChunk> _commitChunk;
vespalib::nbostream _os;
SerialNumRange _range;
size_t _numEntries;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index ce190d2c093..db2cf2a255d 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -578,9 +578,10 @@ TransLogServer::domainCommit(FRT_RPCRequest *req)
try {
vespalib::Gate gate;
{
+ auto onDone = make_shared<vespalib::GateCallback>(gate);
// Need to scope in order to drain out all the callbacks.
- domain->append(packet, make_shared<vespalib::GateCallback>(gate));
- auto keep = domain->startCommit(make_shared<vespalib::IgnoreCallback>());
+ domain->append(packet, onDone);
+ auto keep = domain->startCommit(onDone);
}
gate.await();
ret.AddInt32(0);