From 4345766f07ccf252c2aa1f6b62254296557c4263 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 11 Sep 2020 15:53:19 +0000 Subject: GC redundant executor and use lambda for simple stuff. --- .../src/vespa/searchlib/transactionlog/domain.cpp | 49 ++++++++-------------- .../src/vespa/searchlib/transactionlog/domain.h | 11 ++--- .../searchlib/transactionlog/translogserver.cpp | 17 ++++---- .../searchlib/transactionlog/translogserver.h | 3 +- 4 files changed, 29 insertions(+), 51 deletions(-) diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index aaf70964068..293e316f799 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -30,15 +30,12 @@ using std::make_shared; namespace search::transactionlog { -VESPA_THREAD_STACK_TAG(domain_commit_executor); - -Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor, - Executor & sessionExecutor, const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext) +Domain::Domain(const string &domainName, const string & baseDir, Executor & executor, + const DomainConfig & cfg, const FileHeaderContext &fileHeaderContext) : _config(cfg), _lastSerial(0), _singleCommiter(std::make_unique(1, 128*1024)), - _commitExecutor(commitExecutor), - _sessionExecutor(sessionExecutor), + _executor(executor), _sessionId(1), _syncMonitor(), _pendingSync(false), @@ -64,10 +61,10 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm const int64_t lastPart = partIdVector.empty() ? 0 : partIdVector.back(); for (const int64_t partId : partIdVector) { if ( partId != -1) { - _sessionExecutor.execute(makeTask(makeClosure(this, &Domain::addPart, partId, partId == lastPart))); + _executor.execute(makeTask(makeClosure(this, &Domain::addPart, partId, partId == lastPart))); } } - _sessionExecutor.sync(); + _executor.sync(); if (_parts.empty() || _parts.crbegin()->second->isClosed()) { _parts[lastPart] = std::make_shared(_name, dir(), lastPart, _config.getEncoding(), _config.getCompressionlevel(), _fileHeaderContext, false); @@ -76,6 +73,11 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm _lastSerial = end(); } +vespalib::Executor::Task::UP +Domain::execute(vespalib::Executor::Task::UP task) { + return _executor.execute(std::move(task)); +} + Domain & Domain::setConfig(const DomainConfig & cfg) { _config = cfg; @@ -101,27 +103,6 @@ void Domain::addPart(int64_t partId, bool isLastPart) { } } -class Sync : public vespalib::Executor::Task -{ -public: - Sync(Monitor &syncMonitor, const DomainPart::SP &dp, bool &pendingSync) : - _syncMonitor(syncMonitor), - _dp(dp), - _pendingSync(pendingSync) - { } -private: - void run() override { - _dp->sync(); - MonitorGuard guard(_syncMonitor); - _pendingSync = false; - guard.broadcast(); - } - - Monitor & _syncMonitor; - DomainPart::SP _dp; - bool & _pendingSync; -}; - Domain::~Domain() { } DomainInfo @@ -216,8 +197,12 @@ Domain::triggerSyncNow() MonitorGuard guard(_syncMonitor); if (!_pendingSync) { _pendingSync = true; - DomainPart::SP dp(_parts.rbegin()->second); - _commitExecutor.execute(std::make_unique(_syncMonitor, dp, _pendingSync)); + _executor.execute(makeLambdaTask([this, domainPart=_parts.rbegin()->second]() { + domainPart->sync(); + MonitorGuard monitorGuard(_syncMonitor); + _pendingSync = false; + monitorGuard.broadcast(); + })); } } @@ -383,7 +368,7 @@ Domain::startSession(int sessionId) int Domain::closeSession(int sessionId) { - _commitExecutor.sync(); + _executor.sync(); int retval(-1); DurationSeconds sessionRunTime(0); { diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 9c8b578a2c9..1e407188b11 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -18,8 +18,8 @@ public: using SP = std::shared_ptr; using Executor = vespalib::SyncableThreadExecutor; using DomainPartSP = std::shared_ptr; - Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor, - Executor & sessionExecutor, const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext); + Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & executor, + const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext); ~Domain(); @@ -51,9 +51,7 @@ public: getDir(const vespalib::string & base, const vespalib::string & domain) { return base + "/" + domain; } - vespalib::Executor::Task::UP execute(vespalib::Executor::Task::UP task) { - return _sessionExecutor.execute(std::move(task)); - } + vespalib::Executor::Task::UP execute(vespalib::Executor::Task::UP task); uint64_t size() const; Domain & setConfig(const DomainConfig & cfg); private: @@ -76,8 +74,7 @@ private: DomainConfig _config; SerialNum _lastSerial; std::unique_ptr _singleCommiter; - Executor & _commitExecutor; - Executor & _sessionExecutor; + Executor & _executor; std::atomic _sessionId; vespalib::Monitor _syncMonitor; bool _pendingSync; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 93b65002492..42bf057d3cf 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -72,6 +72,8 @@ SyncHandler::PerformTask() } } +VESPA_THREAD_STACK_TAG(tls_executor); + } TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, @@ -92,8 +94,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con _name(name), _baseDir(baseDir), _domainConfig(cfg), - _commitExecutor(maxThreads, 128*1024), - _sessionExecutor(maxThreads, 128*1024), + _executor(maxThreads, 128 * 1024, tls_executor), _threadPool(std::make_unique(1024*120)), _transport(std::make_unique()), _supervisor(std::make_unique(_transport.get())), @@ -110,8 +111,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con domainDir >> domainName; if ( ! domainName.empty()) { try { - auto domain = make_shared(domainName, dir(), _commitExecutor, - _sessionExecutor, cfg, _fileHeaderContext); + auto domain = make_shared(domainName, dir(), _executor, cfg, _fileHeaderContext); _domains[domain->name()] = domain; } catch (const std::exception & e) { LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what()); @@ -147,10 +147,8 @@ TransLogServer::~TransLogServer() { stop(); join(); - _commitExecutor.shutdown(); - _commitExecutor.sync(); - _sessionExecutor.shutdown(); - _sessionExecutor.sync(); + _executor.shutdown(); + _executor.sync(); _transport->ShutDown(true); } @@ -449,8 +447,7 @@ TransLogServer::createDomain(FRT_RPCRequest *req) Domain::SP domain(findDomain(domainName)); if ( !domain ) { try { - domain = std::make_shared(domainName, dir(), _commitExecutor, - _sessionExecutor, _domainConfig, _fileHeaderContext); + domain = std::make_shared(domainName, dir(), _executor, _domainConfig, _fileHeaderContext); Guard domainGuard(_domainMutex); _domains[domain->name()] = domain; writeDomainDir(domainGuard, dir(), domainList(), _domains); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index c6443f33b55..ed5d475bb73 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -83,8 +83,7 @@ private: vespalib::string _name; vespalib::string _baseDir; DomainConfig _domainConfig; - vespalib::ThreadStackExecutor _commitExecutor; - vespalib::ThreadStackExecutor _sessionExecutor; + vespalib::ThreadStackExecutor _executor; std::unique_ptr _threadPool; std::unique_ptr _transport; std::unique_ptr _supervisor; -- cgit v1.2.3