diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-08 02:19:59 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-08 09:51:17 +0100 |
commit | 06670bd70b273e99574872eea1c6b523cf94cbd5 (patch) | |
tree | 7cb54d444c64b184bffd1629e8f1565352646aa9 /searchlib | |
parent | aff5ea6fe2fe923f720b08d2bf8a5fc4ed35f13c (diff) |
Stop using fnet scheduler for handling of tls domainSync rpc.
Diffstat (limited to 'searchlib')
4 files changed, 35 insertions, 20 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 9f1e6bde06b..8dcca2c7b89 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -45,6 +45,7 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & exec _sessionId(1), _syncMonitor(), _pendingSync(false), + _done_sync_tasks(), _name(domainName), _parts(), _lock(), @@ -206,13 +207,16 @@ Domain::getSynced() const void -Domain::triggerSyncNow() +Domain::triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task) { { std::unique_lock guard(_currentChunkMonitor); commitAndTransferResponses(guard); } std::unique_lock guard(_syncMonitor); + if (done_sync_task) { + _done_sync_tasks.push_back(std::move(done_sync_task)); + } if (!_pendingSync) { _pendingSync = true; _executor.execute(makeLambdaTask([this, domainPart= getActivePart()]() { @@ -220,6 +224,11 @@ Domain::triggerSyncNow() std::lock_guard monitorGuard(_syncMonitor); _pendingSync = false; _syncCond.notify_all(); + for (auto &task : _done_sync_tasks) { + auto failed_task = _executor.execute(std::move(task)); + assert(!failed_task); + } + _done_sync_tasks.clear(); })); } } @@ -316,7 +325,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) { DomainPart::SP dp = getActivePart(); if (dp->byteSize() > _config.getPartSizeLimit()) { waitPendingSync(_syncMonitor, _syncCond, _pendingSync); - triggerSyncNow(); + triggerSyncNow({}); waitPendingSync(_syncMonitor, _syncCond, _pendingSync); dp->close(); dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _config.getEncoding(), diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index c9eb6385b15..5a80758dd0b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -36,7 +36,7 @@ public: SerialNum begin() const; SerialNum end() const; SerialNum getSynced() const; - void triggerSyncNow(); + void triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task); bool getMarkedDeleted() const { return _markedDeleted; } void markDeleted() { _markedDeleted = true; } @@ -92,6 +92,7 @@ private: std::mutex _syncMonitor; std::condition_variable _syncCond; bool _pendingSync; + std::vector<std::unique_ptr<vespalib::Executor::Task>> _done_sync_tasks; vespalib::string _name; DomainPartList _parts; mutable std::mutex _lock; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 0c0c9186e12..376de530c6e 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -6,6 +6,7 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fnet/task.h> @@ -28,24 +29,25 @@ namespace search::transactionlog { namespace { -class SyncHandler : public FNET_Task +class SyncHandler : public std::enable_shared_from_this<SyncHandler> { + std::atomic<bool> & _closed; FRT_RPCRequest & _req; Domain::SP _domain; TransLogServer::Session::SP _session; SerialNum _syncTo; public: - SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain, - const TransLogServer::Session::SP &session, SerialNum syncTo); + SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, const Domain::SP &domain, + const TransLogServer::Session::SP &session, SerialNum syncTo) noexcept; - ~SyncHandler() override; - void PerformTask() override; + ~SyncHandler(); + void poll(); }; -SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain, - const TransLogServer::Session::SP &session, SerialNum syncTo) - : FNET_Task(supervisor->GetScheduler()), +SyncHandler::SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, const Domain::SP &domain, + const TransLogServer::Session::SP &session, SerialNum syncTo) noexcept + : _closed(closed), _req(*req), _domain(domain), _session(session), @@ -56,20 +58,19 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const SyncHandler::~SyncHandler() = default; void -SyncHandler::PerformTask() +SyncHandler::poll() { SerialNum synced(_domain->getSynced()); if (_session->getDown() || _domain->getMarkedDeleted() || + _closed.load(std::memory_order_acquire) || synced >= _syncTo) { FRT_Values &rvals = *_req.GetReturn(); rvals.AddInt32(0); rvals.AddInt64(synced); _req.Return(); - delete this; } else { - _domain->triggerSyncNow(); - Schedule(0.05); // Retry in 0.05 seconds + _domain->triggerSyncNow(vespalib::makeLambdaTask([self = shared_from_this()]() { self->poll(); })); } } @@ -101,7 +102,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), _domains(), _reqQ(), - _fileHeaderContext(fileHeaderContext) + _fileHeaderContext(fileHeaderContext), + _closed(false) { int retval(0); if ((retval = makeDirectory(_baseDir.c_str())) == 0) { @@ -146,8 +148,10 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con TransLogServer::~TransLogServer() { + _closed = true; stop(); join(); + _executor.sync(); _executor.shutdown(); _executor.sync(); _transport->ShutDown(true); @@ -719,10 +723,9 @@ TransLogServer::domainSync(FRT_RPCRequest *req) req->Return(); return; } - - SyncHandler *syncHandler = new SyncHandler(_supervisor.get(), req, domain, session, syncTo); - - syncHandler->ScheduleNow(); + + auto syncHandler = std::make_shared<SyncHandler>(_closed, req, domain, session, syncTo); + syncHandler->poll(); } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 37133615c1e..3c6efa20550 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -7,6 +7,7 @@ #include <vespa/document/util/queue.h> #include <vespa/fnet/frt/invokable.h> #include <shared_mutex> +#include <atomic> class FRT_Supervisor; class FNET_Transport; @@ -92,6 +93,7 @@ private: std::mutex _fileLock; // Protects the creating and deleting domains including file system operations. document::Queue<FRT_RPCRequest *> _reqQ; const common::FileHeaderContext &_fileHeaderContext; + std::atomic<bool> _closed; }; } |