diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-09 07:54:54 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-09 07:54:54 +0000 |
commit | d26ed36e397a79c1b809f439ab82e06fb95d504c (patch) | |
tree | 686bb6abc3f39ca5251189d950dbec5c2ece6995 | |
parent | 9d770f5feaba1d205485c1bdde1d834a1a835332 (diff) |
Use a thread that can schedule with higher frequency than the max frequency of 33hz that the fnet scheduler can do.
4 files changed, 33 insertions, 36 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 624c85c9cc0..45afc0c3ee3 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -365,28 +365,31 @@ Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { return chunk; } -void +bool Domain::commitIfStale() { vespalib::MonitorGuard guard(_currentChunkMonitor); - commitIfStale(guard); + return commitIfStale(guard); } -void +bool Domain::commitIfStale(const vespalib::MonitorGuard & guard) { assert(guard.monitors(_currentChunkMonitor)); if ((_currentChunk->age() > _config.getChunkAgeLimit()) && ! _currentChunk->getPacket().empty()) { - commitChunk(grabCurrentChunk(guard), guard); + return commitChunk(grabCurrentChunk(guard), guard); } + return false; } -void +bool Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { assert(chunkOrderGuard.monitors(_currentChunkMonitor)); if ( ! chunk->getPacket().empty()) { _singleCommiter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { doCommit(std::move(chunk)); })); + return true; } + return false; } void diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 3688901d23f..5172f0508dc 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -34,7 +34,7 @@ public: SerialNum end() const; SerialNum getSynced() const; void triggerSyncNow(); - void commitIfStale(); + bool commitIfStale(); bool getMarkedDeleted() const { return _markedDeleted; } void markDeleted() { _markedDeleted = true; } @@ -57,7 +57,7 @@ public: uint64_t size() const; Domain & setConfig(const DomainConfig & cfg); private: - void commitIfStale(const vespalib::MonitorGuard & guard); + bool commitIfStale(const vespalib::MonitorGuard & guard); void commitIfFull(const vespalib::MonitorGuard & guard); class Chunk { public: @@ -75,7 +75,7 @@ private: }; std::unique_ptr<Chunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); - void commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); + bool commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); void doCommit(std::unique_ptr<Chunk> chunk); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index ec70f32694a..15ce2f537ea 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -72,26 +72,6 @@ SyncHandler::PerformTask() } } -class StaleCommitTask final : public FNET_Task { -public: - StaleCommitTask(FNET_Scheduler * sceduler, TransLogServer & server, double seconds2Wait) - : FNET_Task(sceduler), - _server(server), - _seconds2Wait(seconds2Wait) - {} - ~StaleCommitTask() override = default; - - void PerformTask() override { - if (_server.running()) { - _server.commitIfStale(); - Schedule(_seconds2Wait); - } - } -private: - TransLogServer & _server; - double _seconds2Wait; -}; - } TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, @@ -117,6 +97,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con _threadPool(std::make_unique<FastOS_ThreadPool>(1024*120)), _transport(std::make_unique<FNET_Transport>()), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), + _staleCommitThread(), _domains(), _reqQ(), _fileHeaderContext(fileHeaderContext) @@ -161,16 +142,19 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno)); } start(*_threadPool); - double chunkAgeLimit = vespalib::to_s(_domainConfig.getChunkAgeLimit()); - _staleCommitTask = std::make_unique<StaleCommitTask>(_supervisor->GetScheduler(), *this, chunkAgeLimit); - _staleCommitTask->ScheduleNow(); + _staleCommitThread = std::make_unique<std::thread>([this]() { + while (running()) { + std::this_thread::sleep_for(getChunkAgeLimit()); + commitIfStale(); + } + }); } TransLogServer::~TransLogServer() { - _staleCommitTask->Kill(); stop(); join(); + _staleCommitThread->join(); _commitExecutor.shutdown(); _commitExecutor.sync(); _sessionExecutor.shutdown(); @@ -228,6 +212,12 @@ TransLogServer::run() LOG(info, "TLS Stopped"); } +vespalib::duration +TransLogServer::getChunkAgeLimit() const +{ + Guard domainGuard(_domainMutex); + return _domainConfig.getChunkAgeLimit(); +} TransLogServer & TransLogServer::setDomainConfig(const DomainConfig & cfg) { @@ -239,12 +229,14 @@ TransLogServer::setDomainConfig(const DomainConfig & cfg) { return *this; } -void +bool TransLogServer::commitIfStale() { MonitorGuard domainMonitor(_domainMutex); + bool committedAnything(false); for (const auto &domain : _domains) { - domain.second->commitIfStale(); + committedAnything = committedAnything || domain.second->commitIfStale(); } + return committedAnything; } DomainStats diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 8e0e8041385..8d5d5aa892f 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -12,6 +12,7 @@ class FRT_Supervisor; class FNET_Transport; class FNET_Task; +namespace std {class thread; } namespace search::common { class FileHeaderContext; } namespace search::transactionlog { @@ -32,9 +33,10 @@ public: const common::FileHeaderContext &fileHeaderContext); ~TransLogServer() override; DomainStats getDomainStats() const; - void commitIfStale(); + bool commitIfStale(); void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; TransLogServer & setDomainConfig(const DomainConfig & cfg); + vespalib::duration getChunkAgeLimit() const; class Session { @@ -87,7 +89,7 @@ private: std::unique_ptr<FastOS_ThreadPool> _threadPool; std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _supervisor; - std::unique_ptr<FNET_Task> _staleCommitTask; + std::unique_ptr<std::thread> _staleCommitThread; DomainList _domains; mutable std::mutex _domainMutex; // Protects _domains std::condition_variable _domainCondition; |