diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-31 00:19:10 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-06 11:13:09 +0000 |
commit | 76ec6e75656dd944ea61de46dbfd5d1173f3bb81 (patch) | |
tree | af58ebb5bab8b956a6e1805af14288b1db472998 | |
parent | 466bc196eee4571e2197624f17b8a7d8aee38cf0 (diff) |
Group commits to TLS and sync (resurrected).
3 files changed, 120 insertions, 6 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 6b47393336a..4df5727c7e9 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -40,6 +40,7 @@ DomainConfig::DomainConfig() Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext) : _config(cfg), + _currentChunk(std::make_unique<Chunk>()), _lastSerial(0), _singleCommiter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128*1024)), _commitExecutor(commitExecutor), @@ -79,6 +80,8 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm vespalib::File::sync(dir()); } _lastSerial = end(); + _self = _threadPool.NewThread(this); + assert(_self); } Domain & @@ -87,6 +90,16 @@ Domain::setConfig(const DomainConfig & cfg) { return *this; } +void +Domain::Run(FastOS_ThreadInterface *thisThread, void *) { + + while (!thisThread->GetBreakFlag()) { + vespalib::MonitorGuard guard(_currentChunkMonitor); + guard.wait(_config.getChunkAgeLimit()); + commitIfStale(guard); + } +} + void Domain::addPart(int64_t partId, bool isLastPart) { auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(), _config.getCompressionlevel(), _fileHeaderContext, isLastPart); @@ -127,7 +140,17 @@ private: bool & _pendingSync; }; -Domain::~Domain() { } +Domain::~Domain() { + if (_self) { + _self->SetBreakFlag(); + { + MonitorGuard guard(_currentChunkMonitor); + guard.broadcast(); + } + _self->Join(); + } + _singleCommiter->shutdown().sync(); +} DomainInfo Domain::getDomainInfo() const @@ -308,10 +331,79 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) } +Domain::Chunk::Chunk() + : _data(size_t(-1)), + _callBacks(), + _firstArrivalTime() +{} + +Domain::Chunk::~Chunk() = default; + void -Domain::commit(const Packet & packet, Writer::DoneCallback onDone) -{ - (void) onDone; +Domain::Chunk::add(const Packet &packet, Writer::DoneCallback onDone) { + if (_callBacks.empty()) { + _firstArrivalTime = vespalib::steady_clock::now(); + } + _data.merge(packet); + _callBacks.emplace_back(std::move(onDone)); +} + +vespalib::duration +Domain::Chunk::age() const { + if (_callBacks.empty()) { + return 0ms; + } + return (vespalib::steady_clock::now() - _firstArrivalTime); +} + +void +Domain::commit(const Packet & packet, Writer::DoneCallback onDone) { + vespalib::MonitorGuard guard(_currentChunkMonitor); + if (! (_lastSerial < packet.range().from())) { + throw runtime_error(make_string("Incomming serial number(%ld) must be bigger than the last one (%ld).", + packet.range().from(), _lastSerial)); + } else { + _lastSerial = packet.range().to(); + } + _currentChunk->add(packet, std::move(onDone)); + commitIfFull(guard); +} + +void +Domain::commitIfFull(const vespalib::MonitorGuard &guard) { + if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { + auto completed = grabCurrentChunk(guard); + if (completed) { + commitChunk(std::move(completed), guard); + } + } +} + +std::unique_ptr<Domain::Chunk> +Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + auto chunk = std::move(_currentChunk); + _currentChunk = std::make_unique<Chunk>(); + return chunk; +} + +void +Domain::commitIfStale(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + if (_currentChunk->age() > _config.getChunkAgeLimit()) { + commitChunk(grabCurrentChunk(guard), guard); + } +} + +void +Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { + assert(chunkOrderGuard.monitors(_currentChunkMonitor)); + _singleCommiter->execute(vespalib::makeLambdaTask([this, chunk = std::move(chunk)] () mutable { doCommit(std::move(chunk)); } )); +} + +void +Domain::doCommit(std::unique_ptr<Chunk> chunk) { + const Packet & packet = chunk->getPacket(); DomainPart::SP dp(_parts.rbegin()->second); vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index a6c6dad5fe8..ad9e10ba426 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -60,7 +60,7 @@ struct DomainInfo { typedef std::map<vespalib::string, DomainInfo> DomainStats; -class Domain +class Domain final : public FastOS_Runnable { public: using SP = std::shared_ptr<Domain>; @@ -68,7 +68,7 @@ public: Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext); - ~Domain(); + ~Domain() override; DomainInfo getDomainInfo() const; const vespalib::string & name() const { return _name; } @@ -103,6 +103,26 @@ public: uint64_t size() const; Domain & setConfig(const DomainConfig & cfg); private: + void Run(FastOS_ThreadInterface *thisThread, void *arguments) override; + void commitIfStale(const vespalib::MonitorGuard & guard); + void commitIfFull(const vespalib::MonitorGuard & guard); + class Chunk { + public: + Chunk(); + ~Chunk(); + void add(const Packet & packet, Writer::DoneCallback onDone); + size_t sizeBytes() const { return _data.sizeBytes(); } + const Packet & getPacket() const { return _data; } + vespalib::duration age() const; + private: + Packet _data; + std::vector<Writer::DoneCallback> _callBacks; + vespalib::steady_time _firstArrivalTime; + }; + + std::unique_ptr<Chunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); + void commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); + void doCommit(std::unique_ptr<Chunk> chunk); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; @@ -120,6 +140,7 @@ private: using DurationSeconds = std::chrono::duration<double>; DomainConfig _config; + std::unique_ptr<Chunk> _currentChunk; SerialNum _lastSerial; std::unique_ptr<Executor> _singleCommiter; Executor & _commitExecutor; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 96e75c35a81..416cdb566ce 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -427,6 +427,7 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) _packets.insert(std::make_pair(firstSerial, std::move(packet))); _skipList.push_back(SkipInfo(firstSerial, firstPos)); } + sync(); } void DomainPart::sync() |